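Combining Operations
Combining
Merge
In the asynchronous world we often run into situations where there are multiple sources but we want only a single result: multiple inputs, one output.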
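Before turning to the RxJava operator, the many-inputs, one-output idea can be sketched in plain Java. This is only an illustration under simplifying assumptions and is not how RxJava implements merge; the names MergeIdeaSketch, Source, just, merge, and collect are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only; this is not RxJava's implementation of merge.
class MergeIdeaSketch {

    // A "source" is anything that pushes its values into a downstream consumer.
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Builds a source that emits the given values synchronously, in order.
    @SafeVarargs
    static <T> Source<T> just(T... values) {
        return downstream -> {
            for (T value : values) {
                downstream.accept(value);
            }
        };
    }

    // Merges several sources by pointing all of them at the same downstream.
    @SafeVarargs
    static <T> Source<T> merge(Source<T>... sources) {
        return downstream -> {
            for (Source<T> source : sources) {
                source.subscribe(downstream);
            }
        };
    }

    // Collects everything a source emits into a list.
    static <T> List<T> collect(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        Source<Integer> merged = merge(just(1, 2), just(2, 3));
        System.out.println(collect(merged)); // prints [1, 2, 2, 3]
    }
}
```

Because these hypothetical sources emit synchronously on the calling thread, the single output receives everything from the first source and then everything from the second. The same two inputs, expressed with RxJava's Observable.merge, look like this: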

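    // Two sources built from arrays; both emit synchronously when subscribed.
    Observable<Integer> observable_1 = Observable.from(new Integer[]{1, 2});
    Observable<Integer> observable_2 = Observable.from(new Integer[]{2, 3});

    // merge combines both sources into a single output stream.
    Observable<Integer> observable_combined = Observable.merge(observable_1, observable_2);

    // Print the emitting thread's name, then the value itself.
    observable_combined.subscribe(
        (value) -> {
            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
    );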
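Note that in the code above the final values are still emitted sequentially: everything from the first source, then everything from the second. Use subscribeOn to have each Observable emit on its own thread: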
    // First source: emits 1 after 1 second and 2 after another 3 seconds, on its own thread.
    Observable<Object> observable_1 = Observable.create(subscriber -> {
        try {
            Thread.sleep(1000L);
            subscriber.onNext(1);
            Thread.sleep(3000L);
            subscriber.onNext(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).subscribeOn(Schedulers.newThread());

    // Second source: emits 3 after 2 seconds and 4 after another 4 seconds, on its own thread.
    Observable<Object> observable_2 = Observable.create(subscriber -> {
        try {
            Thread.sleep(2000L);
            subscriber.onNext(3);
            Thread.sleep(4000L);
            subscriber.onNext(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).subscribeOn(Schedulers.newThread());

    // The merged stream forwards each value as soon as either source emits it.
    Observable<Object> observable_combined = Observable.merge(observable_1, observable_2);

    // If this runs from main(), keep the main thread alive long enough to see the output.
    observable_combined.subscribe(
        (value) -> {
            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
    );
The final result is as follows:
RxNewThreadScheduler-1 Emited!
1
RxNewThreadScheduler-2 Emited!
3
RxNewThreadScheduler-1 Emited!
2
RxNewThreadScheduler-2 Emited!
4
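The interleaving follows from the sleep timings: the first source emits at roughly 1 s and 4 s, the second at roughly 2 s and 6 s, so the merged order is 1, 3, 2, 4. The timing argument can be reproduced without RxJava in a minimal plain-Java sketch, where two threads push into one shared queue. The class ThreadedMergeSketch and its methods startSource and runMerged are hypothetical names for this illustration, the queue stands in for the merged stream, and the delays are scaled down to a tenth of the originals so the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only; RxJava's merge is not implemented this way.
class ThreadedMergeSketch {

    // Starts a thread that sleeps for each delay and then emits the paired value.
    static Thread startSource(BlockingQueue<Integer> sink, long[] delaysMs, int[] values) {
        Thread thread = new Thread(() -> {
            try {
                for (int i = 0; i < values.length; i++) {
                    Thread.sleep(delaysMs[i]);
                    sink.put(values[i]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        return thread;
    }

    // Runs both sources concurrently and returns the values in arrival order.
    static List<Integer> runMerged() {
        BlockingQueue<Integer> sink = new LinkedBlockingQueue<>();
        startSource(sink, new long[]{100, 300}, new int[]{1, 2}); // emits at ~100 ms and ~400 ms
        startSource(sink, new long[]{200, 400}, new int[]{3, 4}); // emits at ~200 ms and ~600 ms
        List<Integer> seen = new ArrayList<>();
        try {
            for (int i = 0; i < 4; i++) {
                seen.add(sink.take()); // blocks until the next value arrives
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for values", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runMerged());
    }
}
```

On an unloaded machine this should print the values in the order 1, 3, 2, 4, matching the RxJava output above. The only ordering that is guaranteed is within each source (1 before 2, and 3 before 4); the interleaving between sources depends on timing.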