Spring Reactive Stack(二)Reactor异常处理

不管是在响应式编程还是普通的程序设计中,异常处理都是一个非常重要的方面。

对于Flux或者Mono来说,所有的异常都是一个终止的操作,即使你使用了异常处理,原生成序列也不会继续。 但是如果你对异常进行了处理,那么它会将oneError信号转换成为新的序列的开始,并将替换掉之前上游产生的序列。

先看一个Flux产生异常的例子

@Test
void test1() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .subscribe(System.out::println);
}

会得到一个异常ErrorCallbackNotImplemented

100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

1. onError方法

ReactorsubscribeonError方法(subscribe第二个参数),就是try catch的一个具体应用:

@Test
void test2() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .subscribe(System.out::println,
            error -> System.err.println("Error: " + error));
}

运行结果:

100 / 10 = 1
100 / 5 = 2
Error: java.lang.ArithmeticException: / by zero

2. onErrorReturn方法

onErrorReturn可以在遇到异常的时候fallback到一个静态的默认值:

@Test
void test3() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .onErrorReturn("Divided by zero :(")
        .subscribe(System.out::println);
}

运行结果:

100 / 10 = 1
100 / 5 = 2
Divided by zero :(

onErrorReturn还支持一个Predicate参数,用来判断要falback的异常是否满足条件。

public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue) 

3. onErrorResume方法

onErrorResume可以在捕获异常之后调用其他的方法。

@Test
void test4() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .onErrorResume(e -> System.out::println)
        .subscribe(System.out::println);
}

运行结果:

100 / 10 = 1
100 / 5 = 2
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber@23469199

4. retry方法

retry的作用就是当遇到异常的时候,重启一个新的序列

@Test
void test8() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .retry(1)
        .elapsed()
        .subscribe(System.out::println);
}

运行结果:

[4,100 / 10 = 1]
[0,100 / 5 = 2]
[9,100 / 10 = 1]
[0,100 / 5 = 2]
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

elapsed是用来展示产生的value时间之间的duration。从结果我们可以看到,retry之前是不会产生异常信息的。

5. doOnError方法

doOnError只记录异常信息,不破坏原来的React结构。

@Test
void test5() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .doOnError(error -> System.out.println("we got the error: "+ error))
        .subscribe(System.out::println);
}

运行结果:

100 / 10 = 1
100 / 5 = 2
we got the error: java.lang.ArithmeticException: / by zero
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero

doOn系列方法是publisher的同步钩子方法,在subscriber触发一系列事件的时候触发

6. doFinally方法

doFinally可以像传统的同步代码那样使用finally去做一些事情,比如关闭http连接,清理资源等。

@Test
void test6() {
    Flux.just(10, 5, 0)
        .map(i -> "100 / " + i + " = " + (10 / i))
        .doFinally(error -> System.out.println("Finally,I will make sure to do something:"+error))
        .subscribe(System.out::println);
}

运行结果:

100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
	at ...
Finally,我会确保做一些事情:onError

第二种收尾操作的方法是using,我们先看一个using的定义:

public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
    Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)

可以看到using支持三个参数,resourceSupplier是一个生成器,用来在subscribe的时候生成要发送的resource对象。 sourceSupplier是一个生成Publisher的工厂,接收resourceSupplier传过来的resource,然后生成Publisher对象。 resourceCleanup用来对resource进行收尾操作。

@Test
void test7() {
    AtomicBoolean isDisposed = new AtomicBoolean();
    Disposable disposableInstance = new Disposable() {
        @Override
        public void dispose() {
            isDisposed.set(true);
        }
        @Override
        public String toString() {
            return "DISPOSABLE";
        }
    };
    Flux.using(
        () -> disposableInstance,
        disposable -> Flux.just(disposable.toString()),
        Disposable::dispose)
        .subscribe(System.out::println);
}
END .

相关系列文章

×