2022- 流处理系统中状态的表示和存储
流处理系统中状态的表示和存储
流处理系统处理的数据往往是没有边界的:数据会一直从数据源输入,用户需要看到
本文介绍了工业界学术界中流处理系统状态存储的三种方案:存储完整状态
引入
假设某个购物系统中有两个表:
visit(product, user, length)
表示用户查看某产品多少秒。info(product, category)
表示某个产品属于某个分类。
现在我们要查询:某个分类下用户查看产品最长的时间是多少。
CREATE VIEW result AS
SELECT category,
MAX(length) as max_length FROM
info INNER JOIN visit ON product
GROUP BY category
这个查询中包含两表
假设系统现在的状态是:
info(product, category)
Apple, Fruit
Banana, Fruit
Carrot, Vegetable
Potato, Vegetable
visit(product, user, length)
Apple, Alice, 10
Apple, Bob, 20
Carrot, Bob, 50
Banana, Alice, 40
Potato, Eve, 60
此时,查询的结果应该是
category, max_length
Fruit, 40
Vegetable, 60
代表
在常见的数据库产品中,系统通常来说会为这个查询生成如下的执行计划(不考虑

流处理系统的执行计划和常见数据库系统的计划没有太多区别。下面将具体介绍各种流处理系统会如何表示和存储计算的中间状态。
Full State - 算子维护自己的完整状态
诸如 Flink 的流处理系统持久化每个算子的完整状态;与此同时,流计算图上,算子之间传递数据的更新信息。这种存储状态的方法非常符合直觉。前文所述的

数据源会发出增加一行或是减少一行的消息。经过流算子的处理,这些消息会转变为用户需要的结果。
Join State 的存储
数据源的消息进入系统后,碰到的第一个算子就是info INNER JOIN visit ON product
。info
的消息后,会先将 visit
一侧的 product
相同的行查出来,然后发给下游。之后,将 info
一侧的消息记录在自己的状态中。对于右侧消息的处理也如出一辙。
比如,现在 visit
一侧收到+ Potato Eve 60
的消息。假设此时 info
一侧的状态已经有了四条记录。

info
一侧 product = Potato
的记录,得到Potato, Vegetable, 60
发给下游。
而后,visit
一侧的状态会加入 Potato -> Eve, 60
的记录,这样一来,如果 info
发生变化,visit
给下游发送
Aggregation State 的存储
消息接下来被传递到了
一些简单的
但对于
Fruit -> { 10, 20, 30, 40 }
Vegetable -> { 50 }
上游Potato, Vegetable, 60
的消息,
Fruit -> { 10, 20, 30, 40 }
Vegetable -> { 50, [60] }
并把
DELETE Vegetable, 50
INSERT Vegetable, 60
整个过程如下图所示:

总结
存储完整状态的流系统通常来说有这么几个特点:
- 流计算图上单向传递数据变更的消息
( 添加/ 删除) 。 - 流算子维护、访问自己的状态;与此同时,在多路
Join 的时候,存储的状态可能重复。后文在介绍共享状态时也会详细介绍这一点。
Shared State - 算子之间共享状态
我们以 Differential Dataflow (Materialize 下面的计算引擎
DD 的Arrange 算子与Arrangement

- 通过
handler 任意查询某个时间点key-value 的映射关系。 - 查询某一个
key 在一段时间内的变更情况。 - 指定查询的水位,后台合并或删除不再使用的历史数据。
- 数据在某一时刻的变更
(data, time, diff)
。这种数据流叫做Collection 。 - 数据的快照,也就是
Arrangement 的handler 。这种数据流叫做Arranged 。
Map 算子(对应SQL 的Projection )输入Collection 输出Collection 。JoinCore 算子(Join 的一个阶段) 输入Arranged 输出Collection 。ReduceCore 算子(Agg 的一个阶段) 输入Arranged 输出Arranged 。
之后我们会详细介绍
从Differential Dataflow 到Materialize
join
group by

Join State 的存储
Arrange
和一个 JoinCore
。JoinCore
算子。实际的JoinCore
算子中发生,JoinCore
不存储任何状态。

如上图所示,现在JoinCore
算子通过product = Potato
的行,匹配到 Potato
是一种蔬菜,往下游输出 Potato, Vegetable, 60
的更改。
Reduce 状态的存储
Arrange
和 ReduceCore
。Arrange
算子根据ReduceCore
算子自己维护一个存储聚合结果的as_collection
操作将聚合结果输出成一个

key = Vegetable
的行全部扫描出来,并求最大值,最后将最大值更新到自己的as_collection
操作后,即可输出为数据更新的形式,变成其他算子可以处理的信息。
更方便的算子状态复用
由于

比如用户想要同时查询 A JOIN B
和 B JOIN C
,在
另一个例子是多路SELECT * FROM A, B, C WHERE A.x = B.x and A.x = C.x
。在这个例子中,如果使用
远程访问状态的开销
在流系统中,计算中间产生的数据往往无法全部放在一个结点上;与此同时,同一个执行计划中的节点,也会有很多并行度。比如下面这个两表

在这个情况下,
总结
在共享状态的流处理系统中,算子的计算逻辑和存储逻辑被拆分到多个算子中。因此,不同的计算任务可以共享同一个存储,从而减少存储状态的数量。如果要实现共享状态的流处理系统,一般会有这样的特点:
- 流计算图上传递的不仅仅是数据的变更,可能还会包括状态的共享信息(比如
DD 的Trace Handle ) 。 - 流算子访问状态会有一定的开销;但相对而言存储完整状态的流计算系统而言,整个流计算过程中由于状态复用,存储的状态数量更小。
Partial State - 算子只存储部分信息
在 Noria (OSDI ‘18) 这一系统中,计算不会在数据源更新信息时触发,流处理算子并不会保存完整的信息。
比如,如果用户在之前创建的视图上执行:
SELECT * FROM result WHERE category = "Vegetable"
执行这条category = "Vegetable"
相关的数据,保存相关的状态。下面将以这条查询为例,说明
Upquery

用户向流计算引擎查询 category = "Vegetable"
的最大值。
Join 算子的实现


在
Agg 算子的实现

数据来到
总结
存储部分状态的流处理系统通过
- 计算图的数据流向是双向的——既可以从上游到下游输出数据,也可以从下游到上游发
upquery 。 - 由于需要递归
upquery ,计算的延迟可能比其他状态存储方式略微大一点。 - 数据一致性比较难实现。本文所述的其他存储方法都可以比较简单地实现最终一致;但对于存储部分状态的系统来说,需要比较小心地处理更新和
upquery 返回结果同时在流上传递的问题,对于每个算子都要仔细证明实现的正确性。 DDL / Recovery 非常快。由于算子里面的信息都是按需计算的,如果用户对View 进行增删列的操作,或是做迁移,都可以直接清空缓存分配新节点,无需代价较高的状态恢复。
最后对比一下所有的状态存储方式所对应的流处理系统特征:

- 存储完整状态
( 以Flink 为例) :流上传递数据。 - 共享状态存储
( 以Materialize / DD 为例) :流上传递数据和snapshot 。 - 存储部分状态
( 以Noria 为例) :流上传递数据,流上双向都有消息。