10.3通用响应式操作实战

10.3通用响应式操作实战

FluxMonoReactor提供的最重要的组成部分,而这两个响应式类型所提供的操作就是粘合剂,这些操作将它们结合在一起,来创建数据流动的通道。在FluxMono之间,存在超过500种操作,其中的每一个可以被归类为:

  • 创建操作
  • 联合操作
  • 传输操作
  • 逻辑处理操作

500个操作都印在这里来看看它们是如何工作的,这是件有趣的事情,但是在这一章节中没有这么大的空间给它。因此我在这个章节中选择了几个最有用的操作,我们先从创建操作开始。

注意:

哪里有Mono的例子吗?MonoFlux有很多相同的操作,所以它没有必要展示两次同样的操作。此外,Mono虽然是有用的,但是对比Flux的操作来说,Mono看上去还是没有那么有趣。在我们所写的例子中,大多数使用的都是FluxMono通常情况下有与Flux对等的操作。

10.3.1创建响应式类型

当时长使用Spring中的响应式类型时,会从respository或是service中得到Flux或是Mono,因此需要你自己创建一个。但是有时候你需要创建一个新的响应式发布者。

Reactor为创建FluxMono提供了多个操作。在本节中,我们将介绍一些最有用的创建操作。

从对象进行创建

如果你想从Flux或是Mono创建一个或多个对象,你可以FluxMono中的静态方法just()去创建一个响应式类型,其中的数据由这些对象驱动。例如,下面这个测试方法就是使用5String对象来创建一个Flux

@Test
public void createAFlux_just() {
    Flux<String> fruitFlux = Flux
        .just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}

这样就创建了一个Flux,但它没有订阅者。要是没有订阅者,数据不会流动。以花园软管的思路进行类比,你已经把软管接到出水口了,另一端就是从自来水公司流出的水。但是水不会流动,除非你打开水龙头。对响应式类型的订阅就是打开数据流的方式。

要添加一个订阅者,可以调用Flux中的subscribe()方法:

fruitFlux.subscribe(
    f -> System.out.println("Here's some fruit: " + f);
);

subscribe()中的lambda表达式实际上是java.util.Consumer,用于创建响应式流的Subscriber。由于调用了subscribe()方法,数据开始流动了。在这个例子中,不存在中间操作,因此数据直接从Flux流到了Subscriber

为了在运行过程中观察响应式类型,一个好方法就是将FluxMono打印到控制台里面。但是,测试FluxMono更好的方式是使用Reactor中的StepVerifier。给定一个FluxMonoStepVerifier订阅这个响应式类型,然后对流中流动的数据应用断言,最后验证流以预期方式完成。

例如,为了验证规定的数据流经fruitFlux,你可以写一个测试,如下所示:

StepVerifier.create(fruitFlux)
    .expectNext("Apple")
    .expectNext("Orange")
    .expectNext("Grape")
    .expectNext("Banana")
    .expectNext("Strawberry")
    .verifyComplete();

这个例子中,StepVerifier订阅了Flux,然后对每一个匹配到的期望的水果名字做断言。最后,它验证了Strawberry是由Flux生成的,对Flux的验证完毕。

在本章余下的示例中,将使用StepVerifier编写测试用例以验证某些行为,并帮助你了解某些操作是如何工作的,从而了解一些Reactor最有用的操作。

从集合创建

Flux也可从任何的集合创建,如Iterable或是Java Stream。图10.3使用弹珠图绘制了这是如何运行的:

10.3 Flux可由数组、Iterable或是Stream创建

![Acrobat_EgcrkessnX](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Acrobat_EgcrkessnX.png)

为了从数组创建一个Flux,调用静态方法fromArray(),然后将数组作为数据源传入:

@Test
public void createAFlux_fromArray() {
    String[] fruits = new String[] {
        "Apple", "Orange", "Grape", "Banana", "Strawberry" };

    Flux<String> fruitFlux = Flux.fromArray(fruits);
    StepVerifier.create(fruitFlux)
        .expectNext("Apple")
        .expectNext("Orange")
        .expectNext("Grape")
        .expectNext("Banana")
        .expectNext("Strawberry")
        .verifyComplete();
}

因为当你从对象列表中创建Flux的时候,源数组包含了你使用到的相同的水果名称,所以被Flux所命中的数据有相同的值。这样一来,你就在验证这个Flux之前使用相同的StepVerifier

如果你需要从java.util.Listjava.util.Set或任何实现了java.lang.Iterable接口的类创建Flux,你可以将它传入静态方法fromIterable()中:

@Test
public void createAFlux_fromIterable() {
    List<String> fruitList = new ArrayList<>();
    fruitList.add("Apple");
    fruitList.add("Orange");
    fruitList.add("Grape");
    fruitList.add("Banana");
    fruitList.add("Strawberry");
    Flux<String> fruitFlux = Flux.fromIterable(fruitList);
    // ... verify steps
}

或是,如果你突然想要把你用得顺手的Java Stream作为Flux的源,你将会用到fromStream()方法:

@Test
public void createAFlux_fromStream() {
    Stream<String> fruitStream =
        Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
    Flux<String> fruitFlux = Flux.fromStream(fruitStream);
    // ... verify steps
}

这里还是一样地使用StepVerifier去验证需要发布到Flux的数据。

生成Flux数据

有时你不会使用到任何数据,仅仅需要把Flux作为一个计数器,输出递增的值。可以使用静态方法range()去创建一个Flux计数器。图10.4展示了range()是如何运作的。

10.4

![Acrobat_hparWHSeug](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Acrobat_hparWHSeug.png)

下面的测试方法展示了如何创建一个范围的Flux

@Test
public void createAFlux_range() {
    Flux<Integer> intervalFlux = Flux.range(1, 5);
    StepVerifier.create(intervalFlux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectNext(4)
        .expectNext(5)
        .verifyComplete();
}

在这个例子中,Flux的范围为15StepVerifier验证了它将会产生五个值,它们是整数的15

另一这类似于ranger()的来创建Flux的方法时interval()。如同range()方法一样,interval()创建一个输出递增值的Flux。不过,inerval()特别的一点在于,不是传递给它起始值和结束值,而是指定发出数据的时间间隔或频率。图10.5展示了interval()创建方法的弹珠图:

10.5

![Snipaste_2020-03-25_13-50-21](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-25_13-50-21.png)

例如,可以使用静态的interval()方法来创建每秒发送一个值的Flux,如下所示:

@Test
public void createAFlux_interval() {
    Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
    StepVerifier.create(intervalFlux)
        .expectNext(0L)
        .expectNext(1L)
        .expectNext(2L)
        .expectNext(3L)
        .expectNext(4L)
        .verifyComplete();
}

注意,由Flux发出的值是从0开始一个一个递增的。另外,由于interval()没有给出的最大值,它会永远地运行。同时,也可以使用take()操作将结果限制为前5个数据。我们将在下一章更多地讨论take()操作。

10.3.2响应式类型结合

你可能发现你需要将两种响应式类型以某种方式合并到一起。或者,在其他情况下,你可能需要将Flux分解成多个响应式类型。在本节中,我们将研究ReactorFluxMono的结合和分解操作。

合并响应式类型

假设你有两个Flux流,并需要建立一个汇聚结果的Flux,它会因为能够得到上流的Flux流,所以能够产生数据。为了将一个Flux与另一个合并,可以使用mergeWith()操作,如在图10.6展示的弹珠图一样:

10.6合并两个交替发送信息的Flux流为一个新的Flux

![Snipaste_2020-03-25_14-31-23](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-25_14-31-23.png)

例如,假设第一个Flux其值是电视和电影人物的名字,第二个Flux其值是食品的名称。下面的测试方法将展示如何使用mergeWith()方法合并两个Flux对象:

@Test
public void mergeFluxes() {
    Flux<String> characterFlux = Flux
        .just("Garfield", "Kojak", "Barbossa")
        .delayElements(Duration.ofMillis(500));

    Flux<String> foodFlux = Flux
        .just("Lasagna", "Lollipops", "Apples")
        .delaySubscription(Duration.ofMillis(250))
        .delayElements(Duration.ofMillis(500));

    Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
    StepVerifier.create(mergedFlux)
        .expectNext("Garfield")
        .expectNext("Lasagna")
        .expectNext("Kojak")
        .expectNext("Lollipops")
        .expectNext("Barbossa")
        .expectNext("Apples")
        .verifyComplete();
}

通常情况下,Flux会尽可能快的快地发送数据。因此,需要在创建Flux的时候使用delayElements()操作,用来将数据发送速度减慢 —— 每0.5s发送一个数据。此外,你将delaySubscription()操作应用于foodFlux,使得它在延迟250ms后才会发送数据,因此foodFlux将会在characterFlux之后执行。

合并这两个Flux对象后,新的合并后的Flux被创建。当StepVerifier订阅合并后的Flux时,它会依次订阅两个Flux源。

合并后的Flux发出的数据的顺序,与源发出的数据的时间顺序一致。由于两个Flux都被设置为固定频率发送数据,因此值会通过合并后的Flux交替出现 —— character…food…character…food一直这样下去。如何其中任何一个Flux的发送时间被修改了的话,你可能会看到2charater跟在1food后面或是2food跟在1character后面的情况。

由于mergeWith()不能保证源之间的完美交替,这时可以考虑zip()操作。当两个Flux被压缩在一起时,结果就是一个新的Flux生产出一个元组数据,这个元组包含了来自每一个源Flux的数据项。图10.7展示出了两个Flux是如何被缩在一起的:

10.7将两个Flux流压缩为一个Flux

![Snipaste_2020-03-25_14-31-23](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-25_14-31-23-1585192471116.png)

为了看看zip()操作的执行情况,参考一下下面的测试方法,它把character Fluxfood Flux压缩在了一起:

@Test
public void zipFluxes() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux);

    StepVerifier.create(zippedFlux)
        .expectNextMatches(p ->
        	p.getT1().equals("Garfield") &&
            p.getT2().equals("Lasagna"))
        .expectNextMatches(p ->
            p.getT1().equals("Kojak") &&
            p.getT2().equals("Lollipops"))
        .expectNextMatches(p ->
            p.getT1().equals("Barbossa") &&
            p.getT2().equals("Apples"))
        .verifyComplete();
}

注意,与mergeWith()不同的是,zip()操作是一个静态的创建操作,通过它创建的Flux使characterfood完美对齐。从压缩后的Flux发送出来的每个项目都是Tuple2(包含两个对象的容器,其中包含每一个源Flux的数据。

如果你不想使用Tuple2,而是想用一些使用其他类型,你可以提供给zip()你想产生任何对象的Function接口。

**10.8 **

![Snipaste_2020-03-26_11-37-13](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-26_11-37-13.png)

例如,以下的试验方法说明了如何压缩的character Fluxfood Flux,使得它产生String类型的的Flux对象:

@Test
public void zipFluxesToObject() {
    Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");
    Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");

    Flux<String> zippedFlux = Flux.zip(characterFlux, foodFlux,
                                   (c, f) -> c + " eats " + f);

    StepVerifier.create(zippedFlux)
        .expectNext("Garfield eats Lasagna")
        .expectNext("Kojak eats Lollipops")
        .expectNext("Barbossa eats Apples")
        .verifyComplete();
}

zip()Function接口(这里给出一个lambda表达式)简单地把两个值连接成一句话,由压缩后的Flux进行数据发送。

选择第一个响应式类型进行发布

假设你有两个Flux对象,你只是想创建一个新的发送从第一个Flux产生值的Flux,而不是将两个Flux合并在一起。如图10.9所示,first()操作选择两个Flux对象的第一个对象然后输出它的值。

10.9第一操作选择了第一个Flux来发送一个消息,此后仅该Flux产生消息

![Snipaste_2020-03-26_13-49-36](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-26_13-49-36.png)

下面的测试方法创建一个fast Fluxslow Flux(这里的 “slow” 的意思是它在订阅之后100ms才发布数据。通过使用first(),它创建了一个新的Flux,将只会发布从第一个源Flux发布的数据:

@Test
public void firstFlux() {
    Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
        .delaySubscription(Duration.ofMillis(100));
    Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");

    Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);

    StepVerifier.create(firstFlux)
        .expectNext("hare")
        .expectNext("cheetah")
        .expectNext("squirrel")
        .verifyComplete();
}

在这种情况下,因为在fast Flux已经开始发布后100msslow Flux才开始发布数据,这样导致新创建的Flux将完全忽略slow Flux,而只发布fast flux中的数据。

10.3.3转换和过滤响应式流

当数据流过stream,你可能需要过滤或是修改一些值。在本节中,我们将看到的是转换和过滤流过响应式流中的数据。

从响应式类型中过滤数据

当数据从Flux中流出时,过滤数据的最基本方法之一就是简单地忽略前几个条目。如图10.10所示,skip()操作正是这样做的。

10.10 skip操作在将剩余消息传递给结果流之前跳过指定数量的消息

![Snipaste_2020-03-26_14-09-54](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\Snipaste_2020-03-26_14-09-54.png)

给定一个包含多个条目的Fluxskip()操作将创建一个新的Flux,该Flux在从源Flux发出剩余项之前跳过指定数量的项。下面的测试方法演示如何使用skip()

@Test
public void skipAFew() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .skip(3);

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

在本例中,有五个字符串项的流。对该流调用skip(3)将生成一个新的流,该流跳过前三个项,并且只发布最后两个项。

你也许不是想跳过特定数量的项目,而是需要过一段时间再跳过前几个项目。skip()操作的另一种形式(如图10.11所示)是生成一个流,该流在从源流发出项之前等待一段指定的时间。

10.11 skip操作的另一种形式是在将消息传递到结果流之前等待一段时间

![10.11](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.11.png)

下面的测试方法使用skip()创建一个在发出任何值之前等待4秒的Flux。由于该Flux是从项之间具有1秒延迟(使用delayElements())的Flux创建的,因此只会发出最后两个项:

@Test
public void skipAFewSeconds() {
    Flux<String> skipFlux = Flux.just(
        "one", "two", "skip a few", "ninety nine", "one hundred")
        .delayElements(Duration.ofSeconds(1))
        .skip(Duration.ofSeconds(4));

    StepVerifier.create(skipFlux)
        .expectNext("ninety nine", "one hundred")
        .verifyComplete();
}

你已经看到了take()操作的一个例子,但是根据skip()操作,take()可以看作是skip()的反面。skip()跳过前几个项,take()只发出前几个项(如图10.12所示

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .take(3);

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

10.12 take操作只传递来自传入流量的前几个消息,然后取消订阅

![10.12](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.12.png)

skip()一样,take()也有一个基于持续时间而不是项目计数的可选项。它会在一段时间之后,将接收并发出与通过源Flux一样多的项。如图10.13所示:

10.13 take操作的另一种形式是在某个时间过去后,将消息传递给结果流

![10.13](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.13.png)

以下测试方法使用take()的替代形式在订阅后的前3.5秒内发出尽可能多的项:

@Test
public void take() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .delayElements(Duration.ofSeconds(1))
        .take(Duration.ofMillis(3500));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Grand Canyon")
        .verifyComplete();
}

skip()take()操作可以看作是基于计数或持续时间的筛选条件的操作。对于更通用的Flux值过滤,会发现filter()操作非常有用。

给定一个决定一个项是否通过FluxPredicatefilter()操作允许你根据需要的任何条件有选择地发布。图10.14中的弹珠图显示了filter()的工作原理。

10.14传入的流可以被过滤,以便生成的流只接收与给定谓词匹配的消息。

![10.14](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.14.png)

要查看filter()的运行情况,请考虑以下测试方法:

@Test
public void filter() {
    Flux<String> nationalParkFlux = Flux.just(
        "Yellowstone", "Yosemite", "Grand Canyon","Zion", "Grand Teton")
        .filter(np -> !np.contains(" "));

    StepVerifier.create(nationalParkFlux)
        .expectNext("Yellowstone", "Yosemite", "Zion")
        .verifyComplete();
}

这里,filter()被赋予一个Predicate,它只接受没有空格的String。因此“Grand Canyon” 和 “Grand Teton” 被过滤掉。

也许你需要过滤的是你已经收到的任何项目。distinct()操作(如图10.15所示)产生一个只发布源Flux中尚未发布的项的Flux

10.15 distinct操作过滤掉所有重复的消息

![10.15](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.15.png)

在下面的测试中,只有唯一的String值将从不同的Flux中发出:

@Test
public void distinct() {
    Flux<String> animalFlux = Flux.just(
        "dog", "cat", "bird", "dog", "bird", "anteater")
        .distinct();

    StepVerifier.create(animalFlux)
        .expectNext("dog", "cat", "bird", "anteater")
        .verifyComplete();
}

尽管 “dog” 和 “bird” 分别从源Flux中发布两次,但在distinct Flux中只发布一次。

映射响应式数据

对于FluxMono,最常用的操作之一是将已发布的项转换为其他形式或类型。Reactor为此提供map()flatMap()操作。

map()操作会创建一个Flux,该Flux在重新发布之前,按照给定函数对其接收的每个对象执行指定的转换。图10.16说明了map()操作的工作原理。

10.16 map操作在结果流上执行将传入消息转换为新消息

![10.16](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.16.png)

在以下测试方法中,表示篮球运动员的String值的Flux映射到Player对象的新Flux

@Test
public void map() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .map(n -> {
            String[] split = n.split("\\s");
            return new Player(split[0], split[1]);
        });

    StepVerifier.create(playerFlux)
        .expectNext(new Player("Michael", "Jordan"))
        .expectNext(new Player("Scottie", "Pippen"))
        .expectNext(new Player("Steve", "Kerr"))
        .verifyComplete();
}

map()Function接口(作为lambda)将传入String以空格进行拆分,并使用生成的字符串数组创建Player对象。虽然用just()创建的流携带的是String对象,但是由map()生成的流携带的是Player对象。

关于map()的重要理解是,映射是同步执行的,因为每个项都是由源Flux发布的。如果要异步执行映射,应考虑使用flatMap()操作。

flatMap()操作需要一些思考和实践才能变得很熟练。如图10.17所示,flatMap()不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的MonoFluxMonoFlux的结果被压成一个新的Flux。当与subscribeOn()一起使用时,flatMap()可以释放Reactor类型的异步能力。

10.17转换映射操作使用中间流来执行转换,从而允许异步转换

![10.17](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.17.png)

下面的测试方法展示了flatMap()subscribeOn()的用法:

@Test
public void flatMap() {
    Flux<Player> playerFlux = Flux
        .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
        .flatMap(n -> Mono.just(n).map(p -> {
            String[] split = p.split("\\s");
            return new Player(split[0], split[1]);
        })
        .subscribeOn(Schedulers.parallel())
        );

    List<Player> playerList = Arrays.asList(
        new Player("Michael", "Jordan"),
        new Player("Scottie", "Pippen"Pippen"),
        new Player("Steve", "Kerr"));

    StepVerifier.create(playerFlux)
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .expectNextMatches(p -> playerList.contains(p))
        .verifyComplete();
}

请注意,flatMap()被赋予一个lambda函数,该函数将传入String转换为String类型的Mono。然后对Mono应用map()操作,将String转换为Player

如果你停在那里,产生的Flux将携带Player对象,以与map()示例相同的顺序同步生成。但是对Mono做的最后一件事是调用subscribeOn()来指示每个订阅应该在一个并行线程中进行。因此,可以异步和并行地执行多个传入String对象的映射操作。

尽管subscribeOn()的名称与subscribe()类似,但它们却截然不同。subscribe()是一个动词,它订阅一个响应式流并有效地将其启动,而subscribeOn()则更具描述性,它指定了应该 如何 并发地处理订阅。Reactor不强制任何特定的并发模型;通过subscribeOn()可以使用Schedulers程序中的一个静态方法指定要使用的并发模型。在本例中,使用了parallel(),它是使用固定大小线程池的工作线程(大小与CPU内核的数量一样。但是调度程序支持多个并发模型,如表10.1所述。

10.1 Schedulers并发模型

Schedulers方法 描述
.immediate() 在当前线程中执行订阅
.single() 在单个可重用线程中执行订阅,对所有调用方重复使用同一线程
.newSingle() 在每个调用专用线程中执行订阅
.elastic() 在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下60秒)
.parallel() 在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于CPU核心的数量。

使用flatMap()subscribeOn()的好处是,可以通过将工作分成多个并行线程来增加流的吞吐量。但由于这项工作是并行完成的,无法保证先完成哪项工作,因此无法知道产生的Flux中排放的项目的顺序。因此,StepVerifier只能验证发出的每个项是否存在于Player对象的预期列表中,并且在Flux完成之前将有三个这样的项。

在响应式流上缓冲数据

在处理流经Flux的数据的过程中,你可能会发现将数据流分解成比特大小的块是有帮助的。buffer()操作(如图10.18所示)可以解决这个问题。

10.18缓冲区操作会产生一个给定最大大小的列表流,这些列表是从传入的流中收集的

![10.18](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.18.png)

给定一个String值的Flux,每个值都包含一个水果的名称,你可以创建一个新的List集合的Flux,其中每个List的元素数不超过指定的数目:

@Test
public void buffer() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");

    Flux<List<String>> bufferedFlux = fruitFlux.buffer(3);

    StepVerifier
        .create(bufferedFlux)
        .expectNext(Arrays.asList("apple", "orange", "banana"))
        .expectNext(Arrays.asList("kiwi", "strawberry"))
        .verifyComplete();
}

在这种情况下,字符串元素的流量被缓冲到一个列表集合的新流量中,每个列表集合包含的项不超过三个。因此,发出五个字符串值的原始磁通量将转换为发出两个列表集合的磁通量,一个包含三个水果,另一个包含两个水果。

那又怎么样?将值从反应性流量缓冲到非反应性列表集合似乎适得其反。但是,当将buffer()与flatMap()结合使用时,它可以并行处理每个列表集合:

Flux.just("apple", "orange", "banana", "kiwi", "strawberry")
    .buffer(3)
    .flatMap(x ->
         Flux.fromIterable(x)
             .map(y -> y.toUpperCase())
             .subscribeOn(Schedulers.parallel())
             .log()
    ).subscribe();

在这个新示例中,仍然将五个String值的Flux缓冲到List集合的新Flux中,然后将flatMap()应用于List集合的Flux。这将获取每个List缓冲区并从其元素创建一个新的Flux,然后对其应用map()操作。因此,每个缓冲List在单独的线程中进一步并行处理。

为了证明它是有效的,我还包含了一个要应用于每个子Fluxlog()操作。log()操作只记录所有的Reactor Streams事件,这样你就可以看到真正发生了什么。因此,以下条目将写入日志(为了简洁起见,删除了时间组件

[main] INFO reactor.Flux.SubscribeOn.1 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 - onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()

日志条目清楚地显示,第一个缓冲区(apple、orangebanana)中的水果在parallel-1线程中处理。同时,在第二个缓冲区(kiwistrawberry)中的水果在parallel-2线程中进行处理。从每个缓冲区的日志条目交织在一起这一事实可以明显看出,这两个缓冲区是并行处理的。

如果出于某种原因,需要将Flux发出的所有内容收集到List中,则可以调用不带参数的buffer()

Flux<List<List>> bufferedFlux = fruitFlux.buffer();

这将产生一个新的Flux,该Flux会发出一个包含源Flux发布的所有项的List。使用collectList()操作也可以实现同样的功能,如图10.19中的弹珠图所示:

10.19 collect-list操作产生一个Mono,其中包含由传入Flux发出的所有消息的列表

![10.19](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.19.png)

collectList()生成一个发布ListMono,而不是生成一个发布ListMono。以下测试方法说明了如何使用它:

@Test
public void collectList() {
    Flux<String> fruitFlux = Flux.just(
        "apple", "orange", "banana", "kiwi", "strawberry");
    Mono<List<String>> fruitListMono = fruitFlux.collectList();

    StepVerifier
        .create(fruitListMono)
        .expectNext(Arrays.asList(
            "apple", "orange", "banana", "kiwi", "strawberry"))
        .verifyComplete();
}

一种更有趣的收集Flux发送的项目的方法是把它们存到Map中。如图10.20所示,collectMap()操作产生一个Mono,它发布一个Map,其中填充了由给定Function计算其键值的条目。

10.20 collect-map操作产生一个Mono,其中包含由传入Flux发出的消息的Map,其中的键来自传入消息的某些特性

![10.20](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.20.png)

要查看collectMap()的实际操作,请查看以下测试方法:

@Test
public void collectMap() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");
    Mono<Map<Character, String>> animalMapMono =
        animalFlux.collectMap(a -> a.charAt(0));

    StepVerifier
        .create(animalMapMono)
        .expectNextMatches(map -> {
            return
                map.size() == 3 &&
                map.get('a').equals("aardvark") &&
                map.get('e').equals("eagle") &&
                map.get('k').equals("kangaroo");
        })
        .verifyComplete();
}

Flux发出了一些动物的名字。在该Flux中,可以使用collectMap()创建一个新的Mono,该Mono发送一个Map,其中的键值由动物名称的第一个字母确定,并且该值是动物名称本身。如果两个动物名以同一个字母开头(如elephanteaglekoalakangaroo,则流经流的最后一个条目将覆盖所有先前的条目。

10.3.4对反应类型执行逻辑操作

有时你只需要知道MonoFlux发布的条目是否符合某些条件。all()any()操作将执行这样的逻辑。图10.2110.22说明了all()any()是如何工作的:

10.21可以对Flux进行测试以确保所有消息在所有操作中都满足某些条件

![10.21](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.21.png)

10.22可以对Flux进行测试以确保在任何操作中至少有一条消息满足某些条件

![10.22](E:\Document\spring-in-action-v5-translate\第三部分 响应式Spring\10Reactor介绍\10.22.png)

假设你想知道由Flux发布的每个String都包含字母a或字母k。下面的测试演示如何使用all()检查该条件:

@Test
public void all() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
    StepVerifier.create(hasAMono)
        .expectNext(true)
        .verifyComplete();

    Mono<Boolean> hasKMono = animalFlux.all(a -> a.contains("k"));
    StepVerifier.create(hasKMono)
        .expectNext(false)
        .verifyComplete();
}

在第一个StepVerifier中,检查字母aall操作应用于源Flux,从而生成Boolean类型的Mono。在本例中,所有的动物名都包含字母a,因此从产生的Mono发出true。但是在第二个StepVerifier中,得到的Mono将发出false,因为并非所有的动物名都包含k

与其执行全部满足或完全不满足的检查,不如满足至少有一个条目匹配。在这种情况下,any()操作就是你所需要的。这个新的测试用例使用any()检查字母tz

@Test
public void any() {
    Flux<String> animalFlux = Flux.just(
        "aardvark", "elephant", "koala", "eagle", "kangaroo");

    Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
    StepVerifier.create(hasAMono)
        .expectNext(true)
        .verifyComplete();

    Mono<Boolean> hasZMono = animalFlux.any(a -> a.contains("z"));
    StepVerifier.create(hasZMono)
        .expectNext(false)
        .verifyComplete();
}

在第一个StepVerifier中,你会看到生成的Mono发出true,因为至少有一个动物名有字母t(特别是elephant。在第二个StepVerifier中,生成的Mono发出false,因为没有一个动物名包含z

上一页
下一页