生产者
Kafka Producer
在

-
负载均衡:由于消息
topic 由多个partition 组成,且partition 会均衡分布到不同broker 上,因此,为了有效利用broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者hash 等方式,将消息平均发送到多个partition 上,以实现负载均衡。 -
批量发送:是提高消息吞吐量重要的方式,
Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给broker ,从而大大减少broker 存储消息的IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
创建Kafka 生产者
要向
-
bootstrap.servers:该属性指定
broker 的地址清单,地址的格式为host:port 。清单里不需要包含所有的broker 地址,生产者会从给定的broker 里查找到其他的broker 信息。不过建议至少要提供两个broker 信息,一旦其中一个宕机,生产者仍然能够连接到集群上。 -
key.serializer:
broker 需要接收到序列化之后的key/value 值,所以生产者发送的消息需要经过序列化之后才传递给Kafka Broker 。生产者需要知道采用何种方式把Java 对象转换为字节数组。key.serializer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化为字节数组。这里拓展一下Serializer 类,Serializer 是一个接口,它表示类将会采用何种方式序列化,它的作用是把对象转换为字节,实现了Serializer 接口的类主要有ByteArraySerializer 、StringSerializer、IntegerSerializer ,其中ByteArraySerialize 是Kafka 默认使用的序列化器,其他的序列化器还有很多,你可以通过 这里 查看其他序列化器。要注意的一点:key.serializer 是必须要设置的,即使你打算只发送值的内容。 -
value.serializer:与
key.serializer 一样,value.serializer 指定的类会将值序列化。
下面代码演示了如何创建一个
private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);
Kafka 消息发送
实例化生产者对象后,接下来就可以开始发送消息了,我们从创建一个

如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用
- 如果将主题配置为使用
CreateTime ,则生产者记录中的时间戳将由broker 使用。 - 如果将主题配置为使用
LogAppendTime ,则生产者记录中的时间戳在将消息添加到其日志中时,将由broker 重写。
然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到
简单消息发送
ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
producer.send(record);
代码中生产者
public ProducerRecord(String topic, K key, V value) {}
这个构造函数,需要传递的是

发送成功后,
同步发送消息
第二种消息发送机制如下所示:
ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
try {
RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
e.printStackTrace();
}
这种发送消息的方式较上面的发送方式有了改进,首先调用
生产者(KafkaProducer)在发送的过程中会出现两类错误:其中一类是重试错误,这类错误可以通过重发消息来解决。比如连接的错误,可以通过再次建立连接来解决;无主错误则可以通过重新为分区选举首领来解决。
异步发送消息
同步发送消息都有个问题,那就是同一时间只能有一个消息在发送,这会造成许多消息无法直接发送,造成消息滞后,无法发挥效益最大化。比如消息在应用程序和
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回掉支持。下面是回调的一个例子:
ProducerRecord < String, String > producerRecord = new ProducerRecord < String, String > ("CustomerCountry", "Huston", "America");
producer.send(producerRecord, new DemoProducerCallBack());
class DemoProducerCallBack implements Callback {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();;
}
}
}
首先实现回调需要定义一个实现了
生产者分区机制
这其实就设计到
分区策略
public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}
partition(): 这个类有几个参数: topic ,表示需要传递的主题;key 表示消息中的键值;keyBytes 表示分区中序列化过后的key ,byte 数组的形式传递;value 表示消息的value 值;valueBytes 表示分区中序列化后的值数组;cluster 表示当前集群的原数据。Kafka 给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。close() : 继承了Closeable 接口能够实现close() 方法,在分区关闭时调用。onNewBatch(): 表示通知分区程序用来创建新的批次
其中与分区策略息息相关的就是
顺序轮询
顺序分配,消息是均匀的分配给每个

上图表示的就是轮询策略,轮训策略是
随机轮询
随机轮询简而言之就是随机的向

实现随机分配的代码只需要两行,如下:
List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
按照key 进行消息保存
这个策略也叫做

实现这个策略的
List < PartitionInfo > partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
上面这几种分区策略都是比较基础的策略,除此之外,你还可以自定义分区策略。
生产者压缩机制
压缩一词简单来讲就是一种互换思想,它是一种经典的用
在
private Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.1.9:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer < String, String > producer = new KafkaProducer < String, String > (properties);
ProducerRecord < String, String > record = new ProducerRecord < String, String > ("CustomerCountry", "Precision Products", "France");
上面代码表明该