辅助操作

Utility

delay:延迟发射或者监听

顾名思义,delay操作会延时一段时间再发射数据。有两种方式实现这个效果;一是缓存这些数据,等一段时间后再发射;或者是把Subscriber订阅的时间延迟。

delay()

简单的delay函数只是把每个数据都延时一段时间再发射,相当于把整个数据流都往后推迟了。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
            .delay(1, TimeUnit.SECONDS)
            .timeInterval()
            .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=1109, value=0]
TimeInterval [intervalInMilliseconds=94, value=1]
TimeInterval [intervalInMilliseconds=100, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=101, value=4]

可以看到,第一个数据差不多被延迟了1s,后面每隔100ms左右发射下一个数据。还可以分别推迟每个数据的时间。

这个重载函数的参数为一个函数,该函数的参数为源Observable发射的数据返回一个 信号Observable。当信号Observable发射数据的时候,也就是源Observable的数据发射的时候。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
    .delay(i -> Observable.timer(i * 100, TimeUnit.MILLISECONDS))
    .timeInterval()
    .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=152, value=0]
TimeInterval [intervalInMilliseconds=173, value=1]
TimeInterval [intervalInMilliseconds=199, value=2]
TimeInterval [intervalInMilliseconds=201, value=3]
TimeInterval [intervalInMilliseconds=199, value=4]

Observable每隔100ms发射一个数据,而结果显示为200ms发射一个数据。interval0开始发射数据,i结果为0、1、2等,每隔数据推迟了 i*100ms 再发射。所以后面每隔数据都比前一个数据多推迟了100ms,结果就是每个数据差不多间隔200ms发射。

delaySubscription

除了缓存数据,延迟发射缓冲的数据以外,还可以选择使用推迟订阅的方式。根据Observablehot或者cold则会有不同的结果。后面会专门的介绍coldhot Observable的区别。这里的示例为cold Observable,推迟订阅到cold Observable和推迟整个数据流是一样的效果。但是由于推迟订阅不需要缓存发射的数据,所以更加高效。

Observable.interval(100, TimeUnit.MILLISECONDS).take(5)
    .delaySubscription(1000, TimeUnit.MILLISECONDS)
    .timeInterval()
    .subscribe(System.out::println);

输出:

TimeInterval [intervalInMilliseconds=1114, value=0]
TimeInterval [intervalInMilliseconds=92, value=1]
TimeInterval [intervalInMilliseconds=101, value=2]
TimeInterval [intervalInMilliseconds=100, value=3]
TimeInterval [intervalInMilliseconds=99, value=4]

可以看到整个数据流推迟了1000ms。同样还有一个重载函数,可以使用另外一个Observable来告诉Subscriber何时订阅:

public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay)
上一页
下一页