原理浅析

RxJava原理与机制浅析

ReactiveX框架(基于RxJava)实现原理浅析

RxJava基本流程和lift源码分析

RxJava原理解析

Subscribe

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码)

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了3件事:

  1. 调用 Subscriber.onStart()。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber)。在这里,事件发送的逻辑开始运行。从这也可以看出,在RxJava中,Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便unsubscribe().

整个过程中对象间的关系如下图:

关系静图

或者可以看动图:

关系静图

除了 subscribe(Observer)subscribe(Subscriber)subscribe() 还支持不完整定义的回调,RxJava会自动根据定义创建出 Subscriber。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber,并使用 onNextAction、onErrorAction 和 onCompletedAction 来定义 onNext()、onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1Action0Action0RxJava的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的闭包。Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj)onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj)onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0Action1API中使用最广泛,但RxJava是提供了多个 ActionX 形式的接口(例如Action2,Action3)的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,ObserverSubscriber 具有相同的角色,而且 Observersubscribe() 过程中最终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer,这样更加严谨。

Transform

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在RxJava的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下lift()的内部实现(仅核心代码)

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的onSubscribe所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

  • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。

  • 当含有 lift():

    当含有 lift(): 1.lift()创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;

    当含有 lift(): 1.lift()创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe

    当含有 lift(): 1.lift()创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe

    当含有 lift(): 1.lift()创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe4.而这个新 OnSubscribecall() 方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的Subscriber(Operator就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的变换代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

    当含有 lift(): 1.lift()创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe3.当用户调用经过 lift() 后的 Observablesubscribe() 的时候,使用的是 lift() 所返回的新的 Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe4.而这个新 OnSubscribecall() 方法中的 onSubscribe,就是指的原始 Observable 中的原始 OnSubscribe,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的Subscriber(Operator就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的变换代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。 这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

如果你更喜欢具象思维,可以看图:

lift() 原理图

或者可以看动图:

lift 原理动图

两次和多次的 lift() 同理,如下图:

两次 lift

举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

讲述 lift() 的原理只是为了让你更好地了解RxJava,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(map() flatMap())进行组合来实现需求,因为直接使用lift()非常容易发生一些难以发现的错误。