Green Beans:Spring Integration 入门

工程 | Josh Long | 2011 年 2 月 24 日 | ...

应用程序不是孤立存在的。它们需要与客户和其他应用程序进行通信。应用程序集成就是实现这种通信。集成使应用程序能够共享服务和数据,而且,同样重要的是,集成可以帮助应用程序与用户连接。

Spring Integration 提供了一个构建集成解决方案的框架,以促进这类解决方案。Spring Integration 解决方案描述了数据通过管道的流动。数据以消息的形式进入处理管道。消息在命名管道(称为通道)中向前传递,这些管道将数据路由到不同的组件(称为端点)。您可以根据需要将任意数量的端点和通道连接在一起。

这个模型在简单性上与 Unix 命令行非常相似。以以下 Unix 命令为例:

cat spring.txt |grep spring |cut -f1 -d, | sort | uniq > spring_cleaning.txt

在此命令中,来自外部资源(名为 spring.txt 的文件)的数据被传递到 grep 命令行。grep 筛选结果并查找所有与单词“spring”匹配的行,并将它们作为输出发送。grep 命令的输出被发送到 cut 命令的输入,该命令将解析结果,按逗号(“,”)分割每一行,并且只保留数据的第一个字段(第一个“,”之后的所有内容都将被丢弃)。这个中继一直持续到最后,最终输出被写回到另一个名为 spring_cleaning.txt 的外部资源。在此命令中,我们使用 Unix 的“管道”(“|”)将一个命令的输出连接到另一个命令的输入。我们通过组合专门的、单一关注的命令来构建一个管道,从而读取、清理、过滤、转换和写入数据。这种方法称为管道和过滤器模型,Spring Integration 让您可以使用这个简单模型来思考更大规模的问题。在 Spring Integration 中,通道就是管道,端点就是过滤器。

Spring Integration 开箱即用

Spring Integration 支持许多不同的用例。以下是一些常见的用例:
  • 转换数据:转换器端点可能会将载荷的类型从一个类更改为另一个类,或者删除、添加或更改消息头。
  • 路由数据:路由器端点提供自定义路由逻辑——也许来自输入通道的数据应该被传递到多个输出通道?
  • 过滤数据:也许输入数据不适合继续处理,应该被剔除。您可以使用过滤器端点有条件地阻止数据继续进行。
  • 适配不同系统到 Spring Integration:适配器提供了数据进入和退出 Spring Integration 解决方案的能力。
  • 拆分数据:当载荷太大,或者意图将其分区为更小的数据块时。一个例子可能是将单个文件拆分成行。
  • 聚合数据:这是拆分功能的逆过程:聚合器等待消息逐一到达,并将它们收集起来,直到满足某个条件。然后,它将所有聚合的消息作为一个消息发送。
这些是 Spring Integration 开发者工具箱中的工具。Spring Integration 的核心只是一个带有通用抽象和接口的 Maven 依赖项。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

Spring Integration 提供了许多现成的模块,每个模块都针对特定类型的需求提供支持(例如:XML 处理或 Web 服务集成)。例如,文件命名空间提供了入站和出站适配器,分别用于从文件系统读取文件和写入文件到文件系统。file:inbound-channel-adapter 适配文件系统——它监控您指定的目录,每当出现一个新文件时,就会为新文件生成载荷类型为 java.io.File 的消息。一个常见的用例是将文件从 java.io.File 转换为文件内容的表示——无论是 String 还是 byte 数组 (byte[])。由于这非常常见,文件命名空间提供了 file:file-to-string-transformer 端点以及 file:file-to-bytes-transformer。最后,一个常见的场景是人们希望将 Stringbyte 数组数据写入文件系统。为此,file 命名空间提供了 file:outbound-channel-adapter 适配器。这些都只是基本端点(例如,适配器和转换器)的面向文件 IO 的实现。

Spring Integration 提供了许多预打包的模块,支持在这些非常相同的端点类型方面的广泛功能。Spring Integration 2.0 附带的一些模块包括文件 IO、JDBC、RSS/ATOM、FTP/FTPS、SFTP、TCP/IP、RMI、HTTP、JMX、电子邮件、IO 流、Twitter、XMPP、Web 服务、HTTP/REST、XML 和 JMS。虽然这个列表很长,但并非详尽无遗。还有更多支持正在到来,或作为其他 Spring 产品组合项目的一部分提供,包括 Gemfire 支持、AMQP 支持、基于 Activiti BPMN 2 业务流程管理项目所做工作的工作流系统集成、基于 Ajax/Comet 的支持、基于 Flex BlazeDS 消息传递的支持等等。

一个简单的例子

让我们来看一个简单的例子——一个假设的在线零售商(“电子零售商”!)处理网站上的交易,并委托几家第三方公司来帮助履行订单。特别地,我们将关注从网站上的购买点到运输公司的数据流。集成由服务发送的 JMS 消息触发。运输公司不隶属于电子零售商,并期望所有订单都通过其 Web 服务端点提交,因此必须使用触发 JMS 消息来调用 Web 服务端点。最后,所有提交给第三方(在本例中是运输公司)的作业都需要进行审计,因此信息应记录在我们数据库中面向报告的审计表中。
Spring Integration Example Process 示例集成解决方案的图表——该图表由 SpringSource Tool Suite 生成

处理 JMS 消息:第一步是接收来自 JMS 代理的消息。我们将使用 Spring Integration 入站适配器——一个接收 JMS 代理消息的端点——以简单、声明式的方式将 JMS 代理连接到我们的应用程序代码。

Spring Integration 允许您处理包裹在 org.springframework.integration.core.Message 中的单个数据块。数据不会凭空出现,它必须来自某个地方。要将数据引入和导出 Spring Integration,您分别使用入站和出站适配器。适配器是一种特殊类型的端点,它告诉您外部系统中发生了什么有趣的事情,并告诉您在外部系统中发生了什么。由于适配器只接收数据或发送数据,它们是单向的。


    <int-jms:message-driven-channel-adapter
      channel="partnerNotifications"
      connection-factory="connectionFactory"
      transaction-manager="jmsTransactionManager"
      acknowledge="transacted"
       destination-name="${jms.partnernotifications.destination}"
    />

    <int:channel id="partnerNotifications"/>

我们不会详细介绍 connectionFactoryjmsTransactionManager bean 的具体细节,因为它们是标准的 Spring JMS 相关 bean,可以在源代码中查看。每当 JMS 代理(在 JMS destination 中)上出现消息时,Spring Integration 就会在名为 partnerNotifications 的通道上发布一个 org.springframework.integration.Message,供其消费。

调用合作伙伴 Web 服务:消息载荷是 XML 文档(以 String 形式)。稍后我们将介绍它如何变成这样,但可以肯定的是,它已经是一种电子零售商的合作伙伴可以使用的通用 XML 格式。下一步是进行 Web 服务调用到运输公司。流程如下:消息通过 JMS 适配器进入,其载荷被用作 Web 服务调用的载荷。在从 JMS 适配器接收到消息后,在将其作为 Web 服务调用发送出去之前,它会被放入 partnerNotifications 通道。

这代表了另一种类型的端点,类似于适配器,称为网关。网关是双向的,并处理请求/应答场景。出站网关向外部系统发送请求,并等待来自该外部系统的应答,该应答作为入站消息返回给您(请求者)。Web 服务调用为每个请求产生一个响应。应答数据作为 Spring Integration 消息发送到 partnerXmlShippingReplies 通道。

更新审计表:一旦收到 Web 服务调用的成功应答,我们就需要更新我们的审计表。此审计表是一个面向数据仓库的表;它包含反规范化的记录,可用于跟踪订单在履行系统中的进度,以及驱动报告和分析。来自 Web 服务的应答消息是一个简单的 XML 文档,其中包含我们需要馈送到审计表的信息。

消息有两个重要部分:它的头和它的载荷。在 Spring Integration 中,载荷可以是任何类型的对象。消息头基本上是 java.util.Map 中的一系列键/值对。头键是 String,但值可以是任何类型。头通常携带关于载荷的元数据,端点在处理载荷时可以依赖这些元数据。

在我们的示例中,我们需要提取相关的 XML 属性,并将它们作为消息头提供,以便馈送到 JDBC 查询以更新表。我们将使用 Spring Integration 的 xml:xpath-header-enricher 来评估 XPath 表达式相对于 Web 服务的响应,并将解析后的表达式作为值提取到消息头中。在下面的示例中,我们创建了三个头:customerIdpurchaseIddate


    <int-xml:xpath-header-enricher input-channel="partnerXmlShippingReplies" output-channel="partnerShippingReplies">
        <int-xml:header name="purchaseId" xpath-expression="//@purchase-id"/>
        <int-xml:header name="customerId" xpath-expression="//@customer-id"/>
        <int-xml:header name="date" xpath-expression="//@confirmation-date"/>
    </int-xml:xpath-header-enricher>

   <int:channel id="partnerShippingReplies"/>

xml:xpath-header-enricher 是一个转换器端点:消息以 Message 的形式进入,载荷为 XML 文档和头,然后以 Message 的形式离开,载荷相同,并且除了已有的头之外,还有三个新头。在此示例中,输出发布在 partnerShippingReplies 通道上。

接下来,我们将消息和新头发送到 jdbc:outbound-channel-adapter,它将用于执行 JDBC 插入。


    <int-jdbc:outbound-channel-adapter
        data-source="dataSource"
        channel="partnerShippingReplies" >
        <int-jdbc:query>
INSERT INTO purchase_fulfillment_log(
    PURCHASE_ID, CUSTOMER_ID, EVENT_DATE, EVENT)
VALUES( :headers[purchaseId], :headers[customerId],
               :headers[date], 'SHIPPED' )
        </int-jdbc:query>
    </int-jdbc:outbound-channel-adapter>

这是一个完整、可工作的集成。我们使用基于 Java 的配置来配置所有必需的 bean,但不需要任何 Java 来处理任何业务逻辑本身——所有实际的处理逻辑都存在于大约 25 行 Spring Integration 命名空间元素中,并依赖于您可能已经熟悉的那些技术。例如,Spring Integration JMS 适配器建立在核心 Spring 中对 JMS 的支持之上。出站 Web 服务网关建立在 Spring Web Services 堆栈之上(正如出站 HTTP 网关建立在 RestTemplate 之上一样)。XML 支持通常建立在 Spring OXM 支持之上。最后,出站 JDBC 适配器建立在 Spring JDBC 支持之上(例如,在示例中,我们可以提供 Spring 的 JdbcTemplate 实例而不是 javax.sql.DataSource)。

Spring Integration 中的错误处理

事务 在示例中,我们从 JMS 消费消息,通过 Web 服务发送消息,然后转换响应,并将其写入数据库。这里有很多活动部件,如果出现问题,您应该了解正在使用的各种机制来处理错误。对于 JMS 和 JDBC,常见的做法是使用事务在发生某种故障时进行回滚。在示例中使用的 jms:message-driven-channel-adapter 中,transaction-manager 引用是可选的,但当它生效时,它的作用正如您所期望的那样:同一事务中的任何异常都会导致 JMS 消息接收回滚。线程本地事务延伸到在同一线程中执行的所有 Spring Integration 组件,在本解决方案的情况下,这包括所有内容。

要进行尝试,请关闭数据库然后再次运行该解决方案——它将尝试从 JMS 代理消费消息,调用 Web 服务,转换响应,然后访问数据库,在那里会发生异常,这将导致 JMS 接收操作回滚,并最终重新排入 JMS 代理(在 ActiveMQ 中,默认情况下,消息最终会进入 ActiveMQ.DLQ 队列,即死信队列)。

在架构中构建一致性 因此,事务是处理错误条件的一种方式,但对于不与事务协同工作的资源(如 Web 服务)几乎没有帮助。一种不使用事务的方法是在实现层面解决问题。例如,Web 服务调用或数据库调用可能被设计成幂等的:具有相同输入的多次调用应产生相同的输出,如果可能的话,没有副作用。例如,使用相同的值更新数据库中的单个行是幂等的:您可以运行相同的语句 100 次,最坏的情况是您得到的结果与只运行一次更新相同(正确)的结果。处理错误的另一种方法是实现补偿逻辑。如果消息的生产者和消费者在同一个线程中,那么当出现问题时,正常的 Java 错误处理逻辑适用:抛出一个异常,它可以由发送者(根据需要)处理。但是,如果生产者和消费者在不同的线程中,那么常规规则就不适用了。在这些情况下,没有消费者可以将异常反馈回去。默认情况下,Spring Integration 会捕获异常并将其作为载荷中带有异常的消息转发到一个众所周知的、自动创建的通道,称为 errorChannel。您可以指定一个将错误转发到的备用通道,方法是在代码中指定一个键为 MessageHeaders.ERROR_CHANNEL 的消息头。由您来消费该通道中的消息并做出适当的反应。

Spring Integration 组件模型

到目前为止,我们通过依赖于满足我们目标的现成支持来构建一切。您会发现这在大多数情况下都是如此——Spring Integration 确实是一个很好的工具箱。就像一盒乐高积木一样,您可以将模块以看似无限的组合方式组合起来,以解决大多数问题。但是,这并不意味着每个问题都已解决。例如,Spring Integration 无法知道您的域模型的具体细节,也无法知道您的业务逻辑和服务的具体细节。有时您希望深入到 Java,然后直接插入正确的行为。

为了支持自定义逻辑,Spring Integration 支持所有核心组件(transformerssplittersroutersadaptersaggregators 等)的通用、可插拔(在核心命名空间中)实现,并期望您提供自定义 Java 逻辑来履行组件的职责。此外,Spring Integration 提供了一个 service-activator 组件,它是一个“逃生舱口”组件:您可以使用它来插入自定义 Java 处理逻辑,无论这段代码的目的是什么。您可以随意使用 service-activator——也许您只想使用 iText 将 PDF 写入文件系统,或者执行某种内在的业务计算?所有这些自定义组件都支持 POJO 实现。它们都通过调用您指定的 bean 中的方法来工作。方法的形式没有严格的规则,因为 Spring Integration 非常灵活。但通常,您应该设想组件会位于其他组件之间,因此必须同时接受入站消息作为参数,并产生出站消息作为其返回类型。消息进,消息出。

方法规范根据组件类型会略有不同:转换器可以接收一种类型的消息并生成完全不同、已转换类型的出站消息(自然!)。拆分器接收一个消息并将其返回类型产生一个消息集合。聚合器则相反:它将一个消息集合作为输入参数,并返回一个单一的聚合消息作为其输出。还有更多,细节当然可以在文档中找到。在我们的例子中,让我们看看一个 servce-activator 来演示方法映射的灵活性。要使用 service-activator,我们必须首先在 XML DSL 中声明它,如下所示:


 <service-activator
      input-channel = "in"
      ref = "helloWorldServiceActivator" method = "sayHello"
      output-channel = "out" />

这个例子很典型:输入通道产生一个消息,组件处理它,并将处理结果发送到输出通道。然而,在这里,处理逻辑由您来提供。ref 属性指定了应该使用哪个 bean 来转换入站消息。它还规定了使用哪个方法。如果 bean 只有一种方法,或者 bean 有多种方法但只有一种方法用 Spring Integration 的 @ServiceActivator 注解标记,那么就不需要像我们在这里那样在 XML 中指定一个特定方法。让我们看看这种 service-activator 的第一版实现。


@Component
public class HelloWorldServiceActivator {

 public Message<String> sayHello( Message<String> inboundMessage) {
 Map<String,Object> headers = inboundMessage.getHeaders();
 String payload = inboundMessage.getPayload();
 System.out.println( "In the sayHello method, printing out the  "+
 "payload of the inbound message: "+payload + ". Also, there are " +
 headers.size() + " headers." ) ;
 return MessageBuilder.withPayload( inboundMessage.getPayload() )
           .copyHeadersIfAbsent( inboundMessage.getHeaders() )
           .build();
 }
}

bean 很简单:它是一个简单的 POJO,并声明了一个单独的方法,该方法接收一个 Spring Integration 消息并将其输入作为输出返回。但是,如果您不想,您不必依赖 Spring Integration 的消息类型。Spring Integration 具有一些智能的启发式方法,可以在许多情况下自动为您做正确的事情。让我们稍微重构一下这段代码,使其处理载荷类型,而不是 Message 包装类


  public String sayHello( String inboundPayload) {
    //  ... same as before
  }

这段代码与之前的代码工作方式相同,只是现在它只依赖于 Message 载荷的类型。头会自动为您复制。如果您愿意,您还可以声明一个位置供 Spring Integration 存放消息头(它们是 java.util.Map<String,Object>),如下所示:


  public String sayHello( Map<String,Object> headers, String inboundPayload) {
    // ... same as before
  }

如果您想对放置位置进行更多控制,以便避免歧义,您可以使用 Spring Integration 的注解。让我们使用注解来重构上一个示例


  public String sayHello( @Headers Map<String,Object> headers, @Payload String inboundPayload) {
    // ... same as before
  }

这里还有一个特别有用的注解——@Header——它告诉 Spring Integration 您只想将一个特定头的_值_注入到您的方法中


  public String sayHello( @Header("id") UUID idHeaderValue, @Payload String inboundPayload) {
    // ... same as before
  }

这降低了您代码的复杂性,并允许您编写具有表现力的集成代码。所有其他组件都支持这种动态映射功能,就像 @ServiceActivatorservice-activator 一样:@Transformertransformer@Splittersplitter@Aggregatoraggregator@Routerrouter 等等。

从您的服务中使用 Spring Integration

Spring Integration 解决方案不一定总是由适配器启动。您可以从 Java 代码启动它们。前面的例子是由 JMS 消息触发的——每当消费了一个 JMS 消息,处理就开始了。

让我们看一下那个交流的另一面——JMS 消息的生产者。在前面的例子中,我们查看了客户在在线电子零售商处结账时发送的消息的处理。在本节中,我们将使用 Spring Integration API 来查看如何从购物车中生成该消息,将其转换为 XML,然后将其发送到 JMS 代理,以便前面的示例在那里等待处理它。


  @Autowired @Qualifier("partnerNotifications")
  private MessageChannel messageChannel ;

  @Override
  @Transactional
  public void checkout(long purchaseId) {
    Purchase purchase = getPurchaseById(purchaseId);

    if (purchase.isFrozen())
      throw new RuntimeException("you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");

    Date purchasedDate = new Date();
    Set<LineItem> lis = purchase.getLineItems();
    for (LineItem lineItem : lis) {
      lineItem.setPurchasedDate(purchasedDate);
      entityManager.merge(lineItem);
    }
    purchase.setFrozen(true);
    entityManager.merge(purchase);

    Message<Purchase> msg = MessageBuilder.withPayload(purchase).build();
    this.messageChannel.send(msg);
  }

大部分代码都是典型的服务层代码——唯一让我们感兴趣的部分是 MessageChannel 的使用。MessageChannel 是我们之前在 XML 中配置的各种 Spring Integration 通道类型的运行时基类型。一旦您拥有了 MessageChannel 的引用,就只需要通过它发送消息。您可以像使用低级 JMS 和 AMQP API 发送和接收消息一样,使用 MessageChannel 并直接与之交互来发送和接收消息。

Spring Integration 中的 Message 对象是不可变的——您不直接创建 Message 对象。而是使用 MessageBuilder 类及其静态方法来构建消息。MessageBuilder 类包含基于现有载荷和现有 Headers map 的新消息的工厂方法。API 是流畅的——方法调用可以链接在一起。在此示例中,我们使用 MessageBuilder 类基于 Purchase 对象(一个本地于购物车域模型的 JPA 实体)构建消息。

我们通过使用 Spring Integration 来获得一些间接性:我们可以自由地在稍后更改 Spring Integration 中的消息流向。服务代码无需更改,因为它只与 Spring Integration 进行接口。

JMS 消费者(我们在第一个例子中建立的那个)速度较慢,因为它需要进行昂贵的 Web 服务调用。通过将通知发送到 JMS,然后让其他进程外的集成处理 Web 服务调用,我们可以获得两样东西:结账服务速度更快,并且慢速处理可以独立于服务进行扩展。例如,结账服务可能部署在 Web 应用程序中,每个机器一个实例。另一方面,调用 Web 服务的通知处理只是一个简单的 main(String[]) 类,并且可以在同一台机器上多次运行而没有问题,以满足负载需求。

总结

我们已经探索了集成世界的广阔天地,并学会了如何使用 Spring Integration 以一种干净、灵活的方式将不同的系统连接起来。我们已经探讨了 Spring Integration 如何受益于其在核心 Spring 框架之上的位置——它如何成为希望解决集成问题的 Spring 开发人员的自然下一步。在这篇文章中,我们还探讨了 Spring Integration 友好的、基于 Spring XML DSL 的以及基于 POJO 的编程模型。用户可以在我们的 Git 社区示例项目 中的 Getting Started with Spring Integration 文件夹 下找到此以及所有其他“Green Beans”帖子的源代码。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有