副本

副本(replication)策略

Kafka0.8版本前没有提供PartitionReplication机制,一旦Broker宕机,其上的所有Partition就都无法提供服务,而Partition又没有备份数据,数据的可用性就大大降低了。所以0.8后提供了Replication机制来保证Brokerfailover

副本选择新的 Leader

Kafka的高可靠性的保障来源于其健壮的副本(replication)策略,即多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫Reblance)Kafkareplica副本单元是topicpartition,一个partitionreplica数量不能超过broker的数量,因为一个broker最多只会存储这个partition的一个副本。所有消息生产、消费请求都是由partitionleader replica来处理,其他follower replica负责从leader复制数据进行备份,保持和Leader的数据同步。如果没有Leader副本,那就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。

ISR

Kafka中并不是所有的Follower都能被拿来替代Leader,所以在KafkaLeader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:

  • 一是它必须维护与ZooKeepersession(这个通过ZooKeeperHeartbeat机制来实现
  • 二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。

Leader会跟踪与其保持同步的Replica列表,如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值或者Follower超过一定时间未向Leader发送fetch请求。

数据可靠性

Producer在发布消息到某个Partition时,先通过ZooKeeper找到该PartitionLeader,然后无论该TopicReplication Factor为多少,Producer只将该消息发送到该PartitionLeaderLeader会将该消息写入其本地Log。每个Follower都从Leader拉取数据:

  • 消息所在partitionISR replicas会定时异步从leader上批量复制数据log
  • 当所有ISR replica都返回ack,告诉leader该消息已经写log成功后,leader认为该消息committed,并告诉Producer生产成功。这里和以上”alive”条件的第二点是不矛盾的,因为leader有超时机制,leaderISRfollower复制数据,如果一定时间不返回ack(可能数据复制进度落后太多,则leader将该follower replicaISR中剔除。
  • 一旦Leader收到了ISR中的所有ReplicaACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACKHW(高水位)Consumer能够看到的此partition的位置,LEO是每个partitionlog最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然可以从新选举的leader中获取,不会造成消息丢失。

Kafka Replication的数据流如下图所示:

同步策略

ISR机制下的数据复制,既不是完全的同步复制,也不是单纯的异步复制,这是Kafka高吞吐很重要的机制。同步复制要求所有能工作的follower都复制完,这条消息才会被认为committed,这种复制方式极大的影响了吞吐量。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经committed,这种情况下如果follower都复制完都落后于leader,而如果leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐量,follower可以批量的从leader复制数据,数据复制到内存即返回ack,这样极大的提高复制性能,当然数据仍然是有丢失风险的。

对于Producer而言,它可以选择是否等待消息commit。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。当ProducerLeader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • 1(默认:这意味着producerISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。

副本放置策略

为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。Kafka分配Replica的算法如下:

  • 将所有存活的NBrokers和待分配的Partition排序
  • 将第iPartition分配到第(i mod n)Broker上,这个Partition的第一个Replica存在于这个分配的Broker上,并且会作为partition的优先副本
  • 将第iPartition的第jReplica分配到第((i + j) mod n)Broker

假设集群一共有4brokers,一个topic4partition,每个Partition3个副本。下图是每个Broker上的副本分配情况。

Broker 副本分布情况

服务可用性

Leader选举

Kafka所有收发消息请求都由Leader节点处理,由以上数据可靠性设计可知,当ISRfollower replica故障后,leader会及时地从ISR列表中把它剔除掉,并不影响服务可用性。而如果Leader发生了故障,则Kafka会重新选举Leader

  • KafkaZookeeper存储partitionISR信息,并且能动态调整ISR列表的成员,只有ISR里的成员replica才会被选为leader,并且ISR所有的replica都有可能成为leader
  • Leader节点宕机后,Zookeeper能监控发现,并由brokercontroller节点从ISR中选举出新的leader,并通知ISR内的所有broker节点。

Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:

  • 节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
  • 临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁

容灾和数据一致性

分布式系统的容灾能力,跟其本身针对数据一致性考虑所选择的算法有关,例如,ZookeeperZab算法,raft算法等。KafkaISR机制和这些Majority Vote算法对比如下:

  • ISR机制能容忍更多的节点失败。假如replica节点有2f+1个,每个partition最多能容忍2f个失败,且不丢失消息数据;但相对Majority Vote选举算法,只能最多容忍f个失败。
  • 在消息committed持久化上,ISR需要等2f个节点返回ack,但Majority Vote只需等f+1个节点返回ack,且不依赖处理最慢的follower节点,因此Majority Vote有优势
  • ISR机制能节省更多replica节点数。例如,要保证f个节点可用,ISR方式至少要f个节点,而Majority Vote至少需要2f+1个节点。

如果所有replica都宕机了,有两种方式恢复服务:

  • ISR任一节点恢复,并选举为leader
  • 选择第一个恢复的节点(不一定是ISR中的节点)为leader

第一种方式消息不会丢失(只能说这种方式最有可能不丢而已,第二种方式可能会丢消息,但能尽快恢复服务可用。这是可用性和一致性场景的两种考虑,Kafka默认选择第二种,用户也可以自主配置。大部分考虑CP的分布式系统(假设2f+1个节点,为了保证数据一致性,最多只能容忍f个节点的失败,而Kafka为了兼顾可用性,允许最多2f个节点失败,因此是无法保证数据强一致的。

ISR 容灾

如图所示,一开始ISR数量等于3,正常同步数据,红色部分开始,leader发现其他两个follower复制进度太慢或者其他原因(网络分区、节点故障等,将其从ISR剔除后,leader单节点存储数据;然后,leader宕机,触发重新选举第二节点为leader,重新开始同步数据,但红色部分的数据在新leader上是没有的;最后原leader节点恢复服务后,重新从新leader上复制数据,而红色部分的数据已经消费不到了。

因此,为了减少数据丢失的概率,可以设置KafkaISR最小replica数,低于该值后直接返回不可用,当然是以牺牲一定可用性和吞吐量为前提了。

上一页
下一页