参数配置
Kafka 参数配置分析
生产者参数
-
key.serializer:用于
key 键的序列化,它实现了org.apache.kafka.common.serialization.Serializer
接口 -
value.serializer:用于
value 值的序列化,实现了org.apache.kafka.common.serialization.Serializer
接口 -
acks:
acks 参数指定了要有多少个分区副本接收消息,生产者才认为消息是写入成功的。此参数对消息丢失的影响较大- 如果
acks = 0 ,就表示生产者也不知道自己产生的消息是否被服务器接收了,它才知道它写成功了。如果发送的途中产生了错误,生产者也不知道,它也比较懵逼,因为没有返回任何消息。这就类似于UDP 的运输层协议,只管发,服务器接受不接受它也不关心。 - 如果
acks = 1 ,只要集群的Leader 接收到消息,就会给生产者返回一条消息,告诉它写入成功。如果发送途中造成了网络异常或者Leader 还没选举出来等其他情况导致消息写入失败,生产者会受到错误消息,这时候生产者往往会再次重发数据。因为消息的发送也分为同步
和异步
,Kafka 为了保证消息的高效传输会决定是同步发送还是异步发送。如果让客户端等待服务器的响应(通过调用Future
中的get()
方法) ,显然会增加延迟,如果客户端使用回调,就会解决这个问题。 - 如果
acks = all ,这种情况下是只有当所有参与复制的节点都收到消息时,生产者才会接收到一个来自服务器的消息。不过,它的延迟比acks =1 时更高,因为我们要等待不只一个服务器节点接收消息。
- 如果
-
buffer.memory:此参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,
send() 方法调用要么被阻塞,要么抛出异常,具体取决于block.on.buffer.null
参数的设置。 -
compression.type:此参数来表示生产者启用何种压缩算法,默认情况下,消息发送时不会被压缩。该参数可以设置为
snappy 、gzip 和lz4 ,它指定了消息发送给broker 之前使用哪一种压缩算法进行压缩。下面是各压缩算法的对比


-
retries:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领
) ,在这种情况下,reteis
参数的值决定了生产者可以重发的消息次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者在每次重试之间等待100ms ,这个等待参数可以通过retry.backoff.ms
进行修改。 -
batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,任意条数的消息都可能被发送。
-
client.id:此参数可以是任意的字符串,服务器会用它来识别消息的来源,一般配置在日志里
-
max.in.flight.requests.per.connection:此参数指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量。把它设为
1 可以保证消息是按照发送的顺序写入服务器。 -
timeout.ms、
request.timeout.ms 和metadata.fetch.timeout.ms :request.timeout.ms 指定了生产者在发送数据时等待服务器返回的响应时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待时间超时,生产者要么重试发送数据,要么返回一个错误。timeout.ms 指定了broker 等待同步副本返回消息确认的时间,与asks 的配置相匹配—- 如果在指定时间内没有收到同步副本的确认,那么broker 就会返回一个错误。 -
max.block.ms:此参数指定了在调用
send() 方法或使用partitionFor() 方法获取元数据时生产者的阻塞时间当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms 时,生产者会抛出超时异常。 -
max.request.size:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。
-
receive.buffer.bytes 和send.buffer.bytes :Kafka 是基于TCP 实现的,为了保证可靠的消息传输,这两个参数分别指定了TCP Socket 接收和发送数据包的缓冲区的大小。如果它们被设置为-1 ,就使用操作系统的默认值。如果生产者或消费者与broker 处于不同的数据中心,那么可以适当增大这些值。
消费者配置
到目前为止,我们学习了如何使用消费者
-
fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。
broker 在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes
指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。如果没有很多可用数据,但消费者的CPU 使用率很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值调大可以降低broker 的工作负载。 -
fetch.max.wait.ms:我们通过上面的
fetch.min.bytes 告诉 Kafka ,等到有足够的数据时才会把它返回给消费者。而fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是500 毫秒。如果没有足够的数据流入kafka 的话,消费者获取的最小数据量要求就得不到满足,最终导致500 毫秒的延迟。如果要降低潜在的延迟,就可以把参数值设置的小一些。如果fetch.max.wait.ms 被设置为100 毫秒的延迟,而fetch.min.bytes 的值设置为1MB ,那么Kafka 在收到消费者请求后,要么返回1MB 的数据,要么在100 ms 后返回所有可用的数据。就看哪个条件首先被满足。 -
max.partition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的
最大字节数
。它的默认值时1MB ,也就是说,KafkaConsumer.poll()
方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes 指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少
4 MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比broker 能够接收的最大消息的字节数( 通过max.message.size 属性配置大) ,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另外一个考量的因素是消费者处理数据的时间。消费者需要频繁的调用poll() 方法来避免会话过期和发生分区再平衡,如果单次调用poll() 返回的数据太多,消费者需要更多的时间进行处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间。 -
session.timeout.ms:这个属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是
3s 。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就会被认定为死亡,协调器就会触发重平衡。把它的分区分配给消费者群组中的其它消费者,此属性与 heartbeat.interval.ms
紧密相关。heartbeat.interval.ms 指定了poll() 方法向群组协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,这两个属性一般需要同时修改,heartbeat.interval.ms 必须比session.timeout.ms 小,一般是session.timeout.ms 的三分之一。如果session.timeout.ms 是3s ,那么heartbeat.interval.ms 应该是1s 。把session.timeout.ms 值设置的比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的重平衡。把该属性的值设置得大一些,可以减少意外的重平衡,不过检测节点崩溃需要更长的时间。 -
auto.offset.reset:该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理。它的默认值是
latest
,意思指的是,在偏移量无效的情况下,消费者将从最新的记录开始读取数据。另一个值是earliest
,意思指的是在偏移量无效的情况下,消费者将从起始位置处开始读取分区的记录。 -
enable.auto.commit:我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是
true ,为了尽量避免出现重复数据和数据丢失,可以把它设置为false ,由自己控制何时提交偏移量。如果把它设置为true ,还可以通过auto.commit.interval.ms 属性来控制提交的频率 -
partition.assignment.strategy:我们知道,分区会分配给群组中的消费者。
PartitionAssignor
会根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者,Kafka 有两个默认的分配策略Range
和RoundRobin
-
client.id:该属性可以是任意字符串,
broker 用他来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额中 -
max.poll.records:该属性用于控制单次调用
call() 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量。 -
receive.buffer.bytes 和send.buffer.bytes :socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设置为-1 ,就使用操作系统默认值。如果生产者或消费者与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。