Subject
Subject
const subject = new Rx.Subject();
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(1);
subject.next(2);
observerA: 1;
observerB: 1;
observerA: 2;
observerB: 2;
Observable vs Subject
The subject is another Observable type in RxJS. Subjects like Observables can emit multiple event values. However, Subjects allow subscribers of the Subject to push back or trigger their own events on the Subject. Here is what the Subject API looks like,
import { Observable } from "rxjs";
const observable = new Observable((observer) => {
setTimeout(() => observer.next("hello from Observable!"), 1000);
});
observable.subscribe((v) => console.log(v));
import { Subject } from "rxjs";
const subject = new Subject();
subject.next("missed message from Subject");
subject.subscribe((v) => console.log(v));
subject.next("hello from subject!");
We instantiate the Subject class. With the Subject instance, we can immediately trigger events outside of the constructor by calling next(). Now anyone can listen or trigger events on the Subject. Notice how we call next and emit ‘missed message from Subject’ before we have subscribed to the Subject? Subjects, unlike regular Observables, are what we would call “Hot”. A hot Observable is an Observable that can start emitting events before you subscribe. This means you can miss previous events that have already emitted.
Subjects, unlike Observables, share their work with all subscribers. Unlike our first Observable that created a setTimeout for each subscriber, this Subject would share that work with all subscribers. What if we subscribe late to our Subject and want to get the previous value we missed? Well, that’s where our next Subject type comes in, the ReplaySubject.
const clock$ = Observable.interval(1000).take(3);
const observerA = {
next(v) {
console.log(‘A next: ‘ + v)
}
}
const observerB = {
next(v) {
console.log(‘B next: ‘ + v)
}
}
clock$.subscribe(observerA) // a Observable execution
setTimeout(() => {
clock$.subscribe(observerB) // another new Observable execution
}, 2000)
/*
* A next: 0
* A next: 1
* A next: 2
* B next: 0
* B next: 1
* B next: 2
*/
// 如果是同一个 shared Observable execution 的话,B的第一个 emit 的值应该是 2 而不是 0,并且只有且仅有一个值 2
const clock$ = Observable.interval(1000).take(3);
const observerA = {
next(v) {
console.log(‘A next: ‘ + v)
}
}
const observerB = {
next(v) {
console.log(‘B next: ‘ + v)
}
}
const subject = new Subject()
subject.subscribe(observerA)
clock$.subscribe(subject)
setTimeout(() => {
subject.subscribe(observerB)
}, 2000)
/*
* A next: 0
* A next: 1
* A next: 2
* B next: 2
*/
multicast
// RxJS v6+
import { Subject, interval } from "rxjs";
import { take, tap, multicast, mapTo } from "rxjs/operators";
//emit every 2 seconds, take 5
const source = interval(2000).pipe(take(5));
const example = source.pipe(
//since we are multicasting below, side effects will be executed once
tap(() => console.log("Side Effect #1")),
mapTo("Result!")
);
//subscribe subject to source upon connect()
const multi = example.pipe(multicast(() => new Subject()));
/*
subscribers will share source
output:
"Side Effect #1"
"Result!"
"Result!"
...
*/
const subscriberOne = multi.subscribe((val) => console.log(val));
const subscriberTwo = multi.subscribe((val) => console.log(val));
//subscribe subject to source
multi.connect();
引用计数
通常,当第一个观察者到达时我们想要自动地连接,而当最后一个观察者取消订阅时我们想要自动地取消共享执行。我们可以使用
const source = Rx.Observable.interval(500);
const subject = new Rx.Subject();
const refCounted = source.multicast(subject).refCount();
const subscription1, subscription2, subscriptionConnect;
// 这里其实调用了 `connect()`,
// 因为 `refCounted` 有了第一个订阅者
console.log("observerA subscribed");
subscription1 = refCounted.subscribe({
next: v => console.log("observerA: " + v)
});
setTimeout(() => {
console.log("observerB subscribed");
subscription2 = refCounted.subscribe({
next: v => console.log("observerB: " + v)
});
}, 600);
setTimeout(() => {
console.log("observerA unsubscribed");
subscription1.unsubscribe();
}, 1200);
// 这里共享的 Observable 执行会停止,
// 因为此后 `refCounted` 将不再有订阅者
setTimeout(() => {
console.log("observerB unsubscribed");
subscription2.unsubscribe();
}, 2000);
setTimeout(() => {
console.log("observerC subscribed");
subscription2 = refCounted.subscribe({
next: v => console.log("observerC: " + v)
});
}, 2500);
/*
"observerA subscribed"
"observerA: 0"
"observerB subscribed"
"observerA: 1"
"observerB: 1"
"observerA unsubscribed"
"observerB: 2"
"observerB: 3"
"observerB unsubscribed"
"observerC subscribed"
"observerC: 0"
*/
BehaviorSubject
在下面的示例中,
const subject = new Rx.BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(3);
/**
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
**/
ReplaySubject
const subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(5);
/**
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
**/
除了缓冲数量,你还可以指定
const subject = new Rx.ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
const i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
}, 1000);
从下面的输出可以看出,第二个观察者得到的值是3
、4
、5
,这三个值是订阅发生前的500
毫秒内发生的:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerA: 5
observerB: 3
observerB: 4
observerB: 5
observerA: 6
observerB: 6
...
AsyncSubject
complete()
const subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log("observerA: " + v),
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log("observerB: " + v),
});
subject.next(5);
subject.complete();
输出:
observerA: 5
observerB: 5
last()
操作符类似,因为它也是等待 complete
通知,以发送一个单个值。