消费者
Kafka 消费者
消息由生产者发送到
消费者组(Consumer Group)是由一个或多个消费者实例(Consumer Instance)组成的群组,具有可扩展性和可容错性的一种机制。消费者组内的消费者共享一个消费者组

这是
队列与订阅

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列:

队列模式
队列模式,指每条消息只会有一个
- 在
Consumer Group 稳定状态下,每一个Consumer 实例只会消费某一个或多个特定partition 的数据,而某个partition 的数据只会被某一个特定的Consumer 实例所消费,也就是说Kafka 对消息的分配是以partition 为单位分配的,而非以每一条消息作为分配单元; - 同一
Consumer Group 中,如果Consumer 实例数量少于partition 数量,则至少有一个Consumer 会消费多个partition 的数据;如果Consumer 的数量与partition 数量相同,则正好一个Consumer 消费一个partition 的数据;而如果Consumer 的数量多于partition 的数量时,会有部分Consumer 无法消费该Topic 下任何一条消息;

上图中的主题

这样一来,消费者的消费能力就大大提高了,但是在某些环境下比如用户产生消息特别多的时候,生产者产生的消息仍旧让消费者吃不消,那就继续增加消费者。

如上图所示,每个分区所产生的消息能够被每个消费者群组中的消费者消费,如果向消费者群组中增加更多的消费者,那么多余的消费者将会闲置,如下图所示

向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
发布订阅模式
发布订阅模式,又指广播模式,

在这个场景中,消费组
分区重平衡
我们从上面的消费者演变图中可以知道这么一个过程:最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例分摊了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡,英文名也叫做

重平衡非常重要,它为消费者群组带来了高可用性和伸缩性,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用。另外,当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
重平衡也是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点
重平衡算法

重平衡流程
消费者通过向组织协调者(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。如果过了一段时间
- 对于每个
Consumer Group ,选举出一个Broker 作为Coordinator (0.9 版本以上) ,由它Watch Zookeeper ,从而监控判断是否有partition 或者Consumer 的增减,然后生成Rebalance 命令,按照以上算法重新分配。 - 当
Consumer Group 第一次被初始化时,Consumer 通常会读取每个partition 的最早或最近的offset (Zookeeper 记录) ,然后顺序地读取每个partition log 的消息,在Consumer 读取过程中,它会提交已经成功处理的消息的offsets (由Zookeeper 记录) 。 - 当一个
partition 被重新分配给Consumer Group 中的其他Consumer ,新的Consumer 消费的初始位置会设置为( 原来Consumer) 最近提交的offset 。
创建消费者
在读取消息之前,需要先创建一个
还有一个属性是
Properties properties = new Properties();
properties.put("bootstrap.server", "192.168.1.9:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer < String, String > consumer = new KafkaConsumer < > (properties);
主题订阅
创建好消费者之后,下一步就开始订阅主题了。
consumer.subscribe(Collections.singletonList("customerTopic"));
为了简单我们只订阅了一个主题
要订阅所有与
consumer.subscribe("test.*");
轮询
我们知道,
try {
while (true) {
ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord < String, String > record: records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
} finally {
consumer.close();
}
- 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过轮询的方式向
Kafka 请求数据。 - 第三行代码非常重要,
Kafka 必须定期循环请求数据,否则就会认为该Consumer 已经挂了,会触发重平衡,它的分区会移交给群组中的其它消费者。传给poll()
方法的是一个超市时间,用java.time.Duration
类来表示,如果该参数被设置为0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待broker 返回数据。 poll() 方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息、记录在分区中的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理每条记录。- 在退出应用程序之前使用
close()
方法关闭消费者。网络连接和socket 也会随之关闭,并立即触发一次重平衡,而不是等待群组协调器发现它不再发送心跳并认定它已经死亡。
提交和偏移量的概念

如图,
从
特殊偏移
我们上面提到,消费者在每次调用
消费者会向一个叫做 _consumer_offset
的特殊主题中发送消息,这个主题会保存每次所发送消息中的分区偏移量,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。
如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理:

如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失,既然_consumer_offset
如此重要,那么它的提交方式是怎样的呢?下面我们就来说一下提交方式,
自动提交
最简单的方式就是让消费者自动提交偏移量。如果
提交当前偏移量
把
异步提交
异步提交
同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。
因此,在消费者关闭之前一般会组合使用
提交特定的偏移量
消费者