8.1 使用JMS 发送消息
8.1 使用JMS 发送消息
我们将探讨
8.1.1 设置JMS
在使用
如果使用
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
如果选择
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
默认情况下,
属性 | 描述 |
---|---|
spring.artemis.host | |
spring.artemis.port | |
spring.artemis.user | 用于访问 |
spring.artemis.password | 用于访问 |
例如,考虑应用程序中的以下条目。可能用于非开发设置的
spring:
artemis:
host: artemis.tacocloud.com
port: 61617
user: tacoweb
password: 13tm31n
这将设置
如果要使用
属性 | 描述 |
---|---|
spring.activemq.broker-url | |
spring.activemq.user | 用于访问 |
spring.activemq.password | 用于访问 |
spring.activemq.in-memory | 是否启动内存 |
请注意,不是为tcp://
spring:
activemq:
broker-url: tcp://activemq.tacocloud.com
user: tacoweb
password: 13tm31n
无论选择
但是,如果使用
在继续之前,将希望安装并启动一个
-
Artemis —— https://activemq.apache.org/artemis/docs/latest/using-server.html
-
ActiveMQ —— http://activemq.apache.org/getting-started.html#GettingStarted-PreInstallationRequirements
有了构建中的
8.1.2 使用JmsTemplate 发送消息
在构建中有
// 发送原始消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
// 发送转换自对象的消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
// 发送经过处理后从对象转换而来的消息
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
实际上只有两个方法,
send() 方法需要一个MessageCreator 来制造一个Message 对象。convertAndSend() 方法接受一个Object ,并在后台自动将该Object 转换为一条Message 。- 三种
convertAndSend() 方法会自动将一个Object 转换成一条Message ,但也会接受一个MessagePostProcessor ,以便在Message 发送前对其进行定制。
此外,这三个方法类别中的每一个都由三个重载的方法组成,它们是通过指定
- 一个方法不接受目的地参数,并将消息发送到默认目的地。
- 一个方法接受指定消息目的地的目标对象。
- 一个方法接受一个
String ,该String 通过名称指定消息的目的地。
要使这些方法工作起来,请考虑下面程序清单中的
package tacos.messaging;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@Service
public class JmsOrderMessagingService implements OrderMessagingService {
private JmsTemplate jms;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms) {
this.jms = jms;
}
@Override
public void sendOrder(Order order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session)
throws JMSException {
return session.createObjectMessage(order);
}
});
}
}
我认为程序清单
@Override
public void sendOrder(Order order) {
jms.send(session -> session.createObjectMessage(order));
}
但是请注意,对
spring:
jms:
template:
default-destination: tacocloud.order.queue
在许多情况下,使用缺省目的地是最简单的选择。它让你指定一次目的地名称,允许代码只关心发送消息,而不关心消息被发送到哪里。但是,如果需要将消息发送到缺省目的地之外的目的地,则需要将该目的地指定为
一种方法是传递目标对象作为
public Destination orderQueue() {
return new ActiveMQQueue("tacocloud.order.queue");
}
需要注意的是,这里使用的
如果这个
private Destination orderQueue;
@Autowired
public JmsOrderMessagingService(JmsTemplate jms, Destination orderQueue) {
this.jms = jms;
this.orderQueue = orderQueue;
}
@Override
public void sendOrder(Order order) {
jms.send(
orderQueue,
session -> session.createObjectMessage(order));
}
使用类似这样的
@Override
public void sendOrder(Order order) {
jms.send(
"tacocloud.order.com",
session -> session.createObjectMessage(order));
}
虽然
在发送前转换消息
例如,
@Override
public void sendOrder(Order order) {
jms.convertAndSend("tacocloud.order.queue", order);
}
与
无论选择哪种形式的
配置消息转换器
public interface MessageConverter {
Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException;
Object fromMessage(Message message);
}
这个接口的实现很简单,都不需要创建自定义实现。
消息转换器 | 做了什么 |
---|---|
MappingJackson2MessageConverter | 使用 |
MarshallingMessageConverter | 使用 |
MessagingMessageConverter | 使用底层 |
SimpleMessageConverter | 将 |
为了应用不同的消息转换器,需要做的是将选择的转换器声明为一个
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
return messageConverter;
}
注意一下,你在返回
为了实现更大的灵活性,可以通过调用消息转换器上的
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter =
new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
Map<String, Class<?>> typeIdMappings = new HashMap<String, Class<?>>();
typeIdMappings.put("order", Order.class);
messageConverter.setTypeIdMappings(typeIdMappings);
messageConverter;
}
与在消息的 _typeId
属性中发送完全限定的类名不同,将发送值
后期处理消息
让我们假设,除了利润丰厚的网络业务,
在
更简单的解决方案是在消息中添加一个自定义头信息,以承载订单的源。如果正在使用
jms.send("tacocloud.order.queue",
session -> {
Message message = session.createObjectMessage(order);
message.setStringProperty("X_ORDER_SOURCE", "WEB");
});
这里的问题是没有使用
幸运的是,有一种方法可以在发送消息之前调整在幕后创建的
jms.convertAndSend("tacocloud.order.queue", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message)
throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});
可能注意到了
jms.convertAndSend("tacocloud.order.queue", order,
message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});
尽管只需要这个特定的
@GetMapping("/convertAndSend/order")
public String convertAndSendOrder() {
Order order = buildOrder();
jms.convertAndSend("tacocloud.order.queue", order, this::addOrderSource);
return "Convert and sent order";
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
已经看到了几种发送消息的方法。但是,如果没有人收到信息,就没有什么用处。让我们看看如何使用
8.1.3 接收JMS 消息
在消费消息时,可以选择 拉模型(代码请求消息并等待消息到达)或 推模型(消息可用时将消息传递给代码
另一方面,还可以选择使用推模型,在该模型中,定义了一个消息监听器,它在消息可用时被调用。
这两个选项都适用于各种用例。人们普遍认为推模型是最佳选择,因为它不会阻塞线程。但是在某些用例中,如果消息到达得太快,侦听器可能会负担过重。拉模型允许使用者声明他们已经准备好处理新消息。
让我们看看接收消息的两种方式。我们将从
使用
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
可以看到,这
要查看这些操作,需要编写一些代码来从
package tacos.kitchen.messaging.jms;
import javax.jms.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
private MessageConverter converter;
@Autowired
public JmsOrderReceiver(JmsTemplate jms, MessageConverter converter) {
this.jms = jms;
this.converter = converter;
}
public Order receiveOrder() {
Message message = jms.receive("tacocloud.order.queue");
return (Order) converter.fromMessage(message);
}
}
这里,使用了一个
接收原始
package tacos.kitchen.messaging.jms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class JmsOrderReceiver implements OrderReceiver {
private JmsTemplate jms;
@Autowired
public JmsOrderReceiver(JmsTemplate jms) {
this.jms = jms;
}
public Order receiveOrder() {
return (Order) jms.receiveAndConvert("tacocloud.order.queue");
}
}
这个新版本的
在继续之前,让我们考虑一下如何在
此时,
现在,让我们通过声明
声明消息监听器
在拉模型中,接收消息需要显式调用
要创建对
package tacos.kitchen.messaging.jms.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@JmsListener(destination = "tacocloud.order.queue")
public void receiveOrder(Order order) {
ui.displayOrder(order);
}
}
在许多方面,
消息监听器通常被吹捧为最佳的选择,因为它们不会阻塞,并且能够快速处理多个消息。然而,在
这并不是说消息监听器不好。相反,当消息可以快速处理时,它们是完美的选择。但是,当消息处理程序需要能够根据自己的时间请求更多消息时,
因为