事务消息

高可用及幂等

在分布式系统中一般有三种处理语义:

  • at-least-once:至少一次,有可能会有多次。如果producer收到来自ack的确认,则表示该消息已经写入到Kafka了,此时刚好是一次,也就是我们后面的exactly-once。但是如果producer超时或收到错误,并且request.required.acks配置的不是-1,则会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,这一次重试将会导致我们的消息会被写入两次,所以消息就不止一次地传递给最终consumer,如果consumer处理逻辑没有保证幂等的话就会得到不正确的结果。在这种语义中会出现乱序,也就是当第一次ack失败准备重试的时候,但是第二消息已经发送过去了,这个时候会出现单分区中乱序的现象,我们需要设置Prouducer的参数max.in.flight.requests.per.connectionflight.requestsProducer端用来保存发送请求且没有响应的队列,保证Producer端未响应的请求个数为1

  • at-most-once:如果在ack超时或返回错误时producer不重试,也就是我们讲request.required.acks=-1,则该消息可能最终没有写入kafka,所以consumer不会接收消息。

  • exactly-once:刚好一次,即使producer重试发送消息,消息也会保证最多一次地传递给consumer。该语义是最理想的,也是最难实现的。在0.10之前并不能保证exactly-once,需要使用consumer自带的幂等性保证。0.11.0使用事务保证了。

如何实现exactly-once

要实现exactly-onceKafka 0.11.0中有两个官方策略。

ProducerTopic

每个producer在初始化的时候都会被分配一个唯一的PID,对于每个唯一的PIDProducer向指定的Topic中某个特定的Partition发送的消息都会携带一个从0单调递增的sequence number。在我们的Broker端也会维护一个维度为,每次提交一次消息的时候都会对齐进行校验:

  • 如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
  • 如果消息序号刚好大一,就证明是合法的。

上面所说的解决了两个问题:

  • Prouducer发送了一条消息之后失败,broker并没有保存,但是第二条消息却发送成功,造成了数据的乱序。
  • Producer发送了一条消息之后,broker保存成功,ack回传失败,producer再次投递重复的消息。

上面所说的都是在同一个PID下面,意味着必须保证在单个Producer中的同一个seesion内,如果Producer挂了,被分配了新的PID,这样就无法保证了,所以Kafka中又有事务机制去保证。

事务

kafka中事务的作用是:

  • 实现exactly-once语义
  • 保证操作的原子性,要么全部成功,要么全部失败。
  • 有状态的操作的恢复

事务可以保证就算跨多个,在本次事务中的对消费队列的操作都当成原子性,要么全部成功,要么全部失败。并且,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。在kafka的事务中,应用程序必须提供一个唯一的事务ID,即Transaction ID,并且宕机重启之后,也不会发生改变,Transactin IDPID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。为了Producer重启之后,旧的Producer具有相同的Transaction ID失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producerepoch比新Producerepoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。

为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。该Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。由于Topic数据具有持久性,因此事务的状态也具有持久性。Producer并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction LogTransaction Log的设计与Offset Log用于保存ConsumerOffset类似。

上一页
下一页