9.2 探索Spring Integration
9.2 探索Spring Integration
集成流由以下一个或多个组件组成。在编写更多代码之前,我们将简要地了解一下这些组件在集成流中所扮演的角色:
-
Channels —— 将信息从一个元素传递到另一个元素。
-
Filters —— 有条件地允许基于某些标准的消息通过流。
-
Transformers —— 更改消息值或将消息有效负载从一种类型转换为另一种类型。
-
Routers —— 直接将信息发送到几个渠道之一,通常是基于消息头。
-
Splitters —— 将收到的信息分成两条或多条,每条都发送到不同的渠道。
-
Aggregators —— 与分离器相反,它将来自不同渠道的多条信息组合成一条信息。
-
Service activators —— 将消息传递给某个
Java 方法进行处理,然后在输出通道上发布返回值。 -
Channel adapters —— 将通道连接到某些外部系统或传输。可以接受输入,也可以向外部系统写入。
-
Gateways —— 通过接口将数据传递到集成流。
在定义文件写入集成流时,你已经看到了其中的一些组件。
9.2.1 消息通道
消息通道意指消息移动的集成管道移动。它们是连接
- PublishSubscribeChannel —— 消息被发布到
PublishSubscribeChannel 后又被传递给一个或多个消费者。如果有多个消费者,他们都将会收到消息。 - QueueChannel —— 消息被发布到
QueueChannel 后被存储到一个队列中,直到消息被消费者以先进先出(FIFO)的方式拉取。如果有多个消费者,他们中只有一个能收到消息。 - PriorityChannel —— 与
QueueChannel 类似,但是与FIFO 方式不同,消息被冠以priority 的消费者拉取。 - RendezvousChannel —— 与
QueueChannel 期望发送者阻塞通道,直到消费者接收这个消息类似,这种方式有效的同步了发送者与消费者。 - DirectChannel —— 与
PublishSubscribeChannel 类似,但是是通过在与发送方相同的线程中调用消费者来将消息发送给单个消费者,此通道类型允许事务跨越通道。 - ExecutorChannel —— 与
DirectChannel 类似,但是消息分派是通过TaskExecutor 进行的,在与发送方不同的线程中进行,此通道类型不支持事务跨通道。 - FluxMessageChannel ——
Reactive Streams Publisher 基于Project Reactor Flux 的消息通道。 (我们将会在第10 章讨论Reactive Streams 、Reactor 和Flux )
在
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}
然后在集成流定义中通过名称引用这个通道。例如,如果一个服务
@ServiceActovator(inputChannel="orderChannel")
或者,如果使用
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlows
...
.channel("orderChannel")
...
.get();
}
需要注意的是,如果使用
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}
需要确保将使用者配置为轮询消息通道。在服务激活器的情况下,
@ServiceActivator(inputChannel="orderChannel",
poller=@Poller(fixedRate="1000"))
在本例中,服务激活器每秒(或
9.2.2 过滤器
过滤器可以放置在集成管道的中间,以允许或不允许消息进入流中的下一个步骤。
例如,假设包含整数值的消息通过名为
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
...
.<Integer>filter((p) -> p % 2 == 0)
...
.get();
}
在本例中,使用
9.2.3 转换器
转换器对消息执行一些操作,通常会产生不同的消息,并且可能会产生不同的负载类型。转换可以是一些简单的事情,例如对数字执行数学运算或操作
例如,假设正在一个名为
@Bean
@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
return RomanNubers::toRoman;
}
通过
在
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
...
.transform(RomanNumbers::toRoman)
...
.get();
}
虽然在两个
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow(
RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
...
.transform(romanNumberTransformer)
...
.get();
}
在这里,声明了一个
9.2.4 路由
基于某些路由标准的路由器允许在集成流中进行分支,将消息定向到不同的通道。
例如,假设有一个名为
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel>
determineTargetChannels(Message<?> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}
这里声明的
在
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
...
.<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping ->
mapping.subFlowMapping("EVEN", sf ->
sf.<Integer, Integer>transform(n -> n * 10).handle((i,h) -> { ... }))
.subFlowMapping("ODD", sf ->
sf.transform(RomanNumbers::toRoman).handle((i,h) -> { ... }))
)
.get();
}
虽然仍然可以声明
如果是偶数,则返回一个偶数的字符串值。如果是奇数,则返回奇数。然后使用这些值来确定哪个子映射将处理消息。
9.2.5 Splitter
有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。
- 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
- 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,
Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。
当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个
例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
然后,可以使用
@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
在这里,购买订单到达名为
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(List.class.getName(), "lineItemsChannel");
return router;
}
顾名思义,
按照目前的情况,流分为两个子流:一个是
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}
当携带
如果你想使用
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping ->
mapping.subFlowMapping("BILLING_INFO", sf ->
sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
.subFlowMapping("LINE_ITEMS", sf ->
sf.split().<LineItem> handle((lineItem, h) -> { ... }))
)
.get();
流定义的
9.2.6 服务激活器
服务激活器从输入信道接收消息并发送这些消息给的
@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
return message -> {
System.out.println("Message payload: " + message.getPayload());
};
}
通过
另外,可以声明一个服务激活器,用于在返回一个新的有效载荷之前处理传入的消息。在这种情况下,这个
@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
return (payload, headers) -> {
return orderRepo.save(payload);
};
}
在这种情况下,服务激活器是一个
注意,
public IntegrationFlow someFlow() {
return IntegrationFlows
...
.handle(msg -> {
System.out.println("Message payload: " + msg.getPayload());
})
.get();
}
在这种情况下,
类似地,如果服务激活器不是流的结束,
public IntegrationFlow orderFlow(OrderRepository orderRepo) {
return IntegrationFlows
...
.<Order>handle((payload, headers) -> {
return orderRepo.save(payload);
})
...
.get();
}
当利用
9.2.7 网关
网关是通过一个应用程序可以将数据提交到一个集成信息流和接收这是该流的结果的响应的装置。通过
你已经见过
作为一个例子,假设一个网关处理接受一个
package com.example.demo;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
String uppercase(String in);
}
令人惊叹的是,没有必要实现这个接口。
当
至于
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows
.from("inChannel")
.<String, String> transform(s -> s.toUpperCase())
.channel("outChannel")
.get();
}
正如这里所定义的,流程启动于名为
9.2.8 通道适配器
通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。
入站信道的适配器可以采取多种形式,这取决于它们引入到流的数据源。例如,声明一个入站通道适配器,它采用从
@Bean
@InboundChannelAdapter(
poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
return () -> {
return new GenericMessage<>(source.getAndIncrement());
};
}
此
当使用
@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
return IntegrationFlows
.from(integerSource, "getAndIncrement",
c -> c.poller(Pollers.fixedRate(1000)))
...
.get();
}
通常情况下,通道适配器通过的
@Bean
@InboundChannelAdapter(channel="file-channel",
poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource sourceReader = new FileReadingMessageSource();
sourceReader.setDirectory(new File(INPUT_DIR));
sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
return sourceReader;
}
当在
@Bean
public IntegrationFlow fileReaderFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INPUT_DIR))
.patternFilter(FILE_PATTERN))
.get();
}
服务激活器(作为消息处理的实现)往往是为出站通道适配器而存在的,特别是当数据需要被扇出到应用程序本身的时候。
值得一提的,
9.2.9 端点模块
模块 | 依赖的 |
---|---|
AMQP | spring-integration-amqp |
Spring application events | spring-integration-event |
RSS and Atom | spring-integration-feed |
Filesystem | spring-integration-file |
FTP/FTPS | spring-integration-ftp |
GemFire | spring-integration-gemfire |
HTTP | spring-integration-http |
JDBC | spring-integration-jdbc |
JPA | spring-integration-jpa |
JMS | spring-integration-jms |
spring-integration-mail | |
MongoDB | spring-integration-mongodb |
MQTT | spring-integration-mqtt |
Redis | spring-integration-redis |
RMI | spring-integration-rmi |
SFTP | spring-integration-sftp |
STOMP | spring-integration-stomp |
Stream | spring-integration-stream |
Syslog | spring-integration-syslog |
TCP/UDP | spring-integration-ip |
spring-integration-twitter | |
Web | Services spring-integration-ws |
WebFlux | spring-integration-webflux |
WebSocket | spring-integration-websocket |
XMPP | spring-integration-xmpp |
ZooKeeper | spring-integration-zookeeper |
从表
更重要的是,本章在表
每个端点模块提供通道适配器,当使用