分区日志

分区日志

通过网络发送数据包或向网络服务发送请求通常是短暂的操作,不会留下永久的痕迹。尽管可以永久记录(通过抓包与日志),但我们通常不这么做。即使是将消息持久地写入磁盘的消息代理,在送达给消费者之后也会很快删除消息,因为它们建立在短暂消息传递的思维方式上。数据库和文件系统采用截然相反的方法论:至少在某人显式删除前,通常写入数据库或文件的所有内容都要被永久记录下来。

这种思维方式上的差异对创建衍生数据的方式有巨大影响。批处理过程的一个关键特性是,你可以反复运行它们,试验处理步骤,不用担心损坏输入(因为输入是只读的)。而 AMQP/JMS 风格的消息传递并非如此:收到消息是具有破坏性的,因为确认可能导致消息从代理中被删除,因此你不能期望再次运行同一个消费者能得到相同的结果。如果你将新的消费者添加到消息系统,通常只能接收到消费者注册之后开始发送的消息。先前的任何消息都随风而逝,一去不复返。作为对比,你可以随时为文件和数据库添加新的客户端,且能读取任意久远的数据(只要应用没有显式覆盖或删除这些数据)。

为什么我们不能把它俩杂交一下,既有数据库的持久存储方式,又有消息传递的低延迟通知?这就是基于日志的消息代理(log-based message brokers)背后的想法。

使用日志进行消息存储

日志只是磁盘上简单的仅追加记录序列,在《数据库》中我们也讨论了日志结构存储引擎和预写式日志的应用。同样的结构可以用于实现消息代理:生产者通过将消息追加到日志末尾来发送消息,而消费者通过依次读取日志来接收消息。如果消费者读到日志末尾,则会等待新消息追加的通知。Unix 工具 tail -f 能监视文件被追加写入的数据,基本上就是这样工作的。

为了扩展到比单个磁盘所能提供的更高吞吐量,可以对日志进行分区,不同的分区可以托管在不同的机器上,且每个分区都拆分出一份能独立于其他分区进行读写的日志。一个主题可以定义为一组携带相同类型消息的分区。在每个分区内,代理为每个消息分配一个单调递增的序列号或偏移量(offset),这种序列号是有意义的,因为分区是仅追加写入的,所以分区内的消息是完全有序的。没有跨不同分区的顺序保证。

生产者通过将消息追加写入主题分区文件来发送消息,消费者依次读取这些文件

Apache Kafka,Amazon Kinesis Streams 和 Twitter 的 DistributedLog 都是基于日志的消息代理。Google Cloud Pub/Sub 在架构上类似,但对外暴露的是 JMS 风格的 API,而不是日志抽象。尽管这些消息代理将所有消息写入磁盘,但通过跨多台机器分区,每秒能够实现数百万条消息的吞吐量,并通过复制消息来实现容错性。

日志与传统消息相比

基于日志的方法天然支持扇出式消息传递,因为多个消费者可以独立读取日志,而不会相互影响:读取消息不会将其从日志中删除。为了在一组消费者之间实现负载平衡,代理可以将整个分区分配给消费者组中的节点,而不是将单条消息分配给消费者客户端。

每个客户端消费指派分区中的所有消息。然后使用分配的分区中的所有消息。通常情况下,当一个用户被指派了一个日志分区时,它会以简单的单线程方式顺序地读取分区中的消息。这种粗粒度的负载均衡方法有一些缺点:

  • 共享消费主题工作的节点数,最多为该主题中的日志分区数,因为同一个分区内的所有消息被递送到同一个节点。
  • 如果某条消息处理缓慢,则它会阻塞该分区中后续消息的处理。

因此在消息处理代价高昂,希望逐条并行处理,以及消息的顺序并没有那么重要的情况下,JMS/AMQP 风格的消息代理是可取的。另一方面,在消息吞吐量很高,处理迅速,顺序很重要的情况下,基于日志的方法表现得非常好。

消费者偏移量

顺序消费一个分区使得判断消息是否已经被处理变得相当容易:所有偏移量小于消费者的当前偏移量的消息已经被处理,而具有更大偏移量的消息还没有被看到。因此,代理不需要跟踪确认每条消息,只需要定期记录消费者的偏移即可。在这种方法减少了额外簿记开销,而且在批处理和流处理中采用这种方法有助于提高基于日志的系统的吞吐量。

实际上,这种偏移量与单领导者数据库复制中常见的日志序列号非常相似。在数据库复制中,日志序列号允许跟随者断开连接后,重新连接到领导者,并在不跳过任何写入的情况下恢复复制。这里原理完全相同:消息代理的表现得像一个主库,而消费者就像一个从库。

如果消费者节点失效,则失效消费者的分区将指派给其他节点,并从最后记录的偏移量开始消费消息。如果消费者已经处理了后续的消息,但还没有记录它们的偏移量,那么重启后这些消息将被处理两次。

磁盘空间使用

如果只追加写入日志,则磁盘空间终究会耗尽。为了回收磁盘空间,日志实际上被分割成段,并不时地将旧段删除或移动到归档存储。这就意味着如果一个慢消费者跟不上消息产生的速率而落后的太多,它的消费偏移量指向了删除的段,那么它就会错过一些消息。实际上,日志实现了一个有限大小的缓冲区,当缓冲区填满时会丢弃旧消息,它也被称为循环缓冲区(circular buffer)或环形缓冲区(ring buffer)。不过由于缓冲区在磁盘上,因此可能相当的大。

让我们做个简单计算。在撰写本文时,典型的大型硬盘容量为 6TB,顺序写入吞吐量为 150MB/s。如果以最快的速度写消息,则需要大约 11 个小时才能填满磁盘。因而磁盘可以缓冲 11 个小时的消息,之后它将开始覆盖旧的消息。即使使用多个磁盘和机器,这个比率也是一样的。实践中的部署很少能用满磁盘的写入带宽,所以通常可以保存一个几天甚至几周的日志缓冲区。

不管保留多长时间的消息,日志的吞吐量或多或少保持不变,因为无论如何,每个消息都会被写入磁盘。这种行为与默认将消息保存在内存中,仅当队列太长时才写入磁盘的消息传递系统形成鲜明对比。当队列很短时,这些系统非常快;而当这些系统开始写入磁盘时,就要慢的多,所以吞吐量取决于保留的历史数量。

当消费者跟不上生产者时

《微服务调用》中我们介绍过如果消费者无法跟上生产者发送信息的速度时,我们讨论了三种选择:丢弃信息,进行缓冲或施加背压。在这种分类法里,基于日志的方法是缓冲的一种形式,具有很大,但大小固定的缓冲区(受可用磁盘空间的限制)。如果消费者远远落后,而所要求的信息比保留在磁盘上的信息还要旧,那么它将不能读取这些信息,所以代理实际上丢弃了比缓冲区容量更大的旧信息。你可以监控消费者落后日志头部的距离,如果落后太多就发出报警。由于缓冲区很大,因而有足够的时间让人工运维来修复慢消费者,并在消息开始丢失之前让其赶上。

即使消费者真的落后太多开始丢失消息,也只有那个消费者受到影响;它不会中断其他消费者的服务。这是一个巨大的运维优势:你可以实验性地消费生产日志,以进行开发,测试或调试,而不必担心会中断生产服务。当消费者关闭或崩溃时,会停止消耗资源,唯一剩下的只有消费者偏移量。这种行为也与传统的信息代理形成了鲜明对比,在那种情况下,你需要小心地删除那些消费者已经关闭的队列,否则那些队列就会累积不必要的消息,从其他仍活跃的消费者那里占走内存。

重播旧信息

我们之前提到,使用 AMQP 和 JMS 风格的消息代理,处理和确认消息是一个破坏性的操作,因为它会导致消息在代理上被删除。另一方面,在基于日志的消息代理中,使用消息更像是从文件中读取数据:这是只读操作,不会更改日志。

除了消费者的任何输出之外,处理的唯一副作用是消费者偏移量的前进。但偏移量是在消费者的控制之下的,所以如果需要的话可以很容易地操纵:例如你可以用昨天的偏移量跑一个消费者副本,并将输出写到不同的位置,以便重新处理最近一天的消息。你可以使用各种不同的处理代码重复任意次。

这一方面使得基于日志的消息传递更像上一章的批处理,其中衍生数据通过可重复的转换过程与输入数据显式分离。它允许进行更多的实验,更容易从错误和漏洞中恢复,使其成为在组织内集成数据流的良好工具。

下一页