Observable

Observable (可观察对象)

Observables是多个值的惰性推送集合。当订阅下面代码中的Observable的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流:

const observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

要调用Observable并看到这些值,我们需要订阅Observable

console.log("just before subscribe");
observable.subscribe({
  next: (x) => console.log("got value " + x),
  error: (err) => console.error("something wrong occurred: " + err),
  complete: () => console.log("done"),
});
console.log("just after subscribe");

Observables是使用Rx.Observable.create或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送next/error/complete通知给观察者,而且执行可能会被清理。这四个方面全部编码在Observables实例中,但某些方面是与其他类型相关的,像Observer (观察者)Subscription (订阅)

拉取(Pull) vs.推送(Push)

拉取和推送是两种不同的协议,用来描述数据生产者(Producer)如何与数据消费者(Consumer)进行通信的。在拉取体系中,由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。每个JavaScript函数都是拉取体系。函数是数据的生产者,调用该函数的代码通过从函数调用中“取出”一个单个返回值来对该函数进行消费。ES2015引入了generator函数和iterators (function*),这是另外一种类型的拉取体系。调用iterator.next()的代码是消费者,它会从iterator(生产者)那“取出”多个值。

在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。在当今的JavaScript世界中,Promises是最常见的推送体系类型。Promise(生产者)将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由Promise来决定何时把值“推送”给回调函数。RxJS引入了Observables,一个新的JavaScript推送体系。Observable是多个值的生产者,并将值“推送”给观察者(消费者)

  • Function是惰性的评估运算,调用时会同步地返回一个单一值。
  • Generator是惰性的评估运算,调用时会同步地返回零到(有可能的)无限多个值。
  • Promise是最终可能(或可能不)返回单个值的运算。
  • Observable是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到(有可能的)无限多个值。

创建Observables

Rx.Observable.createObservable构造函数的别名,它接收一个参数:subscribe函数。下面的示例创建了一个Observable,它每隔一秒会向观察者发送字符串 ‘hi’ 。

const observable = Rx.Observable.create(function subscribe(observer) {
  const id = setInterval(() => {
    observer.next("hi");
  }, 1000);
});

Observables可以使用create来创建,但通常我们使用所谓的创建操作符,of、from、interval、等等。

// 来自一个或多个值
Rx.Observable.of("foo", "bar");

// 来自数组
Rx.Observable.from([1, 2, 3]);

// 来自事件
Rx.Observable.fromEvent(document.querySelector("button"), "click");

// 来自 Promise
Rx.Observable.fromPromise(fetch("/users"));

// 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
// fs.exists = (path, cb(exists))
const exists = Rx.Observable.bindCallback(fs.exists);
exists("file.txt").subscribe((exists) =>
  console.log("Does file exist?", exists)
);

// 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
// fs.rename = (pathA, pathB, cb(err, result))
const rename = Rx.Observable.bindNodeCallback(fs.rename);
rename("file.txt", "else.txt").subscribe(() => console.log("Renamed!"));

订阅Observables

示例中的Observable对象observable可以订阅,像这样:

observable.subscribe((x) => console.log(x));

observable.subscribeObservable.create(function subscribe(observer) {...}) 中的subscribe有着同样的名字,这并不是一个巧合。在库中,它们是不同的,但从实际出发,你可以认为在概念上它们是等同的。这表明subscribe调用在同一Observable的多个观察者之间是不共享的。当使用一个观察者调用observable.subscribe时,Observable.create(function subscribe(observer) {...}) 中的subscribe函数只服务于给定的观察者。对observable.subscribe的每次调用都会触发针对给定观察者的独立设置。

这与像addEventListener / removeEventListener这样的事件处理方法API是完全不同的。使用observable.subscribe,在Observable中不会将给定的观察者注册为监听器。Observable甚至不会去维护一个附加的观察者列表。

执行Observables

Observable.create(function subscribe(observer) {…})中…的代码表示 “Observable执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。Observable执行可以传递三种类型的值:

  • “Next” 通知:发送一个值,比如数字、字符串、对象,等等。
  • “Error” 通知:发送一个JavaScript错误 或 异常。
  • “Complete” 通知:不再发送任何值。

“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据“Error” 和 “Complete” 通知可能只会在Observable执行期间发生一次,并且只会执行其中的一个。

下面是Observable执行的示例,它发送了三个 “Next” 通知,然后是 “Complete” 通知:

const observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observable严格遵守自身的规约,所以下面的代码不会发送 “Next” 通知 4

const observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // 因为违反规约,所以不会发送
});

subscribe 中用try/catch代码块来包裹任意代码是个不错的主意,如果捕获到异常的话,会发送 “Error” 通知:

const observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 如果捕获到异常会发送一个错误
  }
});

清理Observable执行

因为Observable执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个API来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。当调用了observable.subscribe,观察者会被附加到新创建的Observable执行中。这个调用还返回一个对象,即Subscription (订阅)

const subscription = observable.subscribe((x) => console.log(x));

Subscription表示进行中的执行,它有最小化的API以允许你取消执行。使用subscription.unsubscribe()你可以取消进行中的执行:

const observable = Rx.Observable.from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// 稍后:
subscription.unsubscribe();

当我们使用create()方法创建Observable时,Observable必须定义如何清理执行的资源。你可以通过在function subscribe()中返回一个自定义的unsubscribe函数。举例来说,这是我们如何清理使用了setIntervalinterval执行集合:

const observable = Rx.Observable.create(function subscribe(observer) {
  // 追踪 interval 资源
  const intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);

  // 提供取消和清理 interval 资源的方法
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

正如 observable.subscribe 类似于 Observable.create(function subscribe() {...}),从 subscribe 返回的 unsubscribe 在概念上也等同于 subscription.unsubscribe。事实上,如果我们抛开围绕这些概念的ReactiveX类型,保留下来的只是相当简单的JavaScript

function subscribe(observer) {
  const intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

const unsubscribe = subscribe({ next: (x) => console.log(x) });

// 稍后:
unsubscribe(); // 清理资源

为什么我们要使用像ObservableObserverSubscription这样的Rx类型?原因是保证代码的安全性(比如Observable规约)和操作符的可组合性。

下一页