9.2探索Spring Integration

9.2探索Spring Integration

Spring Integration涵盖了许多集成场景。试图将所有这些内容都包含在一个章节中,就像试图将大象装进一个信封一样。我将展示一张Spring Integration大象的照片,而不是对Spring Integration进行全面的讨论,以便让你了解它是如何工作的。然后,将创建一个向Taco Cloud应用程序添加功能的集成流。

集成流由以下一个或多个组件组成。在编写更多代码之前,我们将简要地了解一下这些组件在集成流中所扮演的角色:

  • Channels —— 将信息从一个元素传递到另一个元素。

  • Filters —— 有条件地允许基于某些标准的消息通过流。

  • Transformers —— 更改消息值或将消息有效负载从一种类型转换为另一种类型。

  • Routers —— 直接将信息发送到几个渠道之一,通常是基于消息头。

  • Splitters —— 将收到的信息分成两条或多条,每条都发送到不同的渠道。

  • Aggregators —— 与分离器相反,它将来自不同渠道的多条信息组合成一条信息。

  • Service activators —— 将消息传递给某个Java方法进行处理,然后在输出通道上发布返回值。

  • Channel adapters —— 将通道连接到某些外部系统或传输。可以接受输入,也可以向外部系统写入。

  • Gateways —— 通过接口将数据传递到集成流。

在定义文件写入集成流时,你已经看到了其中的一些组件。FileWriterGateway接口是将应用程序提交的文本写入文件的网关。还定义了一个转换器来将给定的文本转换为大写;然后声明一个服务网关,它执行将文本写入文件的任务。这个流有两个通道:textInChannelfileWriterChannel,它们将其他组件相互连接起来。现在,按照承诺快速浏览一下集成流组件。

9.2.1消息通道

消息通道意指消息移动的集成管道移动。它们是连接Spring Integration所有其他部分的管道。

Spring Integration提供了多个管道的实现,包括以下这些:

  • PublishSubscribeChannel —— 消息被发布到PublishSubscribeChannel后又被传递给一个或多个消费者。如果有多个消费者,他们都将会收到消息。
  • QueueChannel —— 消息被发布到QueueChannel后被存储到一个队列中,直到消息被消费者以先进先出(FIFO)的方式拉取。如果有多个消费者,他们中只有一个能收到消息。
  • PriorityChannel —— 与QueueChannel类似,但是与FIFO方式不同,消息被冠以priority的消费者拉取。
  • RendezvousChannel —— 与QueueChannel期望发送者阻塞通道,直到消费者接收这个消息类似,这种方式有效的同步了发送者与消费者。
  • DirectChannel —— 与PublishSubscribeChannel类似,但是是通过在与发送方相同的线程中调用消费者来将消息发送给单个消费者,此通道类型允许事务跨越通道。
  • ExecutorChannel —— 与DirectChannel类似,但是消息分派是通过TaskExecutor进行的,在与发送方不同的线程中进行,此通道类型不支持事务跨通道。
  • FluxMessageChannel —— Reactive Streams Publisher基于Project Reactor Flux的消息通道(我们将会在第10章讨论Reactive StreamsReactorFlux

Java配置和Java DSL样式中,输入通道都是自动创建的,默认是DirectChannel。但是,如果希望使用不同的通道实现,则需要显式地将通道声明为bean并在集成流中引用它。例如,要声明PublishSubscribeChannel,需要声明以下@Bean方法:

@Bean
public MessageChannel orderChannel() {
    return new PublishSubscribeChannel();
}

然后在集成流定义中通过名称引用这个通道。例如,如果一个服务activator bean正在使用这个通道,那么可以在@ServiceActivatorinputChannel属性中引用它:

@ServiceActovator(inputChannel="orderChannel")

或者,如果使用Java DSL配置方式,需要通过调用channel()方法引用它:

@Bean
public IntegrationFlow orderFlow() {
    return IntegrationFlows
        ...
        .channel("orderChannel")
        ...
        .get();
}

需要注意的是,如果使用QueueChannel,则必须为使用者配置一个轮询器。例如,假设已经声明了一个这样的QueueChannel bean

@Bean
public MessageChannel orderChannel() {
    return new QueueChannel();
}

需要确保将使用者配置为轮询消息通道。在服务激活器的情况下,@ServiceActivator注解可能是这样的:

@ServiceActivator(inputChannel="orderChannel",
                 poller=@Poller(fixedRate="1000"))

在本例中,服务激活器每秒(或1,000 ms)从名为orderChannel的通道轮询一次。

9.2.2过滤器

过滤器可以放置在集成管道的中间,以允许或不允许消息进入流中的下一个步骤。

例如,假设包含整数值的消息通过名为numberChannel的通道发布,但是只希望偶数传递到名为evenNumberChannel的通道。在这种情况下,可以使用@Filter注解声明一个过滤器,如下所示:

@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
    return IntegrationFlows
        ...
        .<Integer>filter((p) -> p % 2 == 0)
        ...
        .get();
}

在本例中,使用lambda表达式实现过滤器。但是,事实上,filter()方法是接收一个GenericSelector作为参数。这意味着可以实现GenericSelector接口,而不是引入一个简略的lambda表达式实现过滤。

9.2.3转换器

转换器对消息执行一些操作,通常会产生不同的消息,并且可能会产生不同的负载类型。转换可以是一些简单的事情,例如对数字执行数学运算或操作String字符串值;转换也会很复杂,例如使用表示ISBNString字符串值来查找并返回相应书籍的详细信息。

例如,假设正在一个名为numberChannel的通道上发布整数值,并且希望将这些数字转换为包含等效罗马数字的String字符串。在这种情况下,可以声明一个GenericTransformer类型的bean,并添加@Transformer注解,如下所示:

@Bean
@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
    return RomanNubers::toRoman;
}

通过@Transformer注解将bean指定为transformer bean,它从名为numberChannel的通道接收整数值,并使用oRoman()toRoman()方法是在一个名为RomanNumbers的类中静态定义的,并在这里通过方法引用进行引)的静态方法进行转换,得到的结果被发布到名为romanNumberChannel的通道中。

Java DSL配置风格中,调用transform()甚至更简单,将方法引用传递给toRoman()方法即可:

@Bean
public IntegrationFlow transformerFlow() {
    return IntegrationFlows
        ...
        .transform(RomanNumbers::toRoman)
        ...
        .get();
}

虽然在两个transformer代码示例中都使用了方法引用,但是要知道transformer也可以使用lambda表达式。或者,如果transformer比较复杂,需要单独的成为一个Java类,可以将它作为bean注入流配置,并将引用传递给transform()方法:

@Bean
public RomanNumberTransformer romanNumberTransformer() {
    return new RomanNumberTransformer();
}

@Bean
public IntegrationFlow transformerFlow(
    RomanNumberTransformer romanNumberTransformer) {
    return IntegrationFlows
        ...
        .transform(romanNumberTransformer)
        ...
        .get();
}

在这里,声明了一个RomanNumberTransformer类型的bean,它本身是Spring IntegrationTransformerGenericTransformer接口的实现。bean被注入到transformerFlow()方法,并在定义集成流时传递给transform()方法。

9.2.4路由

基于某些路由标准的路由器允许在集成流中进行分支,将消息定向到不同的通道。

例如,假设有一个名为numberChannel的通道,整数值通过它流动。假设希望将所有偶数消息定向到一个名为evenChannel的通道,而将奇数消息定向到一个名为oddChannel的通道。要在集成流中创建这样的路由,可以声明一个AbstractMessageRouter类型的bean,并使用@Router注解该bean

@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel>
            determineTargetChannels(Message<?> message) {
            Integer number = (Integer) message.getPayload();
            if (number % 2 == 0) {
                return Collections.singleton(evenChannel());
            }
            return Collections.singleton(oddChannel());
        }
    };
}

@Bean
public MessageChannel evenChannel() {
    return new DirectChannel();
}

@Bean
public MessageChannel oddChannel() {
    return new DirectChannel();
}

这里声明的AbstractMessageRouter bean接受来自名为numberChannel的输入通道的消息。定义为匿名内部类的实现检查消息有效负载,如果它是偶数,则返回名为evenChannel的通道(在路由器bean之后声明为bean。否则,通道有效载荷中的数字必须为奇数;在这种情况下,将返回名为oddChannel的通道(也在bean声明方法中声明

Java DSL形式中,路由器是通过在流定义过程中调用route()来声明的,如下所示:

@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
    return IntegrationFlows
        ...
        .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping ->
            mapping.subFlowMapping("EVEN", sf ->
               sf.<Integer, Integer>transform(n -> n * 10).handle((i,h) -> { ... }))
                 .subFlowMapping("ODD", sf ->
                     sf.transform(RomanNumbers::toRoman).handle((i,h) -> { ... }))
            )
        .get();
}

虽然仍然可以声明AbstractMessageRouter并将其传递给route(),但是本例使用lambda表达式来确定消息有效负载是奇数还是偶数。

如果是偶数,则返回一个偶数的字符串值。如果是奇数,则返回奇数。然后使用这些值来确定哪个子映射将处理消息。

9.2.5 Splitter

有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter将为分割并处理这些消息。

Splitter在很多情况下都很有用,但是有两个基本用例可以使用Splitter

  • 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
  • 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。

当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个POJO即可,该POJO提取传入的有效负载的各个部分,并将它们作为集合的元素返回。

例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的OrderSplitter将完成这项工作:

public class OrderSplitter {
    public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
        ArrayList<Object> parts = new ArrayList<>();
        parts.add(po.getBillingInfo());
        parts.add(po.getLineItems());
        return parts;
    }
}

然后,可以使用@Splitter注解将OrderSplitter bean声明为集成流的一部分,如下所示:

@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
    return new OrderSplitter();
}

在这里,购买订单到达名为poChannel的通道,并被OrderSplitter分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为splitOrderChannel的通道。在流的这一点上,可以声明一个PayloadTypeRouter来将账单信息和项目,并路由到它们自己的子流:

@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(
        BillingInfo.class.getName(), "billingInfoChannel");
    router.setChannelMapping(List.class.getName(), "lineItemsChannel");
    return router;
}

顾名思义,PayloadTypeRouter根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为BillingInfo的消息路由到一个名为billingInfoChannel的通道进行进一步处理。至于项目信息,它们在java.util.List集合包中;因此,可以将List类型的有效负载映射到名为lineItemsChannel的通道中。

按照目前的情况,流分为两个子流:一个是BillingInfo对象流,另一个是List流。但是,如果想进一步分割它,而不是处理LineItem列表,而是分别处理每个LineItem,该怎么办呢?要将列表拆分为多个消息(每个行项对应一条消息,只需编写一个方法(而不是bean,该方法使用@Splitter进行注解,并返回LineItems集合,可能类似如下:

@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
    return lineItems;
}

当携带List的有效负载的消息到达名为lineItemsChannel的通道时,它将传递到lineItemSplitter()方法。根据Splitter的规则,该方法必须返回要Splitter的项的集合。在本例中,已经有了LineItems的集合,因此只需直接返回该集合。因此,集合中的每个LineItem都以其自己的消息形式发布到名为lineItemChannel的通道。

如果你想使用Java DSL来声明相同的Splitter/Router配置,你可以调用split()route()

return IntegrationFlows
    ...
    .split(orderSplitter())
    .<Object, String> route(p -> {
        if (p.getClass().isAssignableFrom(BillingInfo.class)) {
            return "BILLING_INFO";
        } else {
            return "LINE_ITEMS";
        }
    }, mapping ->
           mapping.subFlowMapping("BILLING_INFO", sf ->
                      sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
                  .subFlowMapping("LINE_ITEMS", sf ->
                       sf.split().<LineItem> handle((lineItem, h) -> { ... }))
    )
    .get();

流定义的DSL形式当然更简洁,如果不是更难于理解的话。它使用与Java配置示例相同的OrderSplitter来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。

9.2.6服务激活器

服务激活器从输入信道接收消息并发送这些消息给的MessageHandlerSpring集成提供了多种的MessageHandler实现开箱即用(PayloadTypeRouter就是MessageHandler的实现,但你会经常需要提供一些定制实现充当服务激活。作为一个例子,下面的代码说明了如何声明的MessageHandler bean,构成为一个服务激活器:

@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
    return message -> {
        System.out.println("Message payload: " + message.getPayload());
    };
}

通过@ServiceActivator注解bean,将其指定为一个服务激活器,从所述信道处理消息命名someChannel。至于MessageHandler的本身,它是通过一个lambda实现。虽然这是一个简单的MessageHandler,给定的消息时,它发出其有效载荷的标准输出流。

另外,可以声明一个服务激活器,用于在返回一个新的有效载荷之前处理传入的消息。在这种情况下,这个bean应该是一个GenericHandler而非的MessageHandler

@Bean
@ServiceActivator(inputChannel="orderChannel", outputChannel="completeOrder")
public GenericHandler<Order> orderHandler(OrderRepository orderRepo) {
    return (payload, headers) -> {
        return orderRepo.save(payload);
    };
}

在这种情况下,服务激活器是一个GenericHandler,其中的有效载荷为Order类型。当订单到达,它是通过repository进行保存;保存Order后产生的结果被发送到名称为completeChannel的输出通道。

注意,GenericHandler不仅给出了有效载荷,还有消息头(即使该示例不使用任何形式的头信息。同时也可以通过传递了MessageHandlerGenericHandler去调用在流定义中的handler()方法,来使用在Java DSL配置式中的服务激活器:

public IntegrationFlow someFlow() {
    return IntegrationFlows
        ...
        .handle(msg -> {
            System.out.println("Message payload: " + msg.getPayload());
        })
        .get();
}

在这种情况下,MessageHandler是作为一个lambda,但也可以将它作为一个参考方法甚至是一个类,它实现了MessageHandler接口。如果给它一个lambda或方法引用,要知道,它是接受一个消息作为参数。

类似地,如果服务激活器不是流的结束,handler()可以写成接受GenericHandler参数。从之前应用订单存储服务激活器来看,可以使用Java DSL对流程进行配置:

public IntegrationFlow orderFlow(OrderRepository orderRepo) {
    return IntegrationFlows
        ...
        .<Order>handle((payload, headers) -> {
            return orderRepo.save(payload);
        })
        ...
        .get();
}

当利用GenericHandler时,lambda表达式或方法参考接受该消息的有效载荷和报头作为参数。另外,如果选择在一个流程的结束使用GenericHandler,需要返回null,否则会得到这表明有没有指定输出通道的错误。

9.2.7网关

网关是通过一个应用程序可以将数据提交到一个集成信息流和接收这是该流的结果的响应的装置。通过Spring Integration实现的,网关是实现为应用程序可以调用将消息发送到集成信息流的接口。

你已经见过FileWriterGateway消息网关的例子。FileWriterGateway是单向网关,它的方法接受String作为参数,将其写入到文件中,返回void。同样,编写一个双向网关也很容易。当写网关接口时,确保该方法返回某个值发布到集成流程即可。

作为一个例子,假设一个网关处理接受一个String的简单集成信息流,并把特定的String转成大写。网关接口可能是这个样子:

package com.example.demo;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel="inChannel", defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
    String uppercase(String in);
}

令人惊叹的是,没有必要实现这个接口。Spring Integration自动提供运行时实现,这个实现会使用特定的通道进行数据的发送与接收。

uppercase()被调用时,给定的String被发布到名为inChannel的集成流通道中。而且,不管流是如何定义的或是它是做什么的,在当数据到达名为outChannel通道时,它从uppercase()方法中返回。

至于uppercase集成流,它只有一个单一的步骤,把String转换为大写一个简单的集成流。以下是使用Java DSL配置:

@Bean
public IntegrationFlow uppercaseFlow() {
    return IntegrationFlows
        .from("inChannel")
        .<String, String> transform(s -> s.toUpperCase())
        .channel("outChannel")
        .get();
}

正如这里所定义的,流程启动于名为inChannel的通道获得数据输入的时候。然后消息的有效负载通过转换器去执行变成大写字母的操作,这里的操作都使用lambda表达式进行定义。消息的处理结果被发布到名为outChannel的通道中,这个通道就是已经被声明为UpperCaseGateway接口的答复通道。

9.2.8通道适配器

通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。

入站信道的适配器可以采取多种形式,这取决于它们引入到流的数据源。例如,声明一个入站通道适配器,它采用从AtomicInteger到流递增的数字。使用Java配置,它可能是这样的:

@Bean
@InboundChannelAdapter(
    poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
    return () -> {
        return new GenericMessage<>(source.getAndIncrement());
    };
}

@Bean方法声明了一个入站信道适配器bean,后面跟随着@InboundChannelAdapter注解,它们每1秒(1000 ms)从注入的AtomicInteger提交一个数字到名numberChannel的通道中。

当使用Java配置时,@InboundChannelAdapter意味着是一个入站通道适配器,from()方法就是使用Java DSL来定义流的时候,表明它是怎么处理的。下面对于流定义的一个片段展示了在Java DSL配置中类似的输入通道适配器:

@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
    return IntegrationFlows
        .from(integerSource, "getAndIncrement",
              c -> c.poller(Pollers.fixedRate(1000)))
        ...
        .get();
}

通常情况下,通道适配器通过的Spring Integration的多端点模块之一进行提供。举个例子,假设需要一个入站通道适配器,用它来监视指定的目录,同时将任何写入到那个目录中的文件作为消息,提交到名为file-channel的通道中。下面的Java配置使用FileReadingMessageSourceSpring Integration的文件端点模块来实现这一目标:

@Bean
@InboundChannelAdapter(channel="file-channel",
                       poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource sourceReader = new FileReadingMessageSource();
    sourceReader.setDirectory(new File(INPUT_DIR));
    sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
    return sourceReader;
}

当在Java DSL中写入同样的file-reading入站通道适配器时,来自Files类的inboundAdapter()方法达到的同样的目的。出站通道适配器位于集成信息流的最后位置,将最终消息扇出到应用程序或是其他系统中:

@Bean
public IntegrationFlow fileReaderFlow() {
    return IntegrationFlows
        .from(Files.inboundAdapter(new File(INPUT_DIR))
              .patternFilter(FILE_PATTERN))
        .get();
}

服务激活器(作为消息处理的实现)往往是为出站通道适配器而存在的,特别是当数据需要被扇出到应用程序本身的时候。

值得一提的,Spring Integration的端点模块为几种常见的用例提供了有用的消息处理程序。如在程序清单9.3中所看到的FileWritingMessageHandler出站通道适配器,这就是一个很好的例子。说到Spring Integration端点模块,让我们快速浏览一下准备使用的集成端点模块。

9.2.9端点模块

Spring Integration可以让你创建自己的通道适配器,这是很棒的。但是,更棒的是Spring Integration提供了包含通道超过两打的端点模块适配器,包括入站和出站,用于与各种常用外部系统进行集成,如表9.1所示。

模块 依赖的Artifact ID
AMQP spring-integration-amqp
Spring application events spring-integration-event
RSS and Atom spring-integration-feed
Filesystem spring-integration-file
FTP/FTPS spring-integration-ftp
GemFire spring-integration-gemfire
HTTP spring-integration-http
JDBC spring-integration-jdbc
JPA spring-integration-jpa
JMS spring-integration-jms
Email spring-integration-mail
MongoDB spring-integration-mongodb
MQTT spring-integration-mqtt
Redis spring-integration-redis
RMI spring-integration-rmi
SFTP spring-integration-sftp
STOMP spring-integration-stomp
Stream spring-integration-stream
Syslog spring-integration-syslog
TCP/UDP spring-integration-ip
Twitter spring-integration-twitter
Web Services spring-integration-ws
WebFlux spring-integration-webflux
WebSocket spring-integration-websocket
XMPP spring-integration-xmpp
ZooKeeper spring-integration-zookeeper

从表9.1可以清楚的看出Spring Integration提供了一套广泛的组件,以满足众多集成的需求。大多数应用程序一点都不会用到Spring Integration提供的功能。但是,如果你需要它们,很好,Spring Integration几乎都能覆盖到。

更重要的是,本章在表9.1中列出模块,不可能涵盖提供的所有通道适配器。你已经看到,使用文件系统模块写入到文件系统的例子。而你很快就要使用电子邮件模块读取电子邮件。

每个端点模块提供通道适配器,当使用Java配置时,可以被声明为bean,当时应Java DSL配置时,可以通过静态方法进行引用。鼓励你去探索你最感兴趣的任何端点模块。你会发现它们的使用方法相当一致。但现在,让我们把关注点转向电子邮件端点模块,看看在Taco Cloud应用程序中如何使用它。

上一页
下一页