Kafka-CheatSheet
Kafka CheatSheet
Kafka 简介
一、简介
- 支持消息的发布和订阅,类似于
RabbtMQ 、ActiveMQ 等消息队列; - 支持数据实时处理;
- 能保证消息的可靠性投递;
- 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错;
- 高吞吐率,单
Broker 可以轻松处理数千个分区以及每秒百万级的消息量。
二、基本概念
2.1 Messages And Batches
2.2 Topics And Partitions
由于一个

2.3 Producers And Consumers
1. 生产者
生产者负责创建消息。一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过自定义分区器来实现。
2. 消费者
消费者是消费者群组的一部分,消费者负责消费消息。消费者可以订阅一个或者多个主题,并按照消息生成的顺序来读取它们。消费者通过检查消息的偏移量

一个分区只能被同一个消费者群组里面的一个消费者读取,但可以被不同消费者群组中所组成的多个消费者共同读取。多个消费者群组中消费者共同读取同一个主题时,彼此之间互不影响。

2.4 Brokers And Clusters
一个独立的
在集群中,一个分区

Kafka 生产者详解
一、生产者发送消息的过程
首先介绍一下
Kafka 会将发送消息包装为ProducerRecord 对象,ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。- 接下来,数据被传给分区器。如果之前已经在
ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker 上。 - 服务器在收到这些消息时会返回一个响应。如果消息成功写入
Kafka ,就返回一个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

二、创建生产者
2.1 项目依赖
本项目采用kafka-clients
依赖,如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
2.2 创建生产者
创建
- bootstrap.servers :指定
broker 的地址清单,清单里不需要包含所有的broker 地址,生产者会从给定的broker 里查找broker 的信息。不过建议至少要提供两个broker 的信息作为容错; - key.serializer :指定键的序列化器;
- value.serializer :指定值的序列化器。
创建的示例代码如下:
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "Hello-Kafka";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i,
"world" + i);
/* 发送消息*/
producer.send(record);
}
/*关闭生产者*/
producer.close();
}
}
本篇文章的所有示例代码可以从
Github 上进行下载:kafka-basis
2.3 测试
1. 启动Kakfa
# zookeeper启动命令
bin/zkServer.sh start
# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
启动单节点
# bin/kafka-server-start.sh config/server.properties
2. 创建topic
# 创建用于测试主题
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 1 \
--topic Hello-Kafka
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
3. 启动消费者
启动一个控制台消费者用于观察写入情况,启动命令如下:
# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning
4. 运行项目
此时可以看到消费者控制台,输出如下,这里 kafka-console-consumer
只会打印出值信息,不会打印出键信息。

2.4 可能出现的问题
在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动server.properties
文件中的 listeners
配置进行更改:
# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092
二、发送消息
上面的示例程序调用了 send
方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。
2.1 同步发送
在调用 send
方法后可以接着调用 get()
方法,send
方法的返回值是一个
for (int i = 0; i < 10; i++) {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*同步发送消息*/
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了Hello-Kafka
主题时候,使用 --partitions
指定其分区数为
topic=Hello-Kafka, partition=0, offset=40
topic=Hello-Kafka, partition=0, offset=41
topic=Hello-Kafka, partition=0, offset=42
topic=Hello-Kafka, partition=0, offset=43
topic=Hello-Kafka, partition=0, offset=44
topic=Hello-Kafka, partition=0, offset=45
topic=Hello-Kafka, partition=0, offset=46
topic=Hello-Kafka, partition=0, offset=47
topic=Hello-Kafka, partition=0, offset=48
topic=Hello-Kafka, partition=0, offset=49
2.2 异步发送
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*异步发送消息,并监听回调*/
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}
三、自定义分区器
- 如果键值为
null , 则使用轮询(Round Robin) 算法将消息均衡地分布到各个分区上; - 如果键值不为
null ,那么Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。
某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:
3.1 自定义分区器
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {
private int passLine;
@Override
public void configure(Map<String, ?> configs) {
/*从生产者配置中获取分数线*/
passLine = (Integer) configs.get("pass.line");
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
/*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
return (Integer) key >= passLine ? 1 : 0;
}
@Override
public void close() {
System.out.println("分区器关闭");
}
}
需要在创建生产者时指定分区器,和分区器所需要的配置参数:
public class ProducerWithPartitioner {
public static void main(String[] args) {
String topicName = "Kafka-Partitioner-Test";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*传递自定义分区器*/
props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
/*传递分区器所需的参数*/
props.put("pass.line", 6);
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
String score = "score:" + i;
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
/*异步发送消息*/
producer.send(record, (metadata, exception) ->
System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
}
producer.close();
}
}
3.2 测试
需要创建一个至少有两个分区的主题:
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 2 \
--topic Kafka-Partitioner-Test
此时输入如下,可以看到分数大于等于
score:6, partition=1,
score:7, partition=1,
score:8, partition=1,
score:9, partition=1,
score:10, partition=1,
score:0, partition=0,
score:1, partition=0,
score:2, partition=0,
score:3, partition=0,
score:4, partition=0,
score:5, partition=0,
分区器关闭
四、生产者其他属性
上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上
1. acks
- acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
2. buffer.memory
设置生产者内存缓冲区的大小。
3. compression.type
默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有
4. retries
发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。
5. batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
6. linger.ms
该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。
7. clent.id
客户端
8. max.in.flight.requests.per.connection
指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为
9. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
timeout.ms 指定了borker 等待同步副本返回消息的确认时间;request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。
10. max.block.ms
指定了在调用 send()
方法或使用 partitionsFor()
方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到
11. max.request.size
该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为
12. receive.buffer.bytes & send.buffer.byte
这两个参数分别指定
Kafka 消费者详解
一、消费者和消费者群组
在

需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费者读取,不可能存在同一个分区被同一个消费者群里多个消费者共同读取的情况,如图:

可以看到即便消费者
二、分区再均衡
因为群组里的消费者共同读取主题的分区,所以当一个消费者被关闭或发生崩溃时,它就离开了群组,原本由它读取的分区将由群组里的其他消费者来读取。同时在主题发生变化时 , 比如添加了新的分区,也会发生分区与消费者的重新分配,分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。正是因为再均衡,所以消费费者群组才能保证高可用性和伸缩性。
消费者通过向群组协调器所在的
三、创建Kafka 消费者
在创建消费者的时候以下以下三个选项是必选的:
- bootstrap.servers :指定
broker 的地址清单,清单里不需要包含所有的broker 地址,生产者会从给定的broker 里查找broker 的信息。不过建议至少要提供两个broker 的信息作为容错; - key.deserializer :指定键的反序列化器;
- value.deserializer :指定值的反序列化器。
除此之外你还需要指明你需要想订阅的主题,可以使用如下两个
- consumer.subscribe(Collection<String> topics) :指明需要订阅的主题的集合;
- consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。
最后只需要通过轮询poll
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/*订阅主题 (s)*/
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
/*轮询获取数据*/
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} finally {
consumer.close();
}
本篇文章的所有示例代码可以从
Github 上进行下载:kafka-basis
三、 自动提交偏移量
3.1 偏移量的重要性
_consumer_offset
的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有
什么用处。不过,如果有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 因为这个原因,所以如果不能正确提交偏移量,就可能会导致数据丢失或者重复出现消费,比如下面情况:
- 如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复消费;
- 如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
3.2 自动提交偏移量
只需要将消费者的 enable.auto.commit
属性配置为 true
即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把 poll()
方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms
属性进行配置,默认值是
使用自动提交是存在隐患的,假设我们使用默认的
四、手动提交偏移量
用户可以通过将 enable.auto.commit
设为 false
,然后手动提交偏移量。基于用户需求手动提交偏移量可以分为两大类:
- 手动提交当前偏移量:即手动提交当前轮询的最大偏移量;
- 手动提交固定偏移量:即按照业务需求,提交某一个固定的偏移量。
而按照
4.1 同步提交
通过调用 consumer.commitSync()
来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/*同步提交*/
consumer.commitSync();
}
如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。基于这个原因,
4.2 异步提交
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/*异步提交并定义回调*/
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("错误处理");
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
x.topic(), x.partition(), y.offset()));
}
}
});
}
异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了
注:虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个
Map<TopicPartition, Integer> offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。
4.3 同步加异步提交
下面这种情况,在正常的轮询中使用异步提交来保证吞吐量,但是因为在最后即将要关闭消费者了,所以此时需要用同步提交来保证最大限度的提交成功。
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因为即将要关闭消费者,所以要用同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}
4.4 提交特定偏移量
在上面同步和异步提交的
/*同步提交特定偏移量*/
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
/*异步提交特定偏移量*/
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
需要注意的是,因为你可以订阅多个主题,所以 offsets
中必须要包含所有主题的每个分区的偏移量,示例代码如下:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
/*记录每个主题的每个分区的偏移量*/
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData");
/*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
offsets.put(topicPartition, offsetAndMetadata);
}
/*提交特定偏移量*/
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}
五、监听分区再均衡
因为分区再均衡会导致分区与消费者的重新划分,有时候你可能希望在再均衡前执行一些操作:比如提交已经处理但是尚未提交的偏移量,关闭数据库连接等。此时可以在订阅主题时候,调用 subscribe
的重载方法传入自定义的分区再均衡监听器。
/*订阅指定集合内的所有主题*/
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
/*使用正则匹配需要订阅的主题*/
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
代码示例如下:
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
/*该方法会在消费者停止读取消息之后,再均衡开始之前就调用*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("再均衡即将触发");
// 提交已经处理的偏移量
consumer.commitSync(offsets);
}
/*该方法会在重新分配分区之后,消费者开始读取消息之前被调用*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
});
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
/*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
offsets.put(topicPartition, offsetAndMetadata);
}
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}
六 、退出轮询
consumer.wakeup()
方法用于退出轮询,它通过抛出 WakeupException
异常来跳出循环。需要注意的是,在退出线程时最好显示的调用consumer.close()
下面的示例代码为监听控制台输出,当输入 exit
时结束轮询,关闭消费者并退出程序:
/*调用 wakeup 优雅的退出*/
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
/*等待主线程完成提交偏移量、关闭消费者等操作*/
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
//对于 wakeup() 调用引起的 WakeupException 异常可以不必处理
} finally {
consumer.close();
System.out.println("consumer 关闭");
}
七、独立的消费者
因为
在这种情况下,就不需要订阅主题, 取而代之的是消费者为自己分配分区。 一个消费者可以订阅主题(井加入消费者群组
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
/*可以指定读取哪些分区 如这里假设只读取主题的 0 分区*/
for (PartitionInfo partition : partitionInfos) {
if (partition.partition()==0){
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
}
// 为消费者指定分区
consumer.assign(partitions);
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<Integer, String> record : records) {
System.out.printf("partition = %s, key = %d, value = %s\n",
record.partition(), record.key(), record.value());
}
consumer.commitSync();
}
附录: Kafka 消费者可选属性
1. fetch.min.byte
消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,
2. fetch.max.wait.ms
3. max.partition.fetch.bytes
该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为
4. session.timeout.ms
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是
5. auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
latest ( 默认值) :在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的最新记录);- earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
6. enable.auto.commit
是否自动提交偏移量,默认值是
7. client.id
客户端
8. max.poll.records
单次调用 poll()
方法能够返回的记录数量。
9. receive.buffer.bytes & send.buffer.byte
这两个参数分别指定
深入理解Kafka 副本机制
一、Kafka 集群
broker.id
,用于标识自己在集群中的身份,可以在配置文件 server.properties
中进行配置,或者由程序自动生成。下面是
- 每一个
broker 启动的时候,它会在Zookeeper 的/brokers/ids
路径下创建一个临时节点
,并将自己的broker.id
写入,从而将自身注册到集群; - 当有多个
broker 时,所有broker 会竞争性地在Zookeeper 上创建/controller
节点,由于Zookeeper 上的节点不会重复,所以必然只会有一个broker 创建成功,此时该broker 称为controller broker 。它除了具备其他broker 的功能外,还负责管理主题分区及其副本的状态。 - 当
broker 出现宕机或者主动退出从而导致其持有的Zookeeper 会话超时时,会触发注册在Zookeeper 上的watcher 事件,此时Kafka 会进行相应的容错处理;如果宕机的是controller broker 时,还会触发新的controller 选举。
二、副本机制
为了保证高可用,controller broker
来进行专门的管理。下面将详解介绍
2.1 分区和副本
replication-factor
参数进行指定

2.2 ISR 机制
每个分区都有一个
- 与
Zookeeper 之间有一个活跃的会话,即必须定时向Zookeeper 发送心跳; - 在规定的时间内从首领副本那里低延迟地获取过消息。
如果副本不满足上面条件的话,就会被从
这里给出一个主题创建的示例:使用 --replication-factor
指定副本系数为--describe
命令可以看到分区

2.3 不完全的首领选举
对于副本机制,在unclean.leader.election.enable
,默认值为
2.4 最少同步副本
min.insync.replicas
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
2.5 发送确认
- acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
- acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
- acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
三、数据请求
3.1 元数据请求机制
在所有副本中,只有领导副本才能进行消息的读写处理。由于不同分区的领导副本可能在不同的Not a Leader for Partition
的错误响应。 为了解决这个问题,
首先集群中的每个metadata.max.age.ms
来进行指定。有了元数据信息后,客户端就知道了领导副本所在的
如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时还有可能会收到 Not a Leader for Partition
的错误响应,这种情况下客户端会再次求发出元数据请求,然后刷新本地缓存,之后再去正确的

3.2 数据可见性
需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本

3.3 零拷贝
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过
buffer = File.read
Socket.send(buffer)
这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态

sendfile 和transferTo 实现零拷贝
sendfile
系统调用,提供了零拷贝。数据通过sendfile
调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示:

从具体实现来看,PlaintextTransportLayer
的 transferFrom
方法通过调用transferTo
方法实现零拷贝,如下所示:
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
注: transferTo
和 transferFrom
并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供 sendfile
这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。
四、物理存储
4.1 分区分配
在创建主题时,
- 在所有
broker 上均匀地分配分区副本; - 确保分区的每个副本分布在不同的
broker 上; - 如果使用了
broker.rack
参数为broker 指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的broker 上,以避免一个机架不可用而导致整个分区不可用。
基于以上原因,如果你在一个单节点上创建一个
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor
Exception: Replication factor: 3 larger than available brokers: 1.
4.2 分区数据保留规则
保留数据是
log.retention.bytes
:删除数据前允许的最大数据量;默认值-1 ,代表没有限制;log.retention.ms
:保存数据文件的毫秒数,如果未设置,则使用log.retention.minutes
中的值,默认为null ;log.retention.minutes
:保留数据文件的分钟数,如果未设置,则使用log.retention.hours
中的值,默认为null ;log.retention.hours
:保留数据文件的小时数,默认值为168 ,也就是一周。
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以
4.3 文件格式
通常保存在磁盘上的数据格式与生产者发送过来消息格式是一样的。 如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送
