Spring Reactive Stack(一)Spring WebFlux响应式Web框架入门

Spring WebFluxSpring Framework 5.0中引入的以Reactor为基础的响应式编程Web框架。

WebFlux 的异步处理是基于Reactor实现的,是将输入流适配成MonoFlux进行统一处理。

1. 响应式流(Reactive Streams)

  • Reactor 是一个响应式流,它有对应的发布者(Publisher),用两个类来表示:

    • Flux(返回0-n个元素)
    • Mono(返回0或1个元素)
  • Reactor 的订阅者(Subscriber)则是由Spring框架去完成。

  • 响应式流(Reactive Streams) 其实就是一个规范,其特点:

    • 无阻塞;
    • 一个数据流;
    • 可以异步执行;
    • 能够处理背压;
  • 背压(Backpressure) 可以简单理解为 消费决定生产,生产者可以根据消费压力进行动态调节生产速率的机制。

2. 发布者(Publisher)

由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Stream规范中这种被定义为PublisherPublisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber的需求推送元素。一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。

可以通过下图Excel来理解,1-9行可以看作发布者Publisher提供的元素序列,10-13行的结果计算看作订阅者Subscriber

响应式的一个重要特点:当没有订阅时发布者(Publisher)什么也不做。

FluxMono都是PublisherReactor3实现。Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher不会做任何事情,他根据消费情况进行响应。Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型MonoFlux

WebFlux中,你的方法只需返回Mono或Flux即可。你的代码基本也只和MonoFlux打交道。而WebFlux则会实现SubscriberonNext时将业务开发人员编写的MonoFlux转换为HTTP Response返回给客户端。

3. Mono和Flux的抽象模型

Mono和Flux都是Publisher(发布者)的实现模型。

3.1 Flux

Flux是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>,可以被onComplete信号或者onError信号所终止。 在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError。 下面这张图表示了Flux的抽象模型:

3.2 Mono

Mono是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。 下面这张图表示了Mono的抽象模型(整体和Flux差不多,只不过这里只会发出0-1个元素):

4. Mono API

MonoFlux都是实现org.reactivestreams.Publisher接口的抽象类。

Mono代表0-1个元素的发布者(Publisher)。

  • Mono里面有很多API
    • just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
    • empty():创建一个不包含任何元素,只发布结束消息的序列。
    • justOrEmpty(Optional<? extends T> data):从一个Optional对象或可能为null的对象中创建Mono。只有Optional对象中包含值或对象不为null时,Mono序列才产生对应的元素。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • never():创建一个不包含任何消息通知的序列。
    • delay(Duration duration)delayMillis(long duration):创建一个Mono序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
    • fromCallable()fromCompletionStage()fromFuture()fromRunnable()fromSupplier():分别从 CallableCompletionStageCompletableFutureRunnableSupplier中创建Mono
    • ignoreElements(Publisher source):创建一个Mono序列,忽略作为源的Publisher中的所有元素,只产生结束消息。
    • create():通过create()方法来使用MonoSink来创建Mono
  • API使用案例如下所示。
@Slf4j
@SpringBootTest
public class MonoTest {
    @Test
    public void mono() {
        // 通过just直接赋值
        Mono.just("my name is charles").subscribe(log::info);
        // empty 创建空mono
        Mono.empty().subscribe();
        // ustOrEmpty 只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
        Mono.justOrEmpty(null).subscribe(System.out::println);
        Mono.justOrEmpty("测试justOrEmpty").subscribe(System.out::println);
        Mono.justOrEmpty(Optional.of("测试justOrEmpty")).subscribe(System.out::println);
        // error 创建一个只包含错误消息的序列。
        Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);
        // never 创建一个不包含任何消息通知的序列。
        Mono.never().subscribe(System.out::println);
        // 延迟生成0
        Mono.delay(Duration.ofMillis(2)).map(String::valueOf).subscribe(log::info);
        // 通过fromRunnable创建,并实现异常处理
        Mono.fromRunnable(() -> {
            System.out.println("thread run");
            throw new RuntimeException("thread run error");
        }).subscribe(System.out::println, System.err::println);
        // 通过Callable
        Mono.fromCallable(() -> "callback function").subscribe(log::info);
        // future
        Mono.fromFuture(CompletableFuture.completedFuture("from future")).subscribe(log::info);
        // 通过runnable
        Mono<Void> runnableMono = Mono.fromRunnable(() -> log.warn(Thread.currentThread().getName()));
        runnableMono.subscribe();
        // 通过使用 Supplier
        Mono.fromSupplier(() -> new Date().toString()).subscribe(log::info);
        // flux中
        Mono.from(Flux.just("from", "flux")).subscribe(log::info);  // 只返回flux第一个
        //通过 create()方法来使用 MonoSink 来创建 Mono。
        Mono.create(sink -> sink.success("测试create")).subscribe(System.out::println);
    }
}

运行结果:

5. Flux API

MonoFlux都是实现org.reactivestreams.Publisher接口的抽象类。

Flux表示连续序列,和Mono的创建方法有些不同,MonoFlux的简化版,Flux可以用来表示流。

  • Flux API
    • just():可以指定序列中包含的全部元素。
    • range():可以用来创建连续数值。
    • empty():创建一个不包含任何元素。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • fromIterable():通过迭代器创建如list,set
    • fromStream():通过流创建
    • fromArray(T[]):通过列表创建 如 String[], Integer[]
    • merge():通过将两个flux合并得到新的flux
    • interval():每隔一段时间生成一个数字,从1开始递增
  • API使用案例如下所示。
@Slf4j
@SpringBootTest
public class FluxTest {
    @Test
    public void flux () throws InterruptedException {
        // 通过just赋值
        Flux<Integer> intFlux = Flux.just(1, 2, 3, 4, 5);
        // 以6开始,取4个值:6,7,8,9
        Flux<Integer> rangeFlux = Flux.range(6, 4);
        // 通过merge合并
        Flux<Integer> intMerge = Flux.merge(intFlux, rangeFlux);
        intMerge.subscribe(System.out::print);
        System.out.println();//换行
        // 通过fromArray构建
        Flux.fromArray(new Integer[]{1,3,5,7,9}).subscribe(System.out::print);
        System.out.println();//换行
        // 通过流和迭代器创建
        Flux<String> strFluxFromStream = Flux.fromStream(Stream.of(" just", " test", " reactor", " Flux", " and", " Mono"));
        Flux<String> strFluxFromList = Flux.fromIterable(Arrays.asList(" just", " test", " reactor", " Flux", " and", " Mono"));
        // 通过merge合并
        Flux<String> strMerge = Flux.merge(strFluxFromStream, strFluxFromList);
        strMerge.subscribe(System.out::print);
        System.out.println();
        // 通过interval创建流数据
        Flux.interval(Duration.ofMillis(100)).map(String::valueOf)
            .subscribe(System.out::print);
        Thread.sleep(2000);
    }
}

运行结果:

123456789
13579
 just test reactor Flux and Mono just test reactor Flux and Mono
012345678910111213141516171819

6. subscribe方法

subscribe()方法表示对数据流的订阅动作,subscribe()方法有多个重载的方法,最多可以传入四个参数;

    @Test
    public void subscribe () throws InterruptedException {
        // 测试Mono
        Mono.just(1).subscribe(System.out::println);
        // 测试Flux
        Flux.just('a', 'b').subscribe(System.out::println);
        // 测试2个参数的subscribe方法
        Flux.just('i', 'j').map(chr -> {
                if ('j'== chr) throw new RuntimeException("test 2 parameters");
                else return String.valueOf(chr);
            })
            .subscribe(System.out::println,    // 参数1,接受内容
                       err -> log.error(err.getMessage())); // 参数2,对err处理的lambda函数
        // 测试3个参数的subscribe方法
        Flux.just("你", "我", "他", "它", "ta")
            .subscribe(System.out::print,   // 参数1,接受内容
                       System.err::println, // 参数2,对err处理的lambda函数
                       () -> System.out.println("complete for 3"));// 参数3,完成subscribe之后执行的lambda函数
        // 测试4个参数的subscribe方法
        Flux.interval(Duration.ofMillis(100))
            .map(i -> {
                if (i == 3) throw new RuntimeException("fake a mistake");
                else return String.valueOf(i);
            })
            .subscribe(info -> log.info("info: {}", info), // 参数1,接受内容
                       err -> log.error("error: {}", err.getMessage()),// 参数2,对err处理的lambda函数
                       () -> log.info("Done"),     // 参数3,完成subscribe之后执行的lambda函数
                       sub -> sub.request(10));  // 参数4,Subscription操作,设定从源头获取元素的个数
        Thread.sleep(2000);
    }

运行结果:

7. 使用StepVerifier测试响应式异步代码

通过expectNext执行类似断言的功能,如果断言不符合实际情况,就会报错。

@Test
public void StepVerifier () {
    // 使用StepVerifier测试Flux,正常
    Flux flux = Flux.just(1, 2, 3, 4, 5, 6);
    StepVerifier.create(flux)
                // 测试下一个期望的数据元素
                .expectNext(1, 2, 3, 4, 5, 6)
                // 测试下一个元素是否为完成信号
                .expectComplete()
                .verify();
    // 使用StepVerifier测试Mono,报错
    Mono<String> mono = Mono.just("charles").log();
    StepVerifier.create(mono)
                .expectNext("char")
                .verifyComplete();
}

运行结果:

java.lang.AssertionError: expectation "expectNext(char)" failed (expected value: char; actual value: charles)
...

参考连接:https://mp.weixin.qq.com/s/O1VGS7d1TLQhgrCaQ-UQCw

END .

相关系列文章

×