Spring Reactive Stack(二)Reactor异常处理
不管是在响应式编程还是普通的程序设计中,异常处理都是一个非常重要的方面。
对于Flux
或者Mono
来说,所有的异常都是一个终止的操作,即使你使用了异常处理,原生成序列也不会继续。
但是如果你对异常进行了处理,那么它会将oneError
信号转换成为新的序列的开始,并将替换掉之前上游产生的序列。
先看一个Flux
产生异常的例子
@Test
void test1() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.subscribe(System.out::println);
}
会得到一个异常ErrorCallbackNotImplemented
:
100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
1. onError方法
Reactor
中subscribe
的onError
方法(subscribe第二个参数),就是try catch
的一个具体应用:
@Test
void test2() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.subscribe(System.out::println,
error -> System.err.println("Error: " + error));
}
运行结果:
100 / 10 = 1
100 / 5 = 2
Error: java.lang.ArithmeticException: / by zero
2. onErrorReturn方法
onErrorReturn
可以在遇到异常的时候fallback
到一个静态的默认值:
@Test
void test3() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.onErrorReturn("Divided by zero :(")
.subscribe(System.out::println);
}
运行结果:
100 / 10 = 1
100 / 5 = 2
Divided by zero :(
onErrorReturn
还支持一个Predicate
参数,用来判断要falback
的异常是否满足条件。
public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)
3. onErrorResume方法
onErrorResume
可以在捕获异常之后调用其他的方法。
@Test
void test4() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.onErrorResume(e -> System.out::println)
.subscribe(System.out::println);
}
运行结果:
100 / 10 = 1
100 / 5 = 2
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber@23469199
4. retry方法
retry
的作用就是当遇到异常的时候,重启一个新的序列
@Test
void test8() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.retry(1)
.elapsed()
.subscribe(System.out::println);
}
运行结果:
[4,100 / 10 = 1]
[0,100 / 5 = 2]
[9,100 / 10 = 1]
[0,100 / 5 = 2]
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
elapsed
是用来展示产生的value时间之间的duration。从结果我们可以看到,retry
之前是不会产生异常信息的。
5. doOnError方法
doOnError
只记录异常信息,不破坏原来的React
结构。
@Test
void test5() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.doOnError(error -> System.out.println("we got the error: "+ error))
.subscribe(System.out::println);
}
运行结果:
100 / 10 = 1
100 / 5 = 2
we got the error: java.lang.ArithmeticException: / by zero
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
doOn
系列方法是publisher
的同步钩子方法,在subscriber
触发一系列事件的时候触发
6. doFinally方法
doFinally
可以像传统的同步代码那样使用finally
去做一些事情,比如关闭http
连接,清理资源等。
@Test
void test6() {
Flux.just(10, 5, 0)
.map(i -> "100 / " + i + " = " + (10 / i))
.doFinally(error -> System.out.println("Finally,I will make sure to do something:"+error))
.subscribe(System.out::println);
}
运行结果:
100 / 10 = 1
100 / 5 = 2
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero
Caused by: java.lang.ArithmeticException: / by zero
at ...
Finally,我会确保做一些事情:onError
第二种收尾操作的方法是using
,我们先看一个using
的定义:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)
可以看到using
支持三个参数,resourceSupplier
是一个生成器,用来在subscribe
的时候生成要发送的resource
对象。
sourceSupplier
是一个生成Publisher
的工厂,接收resourceSupplier
传过来的resource
,然后生成Publisher
对象。
resourceCleanup
用来对resource
进行收尾操作。
@Test
void test7() {
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
@Override
public void dispose() {
isDisposed.set(true);
}
@Override
public String toString() {
return "DISPOSABLE";
}
};
Flux.using(
() -> disposableInstance,
disposable -> Flux.just(disposable.toString()),
Disposable::dispose)
.subscribe(System.out::println);
}
相关系列文章
- Spring Reactive Stack(六)响应式 HTTP 请求客户端 WebClient
- Spring Reactive Stack(五)服务端事件推送Server-Sent Events
- Spring Reactive Stack(四)响应式方式访问Redis
- Spring Reactive Stack(三)使用R2DBC访问MySQL
- Spring Reactive Stack(二)Reactor异常处理
- Spring Reactive Stack(一)Spring WebFlux响应式Web框架入门