2022-流处理系统中状态的表示和存储

原文地址

流处理系统中状态的表示和存储

流处理系统处理的数据往往是没有边界的:数据会一直从数据源输入,用户需要看到SQL查询的实时结果。与此同时,流处理系统中的计算节点可能出错、失败,可能根据用户的需求实时扩容、缩容。在这一过程中,系统需要能够高效地将计算的中间状态在节点之间转移,并持久化到外部系统上,从而保证计算的不间断进行。

本文介绍了工业界学术界中流处理系统状态存储的三种方案:存储完整状态(Flink 等系统),存储共享状态(Materialize / Differential Dataflow 为例),存储部分状态(Noria (OSDI ‘18) 为例)。这些存储方案各有优势,可以为未来的流处理引擎开发提供一些借鉴意义。

引入

假设某个购物系统中有两个表:

  • visit(product, user, length) 表示用户查看某产品多少秒。
  • info(product, category) 表示某个产品属于某个分类。

现在我们要查询:某个分类下用户查看产品最长的时间是多少。

CREATE VIEW result AS
  SELECT category,
       MAX(length) as max_length FROM
  info INNER JOIN visit ON product
  GROUP BY category

这个查询中包含两表Join和一个聚合操作。后文的讨论都将基于这个查询进行。

假设系统现在的状态是:

info(product, category)
Apple, Fruit
Banana, Fruit
Carrot, Vegetable
Potato, Vegetable

visit(product, user, length)
Apple, Alice, 10
Apple, Bob, 20
Carrot, Bob, 50
Banana, Alice, 40
Potato, Eve, 60

此时,查询的结果应该是

category, max_length
Fruit, 40
Vegetable, 60

代表Fruit分类被用户查看最长的时间为40秒(对应Alice访问Banana的时间Vegetable分类被用户查看的最长时间为60秒(对应Eve访问Potato的时间

在常见的数据库产品中,系统通常来说会为这个查询生成如下的执行计划(不考虑optimizer

base plan of the query

流处理系统的执行计划和常见数据库系统的计划没有太多区别。下面将具体介绍各种流处理系统会如何表示和存储计算的中间状态。

Full State -算子维护自己的完整状态

诸如 Flink 的流处理系统持久化每个算子的完整状态;与此同时,流计算图上,算子之间传递数据的更新信息。这种存储状态的方法非常符合直觉。前文所述的SQL,在Flink等系统中大致会创建出这个计算图:

plan of Flink

数据源会发出增加一行或是减少一行的消息。经过流算子的处理,这些消息会转变为用户需要的结果。

Join State的存储

数据源的消息进入系统后,碰到的第一个算子就是Join。回顾SQL查询的Join条件: info INNER JOIN visit ON productJoin算子在收到左侧 info 的消息后,会先将 visit 一侧的 product 相同的行查出来,然后发给下游。之后,将 info 一侧的消息记录在自己的状态中。对于右侧消息的处理也如出一辙。

比如,现在 visit 一侧收到Eve对着Potato看了60+ Potato Eve 60 的消息。假设此时 info 一侧的状态已经有了四条记录。

join state of Flink

Join算子会查询 info 一侧 product = Potato 的记录,得到PotatoVegetable的结果,之后将 Potato, Vegetable, 60 发给下游。

而后,visit 一侧的状态会加入 Potato -> Eve, 60 的记录,这样一来,如果 info 发生变化,Join算子也能对应 visit 给下游发送Join算子的更新。

Aggregation State的存储

消息接下来被传递到了Agg算子上,Agg算子需要根据category分组,计算每个categorylength的最大值。

一些简单的Agg状态(比如sum)只需要记录每一个group当前的值就行了。上游发来insert,就将sum加上对应的值;上游发来delete,就将sum减去对应的值。所以,诸如sum、不带distinctcount等聚合表达式需要记录的状态非常小。

但对于max状态来说,我们就不能只记录最大的那个值了。如果上游发来了一条delete消息,max状态需要把第二大的值作为新的最大值发给下游。如果只记录最大值,删掉最大值以后就没法知道第二大的值是多少。因此,Agg算子需要存储一个group对应的完整数据。比如在我们的例子里,AggMaxState现在存的数据有:

Fruit -> { 10, 20, 30, 40 }
Vegetable -> { 50 }

上游Join算子发来一条插入 Potato, Vegetable, 60 的消息,Agg算子会更新自己的状态:

Fruit -> { 10, 20, 30, 40 }
Vegetable -> { 50, [60] }

并把Vegetable这一组的更新发给下游。

DELETE Vegetable, 50
INSERT Vegetable, 60

整个过程如下图所示:

aggregation state of Flink

总结

存储完整状态的流系统通常来说有这么几个特点:

  • 流计算图上单向传递数据变更的消息(添加/删除)
  • 流算子维护、访问自己的状态;与此同时,在多路Join的时候,存储的状态可能重复。后文在介绍共享状态时也会详细介绍这一点。

Shared State -算子之间共享状态

我们以 Differential Dataflow (Materialize 下面的计算引擎)Shared Arrangement为例介绍这种共享状态的实现。下文将使用DD简称Differential Dataflow

DDArrange算子与Arrangement

intro of shared arrangement

DD使用Arrangement来维护状态。简单来说,Arrangement是一个支持MVCCkey-value map数据结构,存储key(value, time, diff)的映射。在Arrangement上可以:

  • 通过handler任意查询某个时间点key-value的映射关系。
  • 查询某一个key在一段时间内的变更情况。
  • 指定查询的水位,后台合并或删除不再使用的历史数据。

DD中大部分算子都是没有状态的,所有的状态都存储在Arrangement里。Arrangement可以使用Arrange算子生成,也可以由算子(比如Reduce算子)自己维护。在DD的计算图上,有两种消息传递:

  • 数据在某一时刻的变更 (data, time, diff)。这种数据流叫做Collection
  • 数据的快照,也就是Arrangementhandler。这种数据流叫做Arranged

DD中每个算子的对自己的输入输出也有一定的要求,比如下面几个例子:

  • Map算子(对应SQLProjection)输入Collection输出Collection
  • JoinCore算子(Join的一个阶段)输入Arranged输出Collection
  • ReduceCore算子(Agg的一个阶段)输入Arranged输出Arranged

之后我们会详细介绍DD中的JoinCoreReduceCore算子。

Differential DataflowMaterialize

Materialize会将用户输入的SQL查询转换为DD的计算图。值得一提的是,join,group bySQL操作在DD中往往不会只对应一个算子。我们顺着消息的流动,看看Materialize是如何存储状态的。

plan of differential dataflow

Join State的存储

SQLA Join B操作在DD中对应三个算子:两个 Arrange 和一个 JoinCoreArrange算子根据join key分别持久化状态两个source的状态,以KV的形式存储在Arrangement中。Arrange算子对输入攒批后,将TraceHandle发给下游的 JoinCore 算子。实际的Join逻辑在 JoinCore 算子中发生,JoinCore 不存储任何状态。

join state of differential dataflow

如上图所示,现在Visit侧来了一条更新:Eve对着Potato看了60秒。JoinCore 算子通过Trace B访问到这条更新,并向另一侧的Trace A查询 product = Potato 的行,匹配到 Potato 是一种蔬菜,往下游输出 Potato, Vegetable, 60 的更改。

Reduce状态的存储

DDSQL Agg算子对应Reduce操作。Reduce中又包含两个算子:ArrangeReduceCoreArrange 算子根据group key存储输入数据,ReduceCore 算子自己维护一个存储聚合结果的Arrangement,而后通过 as_collection 操作将聚合结果输出成一个collection

aggregation state of differential dataflow

Join的更新来到Reduce算子后,先被Arrange算子根据group key存储在Arrangement中。ReduceCore收到Trace C后,将 key = Vegetable 的行全部扫描出来,并求最大值,最后将最大值更新到自己的Arrangement中。Trace D经过 as_collection 操作后,即可输出为数据更新的形式,变成其他算子可以处理的信息。

更方便的算子状态复用

由于DD中存储状态的算子和实际计算的算子是分开的,我们可以利用这个性质做算子状态的复用。

3-way join of differential dataflow

比如用户想要同时查询 A JOIN BB JOIN C,在DD中,一种可能的计算图就是生成三个Arrange算子和两个JoinCore算子。相比于存储完整状态的流处理系统,我们可以避免B的状态被存两遍

另一个例子是多路Join,比如 SELECT * FROM A, B, C WHERE A.x = B.x and A.x = C.x。在这个例子中,如果使用JoinCore算子来生成计算图,状态还是有可能重复,一共需要生成4Arrangement

MaterializeSQL Join除了被转换为上文所述DDJoinCore算子之外,也有可能转换为Delta Join。如图所示,我们只需要分别生成A, B, C3Arrangement,然后使用lookup算子查询A的修改在B C中对应的行(其他两个表的修改亦然,最后做一个union,即可得到Join的结果。Delta Join可以充分利用已有的Arrangement进行计算,大大减小Join所需的状态存储数。

远程访问状态的开销

在流系统中,计算中间产生的数据往往无法全部放在一个结点上;与此同时,同一个执行计划中的节点,也会有很多并行度。比如下面这个两表Join的例子。两个表ABArrangement可能分别在两个结点上产生(Node 1, 2),然后同时用两个结点分别对其中一部分数据做Join

remote shuffle of differential dataflow

在这个情况下,DD中势必会发生远程访问Arrangement的问题。由于算子完全没有内部状态,JoinCore每处理一行都需要一次远程访问,查找join key对应的数据。总的来说,Arrange和计算放在两个结点上会大大增加计算的延迟,放在一个结点上又无法充分利用分布式系统的资源,这是一个比较矛盾的地方。

总结

在共享状态的流处理系统中,算子的计算逻辑和存储逻辑被拆分到多个算子中。因此,不同的计算任务可以共享同一个存储,从而减少存储状态的数量。如果要实现共享状态的流处理系统,一般会有这样的特点:

  • 流计算图上传递的不仅仅是数据的变更,可能还会包括状态的共享信息(比如DDTrace Handle
  • 流算子访问状态会有一定的开销;但相对而言存储完整状态的流计算系统而言,整个流计算过程中由于状态复用,存储的状态数量更小。

Partial State -算子只存储部分信息

Noria (OSDI ‘18) 这一系统中,计算不会在数据源更新信息时触发,流处理算子并不会保存完整的信息。

比如,如果用户在之前创建的视图上执行:

SELECT * FROM result WHERE category = "Vegetable"

执行这条SQL的时候,才会触发流系统的计算。计算过程中,也只计算 category = "Vegetable" 相关的数据,保存相关的状态。下面将以这条查询为例,说明Noria的计算方式与状态存储。

Upquery

Noria的各个算子仅存储部分数据。用户的查询可能直接击中这个部分状态的缓存,也有可能需要回溯到上游查询。假设现在所有算子的状态都为空,Noria需要通过upquery来递归查询上游算子的状态,从而得到正确的结果。

upquery of Noria

用户向流计算引擎查询 category = "Vegetable" 的最大值。Agg算子为了计算出它的结果,需要知道所有category为蔬菜的记录。于是,Agg算子将这个upquery转发到上游Join算子。

Join算子要得到蔬菜对应的所有信息,需要向两个上游表分别查询情况。category属于Info表的列,因此,Join算子将这条upquery转发给Info表。

Join算子的实现

join implementation of Noria - the left side

Info table返回蔬菜分类下的所有产品后,Join算子会再发一个upquery给另一边Visit table,查询胡萝卜、土豆对应的浏览记录。

join implementation of Noria - the right side

Visit table返回对应记录后,Join算子就可以根据两次Upquery的输出计算出Join结果了。

Noria中,Join算子无需保存任何实际状态,仅需要记录正在进行的upquery即可。

Agg算子的实现

aggregation implementation of Noria

数据来到Agg算子后,Noria将直接计算出最大值,并将最大值存储在算子的状态中。在前文所述的系统里,Agg算子的状态需要保存完整的数据(水果的所有浏览记录、蔬菜的所有浏览记录Noria只需要缓存用户请求的状态,因此在这个请求中只要记录蔬菜的记录。与此同时,如果上游发生了删除操作,Noria可以直接将蔬菜对应的行删除,以便之后重新计算最大值。因此,在存储部分状态的系统中,也无需通过记录所有值的方法回推第二大的值——直接清空缓存就行了。

总结

存储部分状态的流处理系统通过upquery的方式实时响应用户的请求,在本文所述的实现中,所需要存储的状态数最少。它一般有以下特点:

  • 计算图的数据流向是双向的——既可以从上游到下游输出数据,也可以从下游到上游发upquery
  • 由于需要递归upquery,计算的延迟可能比其他状态存储方式略微大一点。
  • 数据一致性比较难实现。本文所述的其他存储方法都可以比较简单地实现最终一致;但对于存储部分状态的系统来说,需要比较小心地处理更新和upquery返回结果同时在流上传递的问题,对于每个算子都要仔细证明实现的正确性。
  • DDL / Recovery非常快。由于算子里面的信息都是按需计算的,如果用户对View进行增删列的操作,或是做迁移,都可以直接清空缓存分配新节点,无需代价较高的状态恢复。

最后对比一下所有的状态存储方式所对应的流处理系统特征:

comparison of streaming state stores

  • 存储完整状态(Flink为例):流上传递数据。
  • 共享状态存储(Materialize / DD为例):流上传递数据和snapshot
  • 存储部分状态(Noria为例):流上传递数据,流上双向都有消息。

Reference