Spring Integration
Spring Integration
消息处理
消息的分割:

消息的聚合:

消息的过滤:

消息的分发:

简单实例
SubscribableChannel messageChannel =new DirectChannel(); // 1
messageChannel.subscribe(msg-> { // 2
System.out.println("receive: " +msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3
-
构造一个可订阅的消息通道
messageChannel
; -
使用
MessageHandler
去消费这个消息通道里的消息; -
发送一条消息到这个消息通道,消息最终被消息通道里的
MessageHandler
所消费。
最后控制台打印出receive: msg from alibaba
;
DirectChannel
内部有个 UnicastingDispatcher
类型的消息分发器,会分发到对应的消息通道 MessageChannel
中,从名字也可以看出来,UnicastingDispatcher
是个单播的分发器,只能选择一个消息通道。那么如何选择呢LoadBalancingStrategy
负载均衡策略,默认只有轮询的实现,可以进行扩展。
我们对上段代码做一点修改,使用多个 MessageHandler
去处理消息:
SubscribableChannel messageChannel = new DirectChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
由于 DirectChannel
内部的消息分发器是 UnicastingDispatcher
单播的方式,并且采用轮询的负载均衡策略,所以这里两次的消费分别对应这两个 MessageHandler
。控制台打印出:
receive1: msg from alibaba
receive2: msg from alibaba
既然存在单播的消息分发器 UnicastingDispatcher
,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher
,它被 PublishSubscribeChannel
这个消息通道所使用。广播消息分发器会把消息分发给所有的 MessageHandler
:
SubscribableChannel messageChannel = new PublishSubscribeChannel();
messageChannel.subscribe(msg -> {
System.out.println("receive1: " + msg.getPayload());
});
messageChannel.subscribe(msg -> {
System.out.println("receive2: " + msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
发送两个消息,都被所有的 MessageHandler
所消费。控制台打印:
receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba