架构概念

Kafka架构概念

Kafka作为一个高度可扩展可容错的消息系统,一个典型的kafka集群中包含若干producer,若干broker,若干consumer,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalanceproducer使用push模式将消息发布到brokerconsumer使用pull模式从broker订阅并消费消息:

Kafka 架构概览图

Kafka专用术语:

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成。
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。
  • Producer:负责发布消息到Kafka broker
  • Consumer:消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group:每个Consumer属于一个特定的Consumer Group

Kafka实现了零拷贝原理来快速移动数据,避免了内核之间的切换。Kafka可以将数据记录分批发送,从生产者到文件系统(Kafka主题日志)到消费者,可以端到端的查看这些批次的数据。批处理能够进行更有效的数据压缩并减少I/O延迟,Kafka采取顺序写入磁盘的方式,避免了随机磁盘寻址的浪费,总结一下其实就是四个要点:

  • 顺序读写:因为硬盘是机械结构,每次读写都会寻址,写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最“讨厌”随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O
  • 零拷贝:在Linux Kernal 2.2之后出现了一种叫做“零拷贝(zero-copy)”系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存空间的直接映射,数据不再复制到“用户态缓冲区”系统上下文切换减少2次,可以提升一倍性能。
  • 消息压缩:消息都是经过压缩传递、存储的,降低网络与磁盘的负担。
  • 分批发送:批量处理是一种非常有效的提升系统吞吐量的方法,在Kafka内部,消息都是以“批”为单位处理的。

消息与主题

消息是Kafka通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为Message;在由Java重新实现的客户端中,每一条消息称为Record。为了提高效率,消息会分批次写入Kafka,批次就代指的是一组消息。Kafka将一组消息抽象归纳为一个主题(Topic),也就是说,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。

Kafka消息传递的事务特点如下:

  • at most once:最多一次,这个和JMS"非持久化"消息类似,发送一次,无论成败,将不会重发。消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理。那么此后"未处理"的消息将不能被fetch到,这就是"at most once"
  • at least once:消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。消费者fetch消息,然后处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeperzookeeper恢复正常还是之前offset状态。
  • exactly once:消息只会发送一次。kafka中并没有严格的去实现(基于2阶段提交,我们认为这种策略在kafka中是没有必要的。

生产者(Producer)

生产者(Producer)负责将消息发送给代理,也就是向Kafka代理发送消息的客户端。生产者会将某topic的消息发布到相应的partition中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。

生产者设计

Kafka中的生产者设计主要考虑了以下方面:

  • 负载均衡:由于消息topic由多个partition组成,且partition会均衡分布到不同broker上,因此,为了有效利用broker集群的性能,提高消息的吞吐量,producer可以通过随机或者hash等方式,将消息平均发送到多个partition上,以实现负载均衡。
  • 批量发送:是提高消息吞吐量重要的方式,Producer端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker,从而大大减少broker存储消息的IO操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。

消费者和消费组

consumerkafka当中的消费者,主要用于消费kafka当中的数据;在Kafka中每一个消费者都属于一个特定消费组(ConsumerGroup),我们可以为每个消费者指定一个消费组,以groupId代表消费组名称,通过group.id配置设置。如果不指定消费组,则该消费者属于默认消费组test-consumer-group

同时,每个消费者也有一个全局唯一的id,通过配置项client.id指定,如果客户端没有指定消费者的idKafka会自动为该消费者生成一个全局唯一的id,格式为 ${groupId}-${hostName}-${timestamp}-${UUID 前 8 位字符}。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但不同消费组的消费者可同时消费该消息。消费组是Kafka用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。

Consumer Group读取消息

总结而言,Kafka中的消费者与消费组具备以下属性:

  • 任何Consumer必须属于一个Consumer Group
  • 同一Consumer Group中的多个Consumer实例,不同时消费同一个partition,等效于队列模式。如图,Consumer Group 1的三个Consumer实例分别消费不同的partition的消息,即,TopicA-part0、TopicA-part1、TopicA-part2。
  • 不同Consumer GroupConsumer实例可以同时消费同一个partition,等效于发布订阅模式。如图,Consumer Group 1Consumer1Consumer Group 2Consumer4,同时消费TopicA-part0的消息。
  • Partition内消息是有序的,Consumer通过pull方式消费消息。
  • Kafka不删除已消费的消息

分区

Kafka将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现kafka的伸缩性;单一主题中的分区有序,但是无法保证主题中所有的分区有序。

分区写入示意

每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区的总数减1。每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的分区数可以在Kafka启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。

消费者群组读取消息

分区使得Kafka在并发处理上变得更加容易,分区数量决定了每个Consumer Group中并发消费者的最大数量。理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。某一个主题下的分区数,对于消费该主题的同一个消费组下的消费者数量,应该小于等于该主题下的分区数。某一个主题有4个分区,那么消费组中的消费者应该小于等于4,而且最好与分区数成整数倍1 2 4这样。同一个分区下的数据,在同一时刻,不能同一个消费组的不同消费者消费。

不同分区、消费者数目

同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础。Kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是Kafka高吞吐率的一个重要保证。同时与传统消息系统不同的是,Kafka并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储,因此Kafka提供两种删除老数据的策略,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置。

偏移量

任何发布到分区的消息会被直接追加到日志文件(分区目录下以“.log”为文件名后缀的数据文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka并没有提供额外索引机制到存储偏移量,也就是说并不会给偏移量再提供索引。

消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中,而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。

日志段(Segment)

一个日志又被划分为多个日志段(LogSegment),日志段是Kafka日志对象分片的最小单位。与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件。日志文件是以“.log”为文件名后缀的数据文件,用于保存消息实际数据。两个索引文件分别以“.index”和“.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。查找某个offset的消息,先二分法找出消息所在的segment文件(因为每个segment的命名都是以该文件中消息offset最小的值命名;然后,加载对应的.index索引文件到内存,同样二分法找出小于等于给定offset的最大的那个offset记录(相对offset,position;最后,根据position.log文件中,顺序查找出offset等于给定offset值的消息。

由于消息在partitionsegment数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的segment文件,这种顺序磁盘IO存储设计是Kafka高性能很重要的原因。

代理(Broker)

Kafka集群包含一个或多个服务器,我们将每一个Kafka实例称为代理(Broker),通常也称代理为Kafka服务器(KafkaServer)broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。

在生产环境中Kafka集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。每一个代理都有唯一的标识id,这个id是一个非负整数。在一个Kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的idid值可以选择任意非负整数即可,只要保证它在整个Kafka集群中唯一,这个id就是代理的名字,也就是在启动代理时配置的broker.id对应的值,因此在本文中有时我们也称为brokerId。由于给每个代理分配了不同的brokerId,这样对代理进行迁移就变得更方便,从而对消费者来说是透明的,不会影响消费者对消息的消费。

每个集群中都会有一个broker同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当LeaderLeader负责管理工作,包括将分区分配给broker和监控broker。集群中,一个分区从属于一个Leader,但是一个分区可以分配给多个broker(非Leader,这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个broker失效,那么其他活跃用户会重新选举一个Leader接管。

分区复制示意图

ISR

KafkaZooKeeper中动态维护了一个ISR(In-sync Replica),即保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。如果一个Follower副本宕机(本书用宕机来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等)或是落后太多,则该Follower副本节点将从ISR列表中移除。对于Kafka节点,判断是”alive”有以下两个条件:

  • 节点必须和Zookeeper保持心跳连接
  • 如果节点是follower,必须从leader节点上复制数据来备份,而且备份的数据相比leader而言,不能落后太多。

副本

Kafka 副本示意图

由于Kafka副本的存在,就需要保证一个分区的多个副本之间数据的一致性,Kafka会选择该分区的一个副本作为Leader副本,而该分区其他副本即为Follower副本,只有Leader副本才负责处理客户端读/写请求,Follower副本从Leader副本同步数据。引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。Follower副本从Leader副本同步消息,对于n个副本只需n1条通路即可,这样就使得系统更加简单而高效。副本FollowerLeader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。

Kafka解决了fail/recover,一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据

ZooKeeper

Kafka利用ZooKeeper保存相应元数据信息,Kafka元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。Kafka在启动或运行过程当中会在ZooKeeper上创建相应节点来保存元数据信息,Kafka通过监听机制在这些节点注册相应监听器来监听节点元数据的变化,从而由ZooKeeper负责管理维护Kafka集群,同时通过ZooKeeper我们能够很方便地对Kafka集群进行水平扩展及数据迁移。

不过2021330日,Kafka背后的企业Confluent发布博客表示,在即将发布的2.8版本里,用户可在完全不需要ZooKeeper的情况下运行Kafka,该版本将依赖于ZooKeeper的控制器改造成了基于Kafka RaftQuorm控制器。在之前的版本中,如果没有ZooKeeperKafka将无法运行。但管理部署两个不同的系统不仅让运维复杂度翻倍,还让Kafka变得沉重,进而限制了Kafka在轻量环境下的应用,同时ZooKeeper的分区特性也限制了Kafka的承载能力。