数据汇集层

数据汇集层

数据汇集层在部分场景下又称为变更分发平台。当各类数据从源端抽取后,首先应当被写入一个数据汇集层,然后再进行后继的转换处理,直至将最终结果写入目的地。数据汇集层的作用主要有两点:

  • 数据汇集层将异构的数据源数据存储为统一的格式,并且为后继的处理提供一致的访问接口。这就将处理逻辑和数据源解耦开来,同时屏蔽了数据抽取过程中可能发生的异常对后继作业的影响。

  • 数据汇集层独立于数据源,可被多次访问,亦可根据业务需要缓存全部或一定期限的原始数据,这为转换分析提供了更高的灵活度。当业务需求发生变化时,无需重复读取源端数据,直接基于数据汇集层就可以开发新的模型和应用。数据汇集层可基于任意支持海量 / 高可用的文件系统、数据仓库或者消息队列构建,常见的方案包括 HDFS、HBase、Kafka 等。

数据汇集层的技术考量

变更分发平台可以有很多种形式,本质上它只是一个存储变更的中间件,那么如何进行选型呢?首先由于变更数据数据量级大,且操作时没有事务需求,所以先排除了关系型数据库,剩下的 NoSQL 如 Cassandra,mq 如 Kafka、RabbitMQ 都可以胜任。其区别在于,消费端到分发平台拉取变更时,假如是 NoSQL 的实现,那么就能很容易地实现条件过滤等操作(比如某个客户端只对特定字段为 true 的消息感兴趣); 但 NoSQL 的实现往往会在吞吐量和一致性上输给 mq。这里就是一个设计抉择的问题,最终我们选择了 mq,主要考虑的点是:消费端往往是无状态应用,很容易进行水平扩展,因此假如有条件过滤这样的需求,我们更希望把这样的计算压力放在消费端上。

而在 mq 里,Kafka 则显得具有压倒性优势。Kafka 本身就有大数据的基因,通常被认为是目前吞吐量最大的消息队列,同时,使用 Kafka 有一项很适合该场景的特性:Log Compaction。Kafka 默认的过期清理策略(log.cleanup.policy)是 delete,也就是删除过期消息,配置为 compact 则可以启用 Log Compaction 特性,这时 Kafka 不再删除过期消息,而是对所有过期消息进行”折叠”:对于 key 相同的所有消息会,保留最新的一条。

对应的在 mq 中的流总共会产生 4 条变更消息,而最下面两条分别是 id:1 id:2 下的最新记录,在它们之前的两条 INSERT 引起的变更就会被 Kafka 删除,最终我们在 Kafka 中看到的就是两行记录的最新状态,而一个持续订阅该流的消费者则能收到全部 4 条记录。这种行为有一个有趣的名字,流表二相性(Stream Table Durability):Topic 中有无尽的变更消息不断被写入,这是流的特质;而 Topic 某一时刻的状态,恰恰是该时刻对应的数据表的一个快照(参见上面的例子),每条新消息的到来相当于一次 Upsert,这又是表的特性。落到实践中来讲,Log Compaction 对于我们的场景有一个重要应用:全量数据迁移与数据补偿,我们可以直接编写针对每条变更数据的处理程序,就能兼顾全量迁移与之后的增量同步两个过程;而在数据异常时,我们可以重新回放整个 Kafka Topic:该 Topic 就是对应表的快照,针对上面的例子,我们回放时只会读到最新的两条消息,不需要读全部四条消息也能保证数据正确。

关于 Kafka 作为变更分发平台,最后要说的就是消费顺序的问题。大家都知道 Kafka 只能保证单个 Partition 内消息有序,而对于整个 Topic,消息是无序的。一般的认知是,数据变更的消费为了逻辑的正确性,必须按序消费。按着这个逻辑,我们的 Topic 只能有单个 Partition,这就大大牺牲了 Kafka 的扩展性与吞吐量。其实这里有一个误区,对于数据库变更抓取,我们只要保证 同一行记录的变更有序 就足够了。还是上面的例子,我们只需要保证对 id:2 这行的 insert 消息先于 update 消息,该行数据最后就是正确的。而实现”同一行记录变更有序”就简单多了,Kafka Producer 对带 key 的消息默认使用 key 的 hash 决定分片,因此只要用数据行的主键作为消息的 key,所有该行的变更都会落到同一个 Parition 上,自然也就有序了。这有一个要求就是 CDC 模块必须解析出变更数据的主键:而这点 Debezium 已经帮助我们解决了。

统一数据格式

数据格式的选择同样十分重要。首先想到的当然是 json, 目前最常见的消息格式,不仅易读,开发也都对它十分熟悉。但 json 本身有一个很大的不足,那就是契约性太弱,它的结构可以随意更改:试想假如有一个接口返回 String,注释上说这是个 json,那我们该怎么编写对应的调用代码呢?是不是需要翻接口文档,提前获知这段 json 的 schema,然后才能开始编写代码,并且这段代码随时可能会因为这段 json 的格式改变而 break。

在规模不大的系统中,这个问题并不显著。但假如在一个拥有上千种数据格式的数据管道上工作,这个问题就会很麻烦,首先当你订阅一个变更 topic 时,你完全处于懵逼状态——不知道这个 topic 会给你什么,当你经过文档的洗礼与不断地调试终于写完了客户端代码,它又随时会因为 topic 中的消息格式变更而挂掉。参考 Yelp 和 Linkedin 的选择,我们决定使用 Apache Avro 作为统一的数据格式。Avro 依赖模式 Schema 来实现数据结构定义,而 Schema 通常使用 json 格式进行定义,一个典型的 Schema 如下:这里要介绍一点背景知识,Avro 的一个重要特性就是支持 Schema 演化,它定义了一系列的演化规则,只要符合该规则,使用不同的 Schema 也能够正常通信。也就是说,使用 Avro 作为数据格式进行通信的双方是有自由更迭 Schema 的空间的。

在我们的场景中,数据库表的 Schema 变更会引起对应的变更数据 Schema 变更,而每次进行数据库表 Schema 变更就更新下游消费端显然是不可能的。所以这时候 Avro 的 Schema 演化机制就很重要了。我们做出约定,同一个 Topic 上传输的消息,其 Avro Schema 的变化必须符合演化规则,这么一来,消费者一旦开始正常消费之后就不会因为消息的 Schema 变化而挂掉。

下一页