消费者

Kafka 消费者

消息由生产者发送到 kafka 集群后,会被消费者消费。一般来说我们的消费模型有两种:推送模型(push)和拉取模型(pull)基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。如果采用 push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。Kafka 采取拉取模型(pull),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。

消费者组(Consumer Group)是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制。消费者组内的消费者共享一个消费者组 ID,这个 ID 也叫做 Group ID,组内的消费者共同对一个主题进行订阅和消费,同一个组中的消费者只能消费一个分区的消息,多余的消费者会闲置,派不上用场。换言之,同一主题的一条消息只能被同一个消费者组内的一个消费者消费,但多个消费者组可同时消费这一消息。

消费者组与分区

这是 Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给某一个 Consumer)的手段。一个 Topic 可以对应多个 Consumer Group。如果需要实现广播,只要每个 Consumer 有一个独立的 Group 就可以了。要实现单播只要所有的 Consumer 在同一个 Group 里。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。

队列与订阅

Kafka 的消息队列一般分为两种模式:点对点队列模式和发布订阅模式。如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式:

点对点模式的消息队列

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列:

发布-订阅模式的消息队列

队列模式

队列模式,指每条消息只会有一个 Consumer 消费到。Kafka 保证同一 Consumer Group 中只有一个 Consumer 会消费某条消息。

  • 在 Consumer Group 稳定状态下,每一个 Consumer 实例只会消费某一个或多个特定 partition 的数据,而某个 partition 的数据只会被某一个特定的 Consumer 实例所消费,也就是说 Kafka 对消息的分配是以 partition 为单位分配的,而非以每一条消息作为分配单元;
  • 同一 Consumer Group 中,如果 Consumer 实例数量少于 partition 数量,则至少有一个 Consumer 会消费多个 partition 的数据;如果 Consumer 的数量与 partition 数量相同,则正好一个 Consumer 消费一个 partition 的数据;而如果 Consumer 的数量多于 partition 的数量时,会有部分 Consumer 无法消费该 Topic 下任何一条消息;

Kafka 消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。下面是一个 Kafka 分区消费示意图:

1 个消费者消费 4 个分区消息

上图中的主题 T1 有四个分区,分别是分区 0、分区 1、分区 2、分区 3,我们创建一个消费者群组 1,消费者群组中只有一个消费者,它订阅主题 T1,接收到 T1 中的全部消息。由于一个消费者处理四个生产者发送到分区的消息,压力有些大,需要帮手来帮忙分担任务,于是就演变为下图:

2 个消费者消费 4 个分区消息

这样一来,消费者的消费能力就大大提高了,但是在某些环境下比如用户产生消息特别多的时候,生产者产生的消息仍旧让消费者吃不消,那就继续增加消费者。

4 个消费者消费 4 个分区消息

如上图所示,每个分区所产生的消息能够被每个消费者群组中的消费者消费,如果向消费者群组中增加更多的消费者,那么多余的消费者将会闲置,如下图所示

5 个消费者消费 4 个分区消息

向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。

发布订阅模式

发布订阅模式,又指广播模式,Kafka 保证 topic 的每条消息会被所有 Consumer Group 消费到,而对于同一个 Consumer Group,还是保证只有一个 Consumer 实例消费到这条消息。Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么就演变为下图这样:

两个消费者群组消费一个主题的消息

在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。总结起来就是如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。

分区重平衡

我们从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做 Rebalance。如下图所示:

Kafka 分区重平衡

重平衡非常重要,它为消费者群组带来了高可用性和伸缩性,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用。另外,当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。

重平衡也是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改。重平衡的过程对消费者组有极大的影响。因为每次重平衡过程中都会导致万物静止,参考 JVM 中的垃圾回收机制,也就是 Stop The World .也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费,等待重平衡的完成。而且重平衡这个过程很慢。

重平衡算法

重平衡算法

重平衡流程

消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。如果过了一段时间 Kafka 停止发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。

  • 对于每个 Consumer Group,选举出一个 Broker 作为 Coordinator(0.9 版本以上),由它 Watch Zookeeper,从而监控判断是否有 partition 或者 Consumer 的增减,然后生成 Rebalance 命令,按照以上算法重新分配。
  • 当 Consumer Group 第一次被初始化时,Consumer 通常会读取每个 partition 的最早或最近的 offset(Zookeeper 记录),然后顺序地读取每个 partition log 的消息,在 Consumer 读取过程中,它会提交已经成功处理的消息的 offsets(由 Zookeeper 记录)。
  • 当一个 partition 被重新分配给 Consumer Group 中的其他 Consumer,新的 Consumer 消费的初始位置会设置为(原来 Consumer)最近提交的 offset。

创建消费者

在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 对象十分相似 — 把需要传递给消费者的属性放在 properties 对象中,后面我们会着重讨论 Kafka 的一些配置,这里我们先简单的创建一下,使用 3 个属性就足矣,分别是 bootstrap.server,key.deserializer,value.deserializer 。

还有一个属性是 group.id 这个属性不是必须的,它指定了 KafkaConsumer 是属于哪个消费者群组。创建不属于任何一个群组的消费者也是可以的:

Properties properties = new Properties();
properties.put("bootstrap.server", "192.168.1.9:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer < String, String > consumer = new KafkaConsumer < > (properties);

主题订阅

创建好消费者之后,下一步就开始订阅主题了。subscribe() 方法接受一个主题列表作为参数,使用起来比较简单:

consumer.subscribe(Collections.singletonList("customerTopic"));

为了简单我们只订阅了一个主题 customerTopic,参数传入的是一个正则表达式,正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式相匹配,那么会立即触发一次重平衡,消费者就可以读取新的主题。

要订阅所有与 test 相关的主题,可以这样做:

consumer.subscribe("test.*");

轮询

我们知道,Kafka 是支持订阅/发布模式的,生产者发送数据给 Kafka Broker,那么消费者是如何知道生产者发送了数据呢?其实生产者产生的数据消费者是不知道的,KafkaConsumer 采用轮询的方式定期去 Kafka Broker 中进行数据的检索,如果有数据就用来消费,如果没有就再继续轮询等待,下面是轮询等待的具体实现:

try {
    while (true) {
        ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(100));
        for (ConsumerRecord < String, String > record: records) {
            int updateCount = 1;
            if (map.containsKey(record.value())) {
                updateCount = (int) map.get(record.value() + 1);
            }
            map.put(record.value(), updateCount);
        }
    }
} finally {
    consumer.close();
}
  • 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过轮询的方式向 Kafka 请求数据。
  • 第三行代码非常重要,Kafka 必须定期循环请求数据,否则就会认为该 Consumer 已经挂了,会触发重平衡,它的分区会移交给群组中的其它消费者。传给 poll() 方法的是一个超市时间,用 java.time.Duration 类来表示,如果该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据。
  • poll() 方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息、记录在分区中的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理每条记录。
  • 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次重平衡,而不是等待群组协调器发现它不再发送心跳并认定它已经死亡。

提交和偏移量的概念

The Consumer&rsquo;s Position in the Log

如图,Last Commited Offset 指 Consumer 最近一次提交的消费记录 offset,Current Position 是当前消费的位置,High Watermark 是成功拷贝到 log 的所有副本节点(partition 的所有 ISR 节点,下文介绍)的最近消息的 offset,Log End Offset 是写入 log 中最后一条消息的 offset+1。

从 Consumer 的角度来看,最多只能读取到 High watermark 的位置,后面的消息对消费者不可见,因为未完全复制的数据还没可靠存储,有丢失可能。

特殊偏移

我们上面提到,消费者在每次调用 poll() 方法进行定时轮询的时候,会返回由生产者写入 Kafka 但是还没有被消费者消费的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。消费者可以使用 Kafka 来追踪消息在分区中的位置(偏移量)

消费者会向一个叫做 _consumer_offset 的特殊主题中发送消息,这个主题会保存每次所发送消息中的分区偏移量,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。

如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理:

重平衡后,消息被重复消费

如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失,既然_consumer_offset 如此重要,那么它的提交方式是怎样的呢?下面我们就来说一下提交方式,KafkaConsumer API 提供了多种方式来提交偏移量。

自动提交

最简单的方式就是让消费者自动提交偏移量。如果 enable.auto.commit 被设置为 true,那么每过 5s,消费者会自动把从 poll() 方法轮询到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认是 5s。与消费者里的其他东西一样,自动提交也是在轮询中进行的。消费者在每次轮询中会检查是否提交该偏移量了,如果是,那么就会提交从上一次轮询中返回的偏移量。

提交当前偏移量

把 auto.commit.offset 设置为 false,可以让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

commitSync() 将会提交由 poll() 返回的最新偏移量,如果处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险,如果发生了在均衡,从最近一批消息到发生在均衡之间的所有消息都将被重复处理。

异步提交

异步提交 commitAsync() 与同步提交 commitSync() 最大的区别在于异步提交不会进行重试,同步提交会一致进行重试。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

因此,在消费者关闭之前一般会组合使用 commitAsync 和 commitSync 提交偏移量。

提交特定的偏移量

消费者 API 允许调用 commitSync() 和 commitAsync() 方法时传入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

上一页