Observable
Observable ( 可观察对象)
const observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});
要调用
console.log("just before subscribe");
observable.subscribe({
next: (x) => console.log("got value " + x),
error: (err) => console.error("something wrong occurred: " + err),
complete: () => console.log("done"),
});
console.log("just after subscribe");
拉取(Pull) vs. 推送(Push)
拉取和推送是两种不同的协议,用来描述数据生产者function*
在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。在当今的
Function 是惰性的评估运算,调用时会同步地返回一个单一值。 Generator 是惰性的评估运算,调用时会同步地返回零到 ( 有可能的) 无限多个值。Promise 是最终可能 ( 或可能不) 返回单个值的运算。Observable 是惰性的评估运算,它可以从它被调用的时刻起同步或异步地返回零到 ( 有可能的) 无限多个值。
创建Observables
const observable = Rx.Observable.create(function subscribe(observer) {
const id = setInterval(() => {
observer.next("hi");
}, 1000);
});
// 来自一个或多个值
Rx.Observable.of("foo", "bar");
// 来自数组
Rx.Observable.from([1, 2, 3]);
// 来自事件
Rx.Observable.fromEvent(document.querySelector("button"), "click");
// 来自 Promise
Rx.Observable.fromPromise(fetch("/users"));
// 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
// fs.exists = (path, cb(exists))
const exists = Rx.Observable.bindCallback(fs.exists);
exists("file.txt").subscribe((exists) =>
console.log("Does file exist?", exists)
);
// 来自回调函数(最后一个参数得是回调函数,比如下面的 cb)
// fs.rename = (pathA, pathB, cb(err, result))
const rename = Rx.Observable.bindNodeCallback(fs.rename);
rename("file.txt", "else.txt").subscribe(() => console.log("Renamed!"));
订阅Observables
示例中的
observable.subscribe((x) => console.log(x));
Observable.create(function subscribe(observer) {...})
中的Observable.create(function subscribe(observer) {...})
中的
这与像
执行Observables
Observable.create(function subscribe(observer) {…
- “Next” 通知:发送一个值,比如数字、字符串、对象,等等。
- “Error” 通知:发送一个
JavaScript 错误 或 异常。 - “Complete” 通知:不再发送任何值。
“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据
下面是
const observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
4
:
const observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 因为违反规约,所以不会发送
});
在 subscribe
中用try
catch
const observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 如果捕获到异常会发送一个错误
}
});
清理Observable 执行
因为
const subscription = observable.subscribe((x) => console.log(x));
const observable = Rx.Observable.from([10, 20, 30]);
const subscription = observable.subscribe((x) => console.log(x));
// 稍后:
subscription.unsubscribe();
当我们使用
const observable = Rx.Observable.create(function subscribe(observer) {
// 追踪 interval 资源
const intervalID = setInterval(() => {
observer.next("hi");
}, 1000);
// 提供取消和清理 interval 资源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
正如 observable.subscribe
类似于 Observable.create(function subscribe() {...})
,从 subscribe
返回的 unsubscribe
在概念上也等同于 subscription.unsubscribe
。事实上,如果我们抛开围绕这些概念的
function subscribe(observer) {
const intervalID = setInterval(() => {
observer.next("hi");
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
const unsubscribe = subscribe({ next: (x) => console.log(x) });
// 稍后:
unsubscribe(); // 清理资源
为什么我们要使用像