Flowable
Flowable
在以前的
创建与控制
// Simple Flowable
Flowable<Integer> integerFlowable = Flowable.just(1, 2, 3, 4);
// Flowable from Observable
Observable<Integer> integerObservable = Observable.just(1, 2, 3);
Flowable<Integer> integerFlowable = integerObservable
.toFlowable(BackpressureStrategy.BUFFER);
// from FlowableOnSubscribe
FlowableOnSubscribe<Integer> flowableOnSubscribe
= flowable -> flowable.onNext(1);
Flowable<Integer> integerFlowable = Flowable
.create(flowableOnSubscribe, BackpressureStrategy.BUFFER);
BackpressureStrategy
诸如
Buffer
如果我们使用
public void thenAllValuesAreBufferedAndReceived() {
List testList = IntStream.range(0, 100000)
.boxed()
.collect(Collectors.toList());
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.BUFFER)
.observeOn(Schedulers.computation()).test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertEquals(testList, receivedInts);
}
这类似于在
Drop
我们可以使用
public void whenDropStrategyUsed_thenOnBackpressureDropped() {
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(!receivedInts.contains(100000));
}
Latest
使用
public void whenLatestStrategyUsed_thenTheLastElementReceived() {
Observable observable = Observable.fromIterable(testList);
TestSubscriber<Integer> testSubscriber = observable
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(Schedulers.computation())
.test();
testSubscriber.awaitTerminalEvent();
List<Integer> receivedInts = testSubscriber.getEvents()
.get(0)
.stream()
.mapToInt(object -> (int) object)
.boxed()
.collect(Collectors.toList());
assertThat(receivedInts.size() < testList.size());
assertThat(receivedInts.contains(100000));
}
当我们查看代码时,
Error
当我们使用
public void whenErrorStrategyUsed_thenExceptionIsThrown() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.ERROR)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
Missing
如果使用
public void whenMissingStrategyUsed_thenException() {
Observable observable = Observable.range(1, 100000);
TestSubscriber subscriber = observable
.toFlowable(BackpressureStrategy.MISSING)
.observeOn(Schedulers.computation())
.test();
subscriber.awaitTerminalEvent();
subscriber.assertError(MissingBackpressureException.class);
}
在我们的测试中,我们同时针对