生产者

Kafka Producer

Kafka中,我们把产生消息的那一方称为生产者,比如登录电商网站的时候,你的登陆信息,登陆次数都会作为消息传输到Kafka后台,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给Kafka后台,然后系统会根据你的爱好做智能推荐。

生产者设计

Kafka中的生产者设计主要考虑了以下方面:

  • 负载均衡:由于消息topic由多个partition组成,且partition会均衡分布到不同broker上,因此,为了有效利用broker集群的性能,提高消息的吞吐量,producer可以通过随机或者hash等方式,将消息平均发送到多个partition上,以实现负载均衡。

  • 批量发送:是提高消息吞吐量重要的方式,Producer端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker,从而大大减少broker存储消息的IO操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。

创建Kafka生产者

要向Kafka写入消息,首先需要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性:

  • bootstrap.servers:该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他的broker信息。不过建议至少要提供两个broker信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

  • key.serializer:broker需要接收到序列化之后的key/value值,所以生产者发送的消息需要经过序列化之后才传递给Kafka Broker。生产者需要知道采用何种方式把Java对象转换为字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化为字节数组。这里拓展一下Serializer类,Serializer是一个接口,它表示类将会采用何种方式序列化,它的作用是把对象转换为字节,实现了Serializer接口的类主要有ByteArraySerializer、StringSerializer、IntegerSerializer ,其中ByteArraySerializeKafka默认使用的序列化器,其他的序列化器还有很多,你可以通过 这里 查看其他序列化器。要注意的一点:key.serializer是必须要设置的,即使你打算只发送值的内容。

  • value.serializer:与key.serializer一样,value.serializer指定的类会将值序列化。

下面代码演示了如何创建一个Kafka生产者,这里只指定了必要的属性,其他使用默认的配置

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);

Kafka消息发送

实例化生产者对象后,接下来就可以开始发送消息了,我们从创建一个ProducerRecord对象开始,ProducerRecordKafka中的一个核心类,它代表了一组Kafka需要发送的key/value键值对,它由记录要发送到的主题名称(Topic Name,可选的分区号(Partition Number)以及可选的键值对构成。在发送ProducerRecord时,我们需要将键值对对象由序列化器转换为字节数组,这样它们才能够在网络上传输。然后消息到达了分区器。

Producer 发送流程

如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用keyhash函数映射指定一个分区。如果发送的过程中既没有分区号也没有,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题和分区发送数据了。ProducerRecord还有关联的时间戳,如果用户没有提供时间戳,那么生产者将会在记录中使用当前的时间作为时间戳。Kafka最终使用的时间戳取决于topic主题配置的时间戳类型。

  • 如果将主题配置为使用CreateTime,则生产者记录中的时间戳将由broker使用。
  • 如果将主题配置为使用LogAppendTime,则生产者记录中的时间戳在将消息添加到其日志中时,将由broker重写。

然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到Kafka Broker上。Kafka Broker在收到消息时会返回一个响应,如果写入成功,会返回一个RecordMetaData对象,它包含了主题和分区信息,以及记录在分区里的偏移量,上面两种的时间戳类型也会返回给用户。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。

简单消息发送

Kafka最简单的消息发送如下:

ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
producer.send(record);

代码中生产者(producer)send()方法需要把ProducerRecord的对象作为参数进行发送,ProducerRecord有很多构造函数,这个我们下面讨论,这里调用的是:

public ProducerRecord(String topic, K key, V value) {}

这个构造函数,需要传递的是topic主题,keyvalue。把对应的参数传递完成后,生产者调用send()方法发送消息(ProducerRecord对象。我们可以从生产者的架构图中看出,消息是先被写入分区中的缓冲区中,然后分批次发送给Kafka Broker

不同的主题与批次

发送成功后,send()方法会返回一个Future(java.util.concurrent)对象,Future对象的类型是RecordMetadata类型,我们上面这段代码没有考虑返回值,所以没有生成对应的Future对象,所以没有办法知道消息是否发送成功。如果不是很重要的信息或者对结果不会产生影响的信息,可以使用这种方式进行发送。我们可以忽略发送消息时可能发生的错误或者在服务器端可能发生的错误,但在消息发送之前,生产者还可能发生其他的异常。这些异常有可能是SerializationException(序列化失败)BufferedExhaustedExceptionTimeoutException(说明缓冲区已满),又或是InterruptedException(说明发送线程被中断)

同步发送消息

第二种消息发送机制如下所示:

ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
try {
    RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
    e.printStackTrace()
}

这种发送消息的方式较上面的发送方式有了改进,首先调用send()方法,然后再调用get()方法等待Kafka响应。如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们会得到RecordMetadata对象,可以用它来查看消息记录。

生产者(KafkaProducer)在发送的过程中会出现两类错误:其中一类是重试错误,这类错误可以通过重发消息来解决。比如连接的错误,可以通过再次建立连接来解决;无主错误则可以通过重新为分区选举首领来解决。KafkaProducer被配置为自动重试,如果多次重试后仍无法解决问题,则会抛出重试异常。另一类错误是无法通过重试来解决的,比如消息过大对于这类错误,KafkaProducer不会进行重试,直接抛出异常。

异步发送消息

同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会造成许多消息无法直接发送,造成消息滞后,无法发挥效益最大化。比如消息在应用程序和Kafka集群之间一个来回需要10ms。如果发送完每个消息后都等待响应的话,那么发送100个消息需要1秒,但是如果是异步方式的话,发送100条消息所需要的时间就会少很多很多。大多数时候,虽然Kafka会返回RecordMetadata消息,但是我们并不需要等待响应。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回掉支持。下面是回调的一个例子:

ProducerRecord < String, String > producerRecord = new ProducerRecord < String, String > ("CustomerCountry", "Huston", "America");
producer.send(producerRecord, new DemoProducerCallBack());

class DemoProducerCallBack implements Callback {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();;
        }
    }
}

首先实现回调需要定义一个实现了org.apache.kafka.clients.producer.Callback的类,这个接口只有一个onCompletion方法。如果kafka返回一个错误,onCompletion方法会抛出一个非空(non null)异常,这里我们只是简单的把它打印出来,如果是生产环境需要更详细的处理,然后在send()方法发送的时候传递一个Callback回调的对象。

生产者分区机制

Kafka对于数据的读写是以分区为粒度的,分区可以分布在多个主机(Broker)中,这样每个节点能够实现独立的数据写入和读取,并且能够通过增加新的节点来增加Kafka集群的吞吐量,通过分区部署在多个Broker来实现负载均衡的效果。上面我们介绍了生产者的发送方式有三种:不管结果如何直接发送、发送并返回结果、发送并回调。由于消息是存在主题(topic)的分区(partition)中的,所以当Producer生产者发送产生一条消息发给topic的时候,你如何判断这条消息会存在哪个分区中呢?

这其实就设计到Kafka的分区机制了。

分区策略

Kafka的分区策略指的就是将生产者发送到哪个分区的算法。Kafka为我们提供了默认的分区策略,同时它也支持你自定义分区策略。如果要自定义分区策略的话,你需要显示配置生产者端的参数Partitioner.class,我们可以看一下这个类它位于org.apache.kafka.clients.producer包下:

public interface Partitioner extends Configurable, Closeable {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    public void close();
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}

Partitioner类有三个方法,分别来解释一下

  • partition():这个类有几个参数: topic,表示需要传递的主题;key表示消息中的键值;keyBytes表示分区中序列化过后的keybyte数组的形式传递;value表示消息的value值;valueBytes表示分区中序列化后的值数组;cluster表示当前集群的原数据。Kafka给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
  • close() :继承了Closeable接口能够实现close()方法,在分区关闭时调用。
  • onNewBatch():表示通知分区程序用来创建新的批次

其中与分区策略息息相关的就是partition()方法了。

顺序轮询

顺序分配,消息是均匀的分配给每个partition,即每个分区存储一次消息。

分区顺序分配

上图表示的就是轮询策略,轮训策略是Kafka Producer提供的默认策略,如果你不使用指定的轮训策略的话,Kafka默认会使用顺序轮训策略的方式。

随机轮询

随机轮询简而言之就是随机的向partition中保存消息,如下图所示。

随机分配示意图

实现随机分配的代码只需要两行,如下:

List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。

按照key进行消息保存

这个策略也叫做key-ordering策略,Kafka中每条消息都会有自己的key,一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略,如下图所示。

key-ordering 分配示意图

实现这个策略的partition方法同样简单,只需要下面两行代码即可:

List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

上面这几种分区策略都是比较基础的策略,除此之外,你还可以自定义分区策略。

生产者压缩机制

压缩一词简单来讲就是一种互换思想,它是一种经典的用CPU时间去换磁盘空间或者I/O传输量的思想,希望以较小的CPU开销带来更少的磁盘占用或更少的网络I/O传输。Kafka的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka底层的消息日志由一系列消息集合日志项组成。Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。

Kafka中,压缩会发生在两个地方:Kafka ProducerKafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点 来使消息发的更快一些。Kafka Producer中使用compression.type来开启压缩

private Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.9:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer < String, String > producer = new KafkaProducer < String, String > (properties);
ProducerRecord < String, String > record = new ProducerRecord < String, String > ("CustomerCountry", "Precision Products", "France");

上面代码表明该Producer的压缩算法使用的是GZIP

下一页