结合操作

Combing(组合)

Merge(合并)

在”异步的世界“中经常会创建这样的场景,我们有多个来源但是又只想有一个结果:多输入,单输出。RxJavamerge()方法将帮助你把两个甚至更多的Observables合并到他们发射的数据项里。下图给出了把两个序列合并在一个最终发射的Observable

正如你看到的那样,发射的数据被交叉合并到一个Observable里面。注意如果你同步的合并Observable,它们将连接在一起并且不会交叉。

Observable<Integer> observable_1 = Observable.from(new Integer[]{1, 2});

Observable<Integer> observable_2 = Observable.from(new Integer[]{2, 3});

Observable<Integer> observable_combined = Observable.merge(observable_1, observable_2);

observable_combined.subscribe(
        (value) -> {

            System.out.println(Thread.currentThread().getName() + " Emited!");
            System.out.println(value);
        }
);

需要注意的是,在上述代码中,最终值的输出序列还是1,2,2,3,这是因为两个Observable都是在Main Thread中执行,我们来看看如果用subscribeOn让每个Observable在不同线程中执行的效果:

Observable<Object> observable_1 = Observable.create(subscriber -> {
    try {
        Thread.sleep(1000l);

        subscriber.onNext(1);

        Thread.sleep(3000l);

        subscriber.onNext(2);

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).subscribeOn(Schedulers.newThread());

Observable<Object> observable_2 = Observable.create(subscriber -> {
    try {

        Thread.sleep(2000l);

        subscriber.onNext(3);

        Thread.sleep(4000l);

        subscriber.onNext(4);

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
})
        .subscribeOn(Schedulers.newThread());

Observable<Object> observable_combined = Observable.merge(observable_1, observable_2);

observable_combined.subscribe(
        (value) -> {

            System.out.println(Thread.currentThread().getName() + " Emited!");

            System.out.println(value);
        }
);

最终的结果如下所示:

RxNewThreadScheduler-1 Emited!
1
RxNewThreadScheduler-2 Emited!
3
RxNewThreadScheduler-1 Emited!
2
RxNewThreadScheduler-2 Emited!
4
上一页
下一页