8.2 使用RabbitMQ 和AMQP
8.2 使用RabbitMQ 和AMQP
图
当消息到达
有几种不同的交换方式,包括以下几种:
- Default —— 一种特殊的交换器,通过
broker 自动创建。它将消息路由到与消息的路由键的值同名的队列中。所有的队列将会自动地与交换器绑定。 - Direct —— 路由消息到消息路由键的值与绑定值相同的队列。
- Topic —— 将消息路由到一个或多个队列,其中绑定键(可能包含通配符)与消息的路由键匹配。
- Fanout —— 将消息路由到所有绑定队列,而不考虑绑定键或路由键。
- Headers —— 与
topic 交换器类似,只是路由基于消息头值而不是路由键。 - Dead letter —— 对无法交付的消息(意味着它们不匹配任何已定义的交换器与队列的绑定)的全部捕获。
最简单的交换形式是
需要理解的最重要的一点是,消息是用路由键发送到交换器的,它们是从队列中使用的。它们如何从一个交换到一个队列取决于绑定定义以及什么最适合相应的情况。
使用哪种交换类型以及如何定义从交换到队列的绑定与
注意
有关如何最好地将队列绑定到交换器的更详细讨论,请参见
Alvaro Videla 和Jason J.W. Williams (Manning, 2012)的* 《RabbitMQ 实战》*。
8.2.1 添加RabbitMQ 到Spring 中
在开始使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
将
表
属性 | 描述 |
---|---|
spring.rabbitmq.addresses | 一个逗号分隔的 |
spring.rabbitmq.host | |
spring.rabbitmq.port | |
spring.rabbitmq.username | 访问 |
spring.rabbitmq.password | 访问 |
出于开发目的,可能有一个
例如,假设在进入生产环境时,
spring:
profiles: prod
rabbitmq:
host: rabbit.tacocloud.com
port: 5673
username: tacoweb
password: l3tm31n
现在
8.2.2 使用RabbitTemplate 发送消息
关于使用
// 发送原始消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
// 发送从对象转换过来的消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
// 发送经过处理后从对象转换过来的消息
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
这些方法与
这些方法与对应的
让我们用
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private RabbitTemplate rabbit;
@Autowired
public RabbitOrderMessagingService(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
}
有了
说到默认交换,默认交换名称是 “”(一个空
spring:
rabbitmq:
template:
exchange: tacocloud.orders
routing-key: kitchens.central
在这种情况下,所有发送的消息都将自动发送到名为
从消息转换器创建消息对象非常简单,但是使用
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order", order);
}
配置消息转换器
默认情况下,使用
- Jackson2JsonMessageConverter —— 使用
Jackson 2 JSON 处理器将对象与JSON 进行转换 - MarshallingMessageConverter —— 使用
Spring 的序列化和反序列化抽象转换String 和任何类型的本地对象 - SimpleMessageConverter —— 转换
String 、字节数组和序列化类型 - ContentTypeDelegatingMessageConverter —— 基于
contentType 头信息将对象委托给另一个MessageConverter - MessagingMessageConverter —— 将消息转换委托给底层
MessageConverter ,将消息头委托给AmqpHeaderConverter
如果需要修改消息转换器,需要做的是配置
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
设置消息属性
与
重新访问程序清单
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
Message message = converter.toMessage(order, props);
rabbit.send("tacocloud.order", message);
}
但是,在使用
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
这里,在
现在已经了解了如何使用
8.2.3 从RabbitMQ 接收消息
使用
与
- 使用
RabbitTemplate 从队列中拉取消息 - 获取被推送到
@RabbitListener 注解的方法中的消息
让我们从基于拉模型的
使用
// 接收消息
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的对象
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
// 接收从消息转换过来的类型安全的对象
<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
这些方法是前面描述的
但是在方法签名方面有一些明显的区别。首先,这些方法都不以交换键或路由键作为参数。这是因为交换和路由键用于将消息路由到队列,但是一旦消息在队列中,它们的下一个目的地就是将消息从队列中取出的使用者。使用应用程序不需要关心交换或路由键,队列是在消费应用程序是仅仅需要知道一个东西。
许多方法接受一个
让我们看看如何将其付诸实践。下面程序清单显示了
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.orders");
return message != null
? (Order) converter.fromMessage(message)
: null;
}
}
根据实际情况的不同,可能容忍一个小的延迟。例如,在
public Order receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (Order) converter.fromMessage(message)
: null;
}
如果你和我一样,看到这样一个硬编码的数字会让你有点不舒服。那么创建一个带
spring:
rabbitmq:
template:
receive-timeout: 30000
回到
public Oreder receiveOrder() {
return (Order) rabbit.receiveAndConvert("tacocloud.order.queue");
}
那就简单多了,不是吗?所看到的唯一麻烦的事情就是从
public Order receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}
这是否比类型转换更好还值得商榷,但它是一种比类型转换更安全的方法。使用
使用监听器处理
对于消息驱动的
例如,下面的程序清单显示了
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}
这与程序清单
虽然在前面的段落中可能感觉到了
在结束本章时,让我们继续关注