Observable
Observable

function (action$: Observable<Action>, state$: StateObservable<State>): Observable<Action>
在
同时,
- 竞态处理能力
- 声明式地任务处理
- 测试友好
- 组件自治(
redux-observable 1. 0 开始支持)
传统模式下组件的耦合
在传统的模式下,我们需要面对一个现实,对于状态的获取是
const state = store.getState();
亦即我们需要主动取用状态,而无法监听状态变化。因此,在这种模式下,我们组件化开发的思路会是:
- 组件挂载,开启轮询
- 搜索时,结束上次轮询,构建新的请求参数,开始新的轮询
- 排序变动时,结束上次轮询,构建新的请求参数,开始新的轮询
- 分页变动时,结束上次轮询,构建新的请求参数,开始新的轮询
- 组件卸载,结束轮询
在这种思路下,我们撰写搜索,排序,分页等容器时,当容器涉及的取值变动时,不仅需要在状态树上更新这些值,还需要去重启一下轮询。

假定我们使用
let pollingTimer: number = null;
function fetchUsers(): ThunkResult {
return (dispatch, getState) => {
const delay = pollingTimer === null ? 0 : 15 * 1000;
pollingTimer = setTimeout(() => {
dispatch({
type: FETCH_START,
payload: {}
});
const { repo }: { repo: IState } = getState();
const { pagination, sort, query } = repo;
// 封装参数
const param: ISearchParam = {
// ...
};
// 进行请求
// fetch(param)...
}, delay);
};
}
export function polling(): ThunkResult {
return dispatch => {
dispatch(stopPolling());
dispatch({
type: POLLING_START,
payload: {}
});
dispatch(fetchUsers());
};
}
export function stopPolling(): IAction {
clearTimeout(pollingTimer);
pollingTimer = null;
return {
type: POLLING_STOP,
payload: {}
};
}
export function changePagination(pagination: IPagination): ThunkResult {
return dispatch => {
dispatch({
type: CHANGE_PAGINATION,
payload: {
pagination
}
});
dispatch(polling());
};
}
export function changeQuery(query: string): ThunkResult {
return dispatch => {
dispatch({
type: CHANGE_QUERY,
payload: {
query
}
});
dispatch(polling());
};
}
export function changeSort(sort: string): ThunkResult {
return dispatch => {
dispatch({
type: CHANGE_SORT,
payload: {
sort
}
});
dispatch(polling());
};
}
可以看到,涉及到请求参数的几个组件,如筛选项目,分页,搜索等,当它们
或许这个场景的复杂程度你觉得也还能接受,但是假想我们有一个更大的项目,或者现在的项目未来会扩展得很大,那么组件势必会越来越多,参与协作的开发者也会越来越多。协作的开发者就需要时刻关注到自己撰写的组件是否会是其他开发者撰写的组件的影响因子,如果是的话,影响有多大,又该怎么处理?我们归纳下使用传统模式梳理数据流以及副作用面临的问题:
- 过程式编程,代码啰嗦;
- 竞态处理需要人为地通过标志量等进行控制;
- 组件间耦合大,彼此牵连。
FRP 模式与组件自治
在state$
:
function (action$: Observable<Action>, state$: StateObservable<State>): Observable<Action>
state$
的引入,让
const autoSaveEpic = (action$, state$) =>
action$.pipe(
ofType(AUTO_SAVE_ENABLE),
exhaustMap(() =>
state$.pipe(
pluck("googleDocument"),
distinctUntilChanged(),
throttleTime(500, { leading: false, trailing: true }),
concatMap(googleDocument =>
saveGoogleDoc(googleDocument).pipe(
map(() => saveGoogleDocFulfilled()),
catchError(e => of(saveGoogleDocRejected(e)))
)
),
takeUntil(action$.pipe(ofType(AUTO_SAVE_DISABLE)))
)
)
);
回过头来,我们还可以将列表页的需求概括为:
- 间隔一段时间轮询数据列表
- 参数(排序,分页等)变动时,重新发起轮询
- 主动进行搜索时,重新发起轮询
- 组件卸载时结束轮询
在
const pollingEpic: Epic = (action$, state$) => {
const stopPolling$ = action$.ofType(POLLING_STOP);
const params$: Observable<ISearchParam> = state$.pipe(
map(({ user }: { user: IState }) => {
const { pagination, sort, query } = user;
return {
q: `${query ? query + " " : ""}language:javascript`,
language: "javascript",
page: pagination.page,
per_page: pagination.pageSize,
sort,
order: EOrder.Desc
};
}),
distinctUntilChanged(isEqual)
);
return action$.pipe(
ofType(LISTEN_POLLING_START, SEARCH),
combineLatest(params$, (action, params) => params),
switchMap((params: ISearchParam) => {
const polling$ = merge(
interval(15 * 1000).pipe(
takeUntil(stopPolling$),
startWith(null),
switchMap(() =>
from(fetch(params)).pipe(
map(({ data }: ISearchResp) => ({
type: FETCH_SUCCESS,
payload: {
total: data.total_count,
list: data.items
}
})),
startWith({
type: FETCH_START,
payload: {}
}),
catchError((error: AxiosError) =>
of({
type: FETCH_ERROR,
payload: {
error: error.response.statusText
}
})
)
)
),
startWith({
type: POLLING_START,
payload: {}
})
)
);
return polling$;
})
);
};
首先我们声明轮询结束流,当轮询结束流有值产生时,轮询会被终止:
const stopPolling$ = action$.ofType(POLLING_STOP);
参数来源于状态,由于现在状态可观测,我们可以从状态流 state$
派发一个下游:参数流:
const params$: Observable<ISearchParam> = state$.pipe(
map(({ user }: { user: IState }) => {
const { pagination, sort, query } = user;
return {
// 构造参数
};
}),
distinctUntilChanged(isEqual)
);
我们预期参数流都是最新的参数,因此使用了
return action$.pipe(
ofType(LISTEN_POLLING_START, SEARCH),
combineLatest(params$, (action, params) => params),
switchMap((params: ISearchParam) => {
const polling$ = merge(
interval(15 * 1000).pipe(
takeUntil(stopPolling$),
// 自动开始轮询
startWith(null),
switchMap(() =>
from(fetch(params)).pipe(
map(({ data }: ISearchResp) => {
// ... 处理响应
}),
startWith({
type: FETCH_START,
payload: {}
}),
catchError((error: AxiosError) => {
// ...
})
)
),
startWith({
type: POLLING_START,
payload: {}
})
)
);
return polling$;
})
);
我们现在只需要在数据表格这个容器组件挂载时
export function changePagination(pagination: IPagination): IAction {
return {
type: CHANGE_PAGINATION,
payload: {
pagination
}
};
}
在

总结,利用
-
声明式地描述异步任务,代码简洁;
-
使用
switchMap operator 处理竞态任务; -
尽可能减少组件耦合,来达到组件自治。利于多人协作的大型工程。