8.3使用Kafka发送消息

8.3使用Kafka发送消息

Apache Kafka是我们在本章中研究的最新消息传递选项。乍一看,Kafka是一个消息代理,就像ActiveMQArtemisRabbit一样。但是Kafka有一些独特的技巧。

Kafka被设计为在集群中运行,提供了巨大的可伸缩性。通过将其topic划分到集群中的所有实例中,它具有很强的弹性。RabbitMQ主要处理exchange中的队列,而Kafka仅利用topic来提供消息的发布/订阅。

Kafka topic被复制到集群中的所有broker中。集群中的每个节点充当一个或多个topicleader,负责该topic的数据并将其复制到集群中的其他节点。

更进一步说,每个topic可以分成多个分区。在这种情况下,集群中的每个节点都是一个topic的一个或多个分区的leader,但不是整个topicleader。该topic的职责由所有节点分担。图8.2说明了这是如何工作的。8.2 Kafka集群由多个broker组成,每一个都作为topic分区的leader

![8.2](E:\Document\spring-in-action-v5-translate\第二部分 集成Spring\8章 发送异步消息\8.2.jpg)

由于Kafka独特的构建风格,我鼓励你在迪伦·斯科特(Dylan Scott,2017)的*Kafka实战》*中阅读更多关于它的内容。出于我们的目的,我们将重点讨论如何使用SpringKafka发送和接收消息。

8.3.1设置SpringKafka

要开始使用Kafka进行消息传递,需要将适当的依赖项添加到构建中。但是,与JMSRabbitMQ不同,Kafka没有Spring Boot starter。不过还是只需要一个依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这个依赖项将Kafka所需的一切都带到项目中。更重要的是,它的存在将触发KafkaSpring Boot自动配置,它将在Spring应用程序上下文中生成一个KafkaTemplate。你所需要做的就是注入KafkaTemplate并开始发送和接收消息。

然而,在开始发送和接收消息之前,应该了解一些在使用Kafka时会派上用场的属性。具体来说就是,KafkaTemplate默认在localhost上运行Kafka broker,并监听9092端口。在开发应用程序时,在本地启动Kafka broker是可以的,但是在进入生产环境时,需要配置不同的主机和端口。

spring.kafka.bootstrap-servers属性设置一个或多个Kafka服务器的位置,用于建立到Kafka集群的初始连接。例如,如果集群中的Kafka服务器之一运行在Kafka .tacocloud.com上,并监听9092端口,那么可以在YAML中像这样配置它的位置:

spring:
  kafka:
    bootstrap-servers:
      - kafka.tacocloud.com:9092

但是注意spring.kafka.bootstrap-servers属性是复数形式,它接受一个列表。因此,可以在集群中为它提供多个Kafka服务器:

spring:
  kafka:
    bootstrap-servers:
      - kafka.tacocloud.com:9092
      - kafka.tacocloud.com:9093
      - kafka.tacocloud.com:9094

在项目中设置了Kafka之后,就可以发送和接收消息了。首先来看看KafkaTemplateOrder对象发送给Kafka

8.3.2使用KafkaTemplate发送消息

在许多方面,KafkaTemplateJMSRabbitMQ类似。与此同时,它也是不同的,尤其是在我们考虑它发送消息的方法时:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

注意到的第一件事是没有convertAndSend()方法。这是因为KafkaTemplate是用的泛型,同时能够在发送消息时直接处理域类型。在某种程度上,所有的send()方法都在做convertAndSend()的工作。

再者send()sendDefault()的参数,它们与JMSRabbit中使用的参数完全不同。当使用Kafka发送消息时,可以指定以下参数来指导如何发送消息:

  • 发送消息的topicsend()方法必要的参数)
  • 写入topic的分区(可选)
  • 发送记录的键(可选)
  • 时间戳(可选;默认为System.currentTimeMillis()
  • payload(必须)

topicpayload是两个最重要的参数。分区和键对如何使用KafkaTemplate几乎没有影响,除了作为send()sendDefault()的参数用于提供额外信息。出于我们的目的,我们将把重点放在将消息有效负载发送到给定主题上,而不考虑分区和键。

对于send()方法,还可以选择发送一个ProducerRecord,它与在单个对象中捕获所有上述参数的类型差不多。也可以发送Message对象,但是这样做需要将域对象转换为Message。通常,使用其他方法比创建和发送ProducerRecordMessage对象更容易。

使用KafkaTemplate及其send()方法,可以编写一个基于kafkaOrderMessagingService实现。下面的程序清单显示了这样一个实现。程序清单8.8使用KafkaTemplate发送订单

package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaOrderMessagingService implements OrderMessagingService {
    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    public KafkaOrderMessagingService(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void sendOrder(Order order) {
        kafkaTemplate.send("tacocloud.orders.topic", order);
    }
}

OrderMessagingService的这个实现中,sendOrder()方法使用注入的KafkaTemplatesend()方法向名为tacocloud.orders.topic的主题发送Order。代码中除了使用 “Kafka” 这个名称外,这与为JMSRabbit编写的代码没有太大的不同。

如果设置了默认主题,可以稍微简化sendOrder()方法。首先,通过设置spring.kafka.template.default-topic属性,将默认主题设置为tacocloud.orders.topic

spring:
  kafka:
    template:
      default-topic: tacocloud.orders.topic

然后,在sendOrder()方法中,可以调用sendDefault()而不是send(),并且不指定主题名称:

@Override
public void sendOrder(Order order) {
    kafkaTemplate.sendDefault(order);
}

现在已经编写了消息发送代码了,让我们将注意力转向编写从Kafka接收这些消息的代码。

8.3.3编写Kafka监听器

除了send()sendDefault()的惟一方法签名之外,KafkaTemplateJmsTemplateRabbitTemplate的不同之处在于它不提供任何接收消息的方法。这意味着使用Spring消费来自Kafka主题的消息的唯一方法是编写消息监听器。

对于Kafka,消息监听器被定义为被@KafkaListener注解的方法。@KafkaListener注解大致类似于@JmsListener@RabbitListener,其使用方式大致相同。下面程序清单显示了为Kafka编写的基于listener的订单接收程序。程序清单8.9使用@KafkaListener接收订单

package tacos.kitchen.messaging.kafka.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {
    private KitchenUI ui;

    @Autowired
    public OrderListener(KitchenUI ui) {
        this.ui = ui;
    }

    @KafkaListener(topics="tacocloud.orders.topic")
    public void handle(Order order) {
        ui.displayOrder(order);
    }
}

handle()方法由@KafkaListener注解,表示当消息到达名为tacocloud.orders.topic的主题时应该调用它。正如程序清单8.9中所写的,只为handle()方法提供了一个Order(payload)参数 。但是,如果需要来自消息的其他元数据,它也可以接受一个ConsumerRecordMessage对象。

例如,handle()的以下实现接受一个ConsumerRecord,这样就可以记录消息的分区和时间戳:

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, ConsumerRecord<Order> record) {
    log.info("Received from partition {} with timestamp {}",
             record.partition(), record.timestamp());
    ui.displayOrder(order);
}

类似地,可以使用Message而不是ConsumerRecord,并达到同样的效果:

@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
    MessageHeaders headers = message.getHeaders();
    log.info("Received from partition {} with timestamp {}",
             headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)
             headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
    ui.displayOrder(order);
}

值得注意的是,消息有效负载也可以通过ConsumerRecord.value()Message.getPayload()获得。这意味着可以通过这些对象请求Order,而不是直接将其作为handle()的参数。

上一页
下一页