反应式库比较
反应式库比较
- Composable(可组装)
- Lazy(延迟执行)
- Reusable(可重用)
- Asynchronous(异步)
- Cacheable(可缓存)
- Push or Pull(推还是拉)
- Backpressure(反压)
- Operator fusion(操作融合)
针对上面这些维度,我们比较以下的这些类:
- CompletableFuture
- Stream
- Optional
- Observable (RxJava 1)
- Observable (RxJava 2)
- Flowable (RxJava 2)
- Flux (Reactor Core)

Composable(可组装)
上面所有的这
-
CompletableFuture: 提供很多的.then*()
方法,这些方法允许我们构建一个流水线,在不同的执行阶段之间传递一个单一的值(或者没有值) ,以及传递异常对象。 -
Stream: 提供很多的可以链式编程方式连接起来的操作,不同的操作阶段之间可以传递N 个值。 -
Optional: 提供一些中间操作,如:.map(), .flatMap(), .filter(). -
Observable, Flowable, Flux: 跟Stream 相同
Lazy(延迟执行)
-
CompletableFuture : 非延迟执行,它本质上只是一个异步结果的持有者。这些对象创建出来是为了代表对应的工作,CompletableFuture 创建的时候,对应的工作已经开始执行了。它不知道任何的关于工作的具体内容,只是关心结果。所以,没有办法能走到上游去从上到下执行整个流水线。当结果被塞到CompletableFuture 对象的时候,下一个阶段开始执行。 -
Stream : 所有的中间操作都是延迟执行的。所有的终端操作,会触发整个计算。 -
Optional : 非延迟执行,所有的操作会马上发生。 -
Observable, Flowable, Flux : 延迟执行,没有订阅者的话,什么都不会做,只有当有订阅者的时候才会执行。
Reusable(可重用)
-
CompletableFuture : 可以重用,它只是在一个值外面做了一层包装。但需要注意一点,这个包装是可更改的。.obtrude*()
方法会更改它的内容,如果你确定没有人会调用到这类方法,那么重用它还是安全的。 -
Stream : 不能重用。Stream 只能被操作(调用中间操作或者终端操作)一次。如果一个Stream 的实现检测到流被重复使用了,它可以抛出一个IllegalStateException 。但是因为某些流操作会返回他们的receiver ,而不是一个新的Stream 对象,并不是在所有的情况下都能够检测出重用。 -
Optional : 完全可重用,因为它是不可变对象,而且所有工作都是立即执行的。 -
Observable, Flowable, Flux : 就是设计来可重用的。所有的执行会从初始点开始,走过所有阶段,前提是有订阅者。
Asynchronous(异步)
-
CompletableFuture : 这个类存在的目的就是异步的把多个操作链接起来。CompletableFuture 代表一个工作,后面跟一个Executor 关联起来。如果你不明确指定一个executor ,那么系统会使用公共的ForkJoinPool 线程池来执行。这个线程池可以用ForkJoinPool.commonPool() 获取到。默认的设置下它会创建系统硬件支持的线程数一样多的线程(通常就是跟CPU 的核心数,如果你的CPU 支持超线程,那么可能再翻一倍) 。不过你也可以设置ForkJoinPool 线程池的线程数,用以下JVM option: -Djava.util.concurrent.ForkJoinPool.common.parallelism=?
,或者每次调用的时候提供一个定制的Executor 。 -
Stream : 不支持创建异步过程,但是可以支持并行的计算——通过stream.parallel() 等方式创建并行流。 -
Optional : 不支持,它只是一个容器。 -
Observable, Flowable, Flux : 目标就是为了构建异步的系统,但是默认情况下还是同步的。subscribeOn 和observeOn 允许你来控制消息的订阅以及消息的接收(指定当你的observer 的onNext / onError / onCompleted 被调用的时候做什么事情) 。
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
/**
Reading file on thread: RxIoScheduler-2
Map on thread: RxIoScheduler-2
Result on thread: RxIoScheduler-2
**/
相反的,
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.observeOn(Schedulers.computation()) // <-- setting scheduler
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
/**
Reading file on thread: RxIoScheduler-2
Map on thread: RxComputationScheduler-1
Result on thread: RxComputationScheduler-1
**/
Cacheable(可缓存)
可缓存和可重用之间的区别是什么?举个例子,我们有一个流水线
可以看出,一个类如果是可缓存的,必然得是可重用的。
-
CompletableFuture : 跟可重用的答案一样。 -
Stream : 不能缓存中间操作的结果,除非调用了终端操作。 -
Optional: ‘可缓存’,实际上,所有工作立即执行,并且做完后就保存了一个不变值,自然‘可缓存’。
-
Observable, Flowable, Flux : 默认情况下是不可缓存的,但是你可以把一个这些类转变成缓存,只要调用.cache() 就可以。示例:
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
});
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
/**
Doing some work
10
Doing some work
20
**/
如果用
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
}).cache(); // <- apply caching
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
/**
Doing some work
10
20
**/
Push or Pull(推模式还是拉模式)
-
Stream 和Optional : 是拉模式的。你调用不同的方法(.get(), .collect() 等)从流水线拉取结果。拉模式经常与阻塞、同步是相关联的,而这也合理。你调用一个方法,然后线程等待数据。线程会阻塞直到数据到达。 -
CompletableFuture, Observable, Flowable, Flux : 是推模式的。你订阅一个流水线,然后当有东西可以处理的时候你会得到通知。推模式通常意味着非阻塞、异步。当流水线在某个线程上执行的时候,你可以做任何事情。你已经定义了一段待执行的代码,作为下一个阶段的任务,当通知到达的时候,这个代码就会被执行。
Backpressure(反压)
要做到支持反压,流水线必须是推模式的。Backpressure(反压)描述的是在流水线中会发生的一种场景:某些异步的阶段处理速度跟不上,需要告诉上游生产者放慢速度。直接失败是不可接受的,因为会丢失太多数据。

-
Stream & Optional : 不支持反压,因为他们是拉模式。 -
CompletableFuture : 不需要面对这个问题,因为它只产生0 个或者1 个结果。 -
Observable(RxJava 1), Flowable, Flux : 提供一组方案解决这个问题。常用的策略是:
- : Buffering(缓冲)
: 把所有的onNext 的值保存到缓冲区,直到下游消费它们。 Drop Recent: 如果下游处理跟不上的话,丢弃最近的onNext 值。Use Latest: 如果下游处理跟不上的话,只提供最近的onNext 值,之前的值会被覆盖。None: onNext 事件直接被触发,不带任何缓冲或丢弃处理。Exception: 如果下游处理跟不上的话,触发一个异常。
Observable(RxJava 2) : 不解决这个问题。很多RxJava 1 的使用者用Observable 来处理不适用反压的事件,或者是使用Observable 的时候不用任何策略处理反压,这会导致不可预知的异常。所以,RxJava 2 明确地区分两种情况,提供支持反压的Flowable 和不支持反压的Observable 。
Operator fusion(操作融合)
操作融合背后的想法是,在生命周期的不同点上,改变执行阶段的链条,从而消除库的架构因素所造成的额外开销。所有这些优化都是在内部处理掉的,对外部用户来说是透明的。只有
Macro-fusion: 用一个操作替换2 个或更多的相继的操作

Micro-fusion: 在一个输出队列中结束的操作,和在一个前驱队列中开始的操作,能够共用同一个队列的实例。比如说,与其调用request(1) 然后处理onOnext() ,我们可以:

订阅者可以向父
