绿豆:Spring Integration 入门

工程 | Josh Long | 2011 年 2 月 24 日 | ...

应用并非孤立存在。它们需要与其客户及其他应用通信。应用集成就是为了实现这种通信。集成让应用能够相互共享服务和数据,而且,集成也常常帮助应用连接其用户。

Spring Integration 提供了一个构建集成解决方案的框架,以促进此类解决方案的实现。Spring Integration 解决方案描述了数据在管道中的流动方式。数据作为消息进入处理管道。消息通过命名管道(称为通道)向前移动,将数据路由到不同的组件(称为端点)。您可以将任意数量的端点和通道串联起来。

这个模型在简单性上与 Unix 命令行非常相似。例如,以下 Unix 命令行咒语:

cat spring.txt |grep spring |cut -f1 -d, | sort | uniq > spring_cleaning.txt

在这个命令中,来自外部资源(名为 spring.txt 的文件)的数据被传递给 grep 命令行。grep 筛选结果并找到所有匹配单词“spring”的行,并将其作为输出发送。grep 命令的输出作为 cut 命令的输入发送,后者将解析结果并按逗号 (",") 分割每一行,仅保留第一列数据(第一个逗号后的所有内容都被丢弃)。这种接力持续到最后,最终输出被写回另一个名为 spring_cleaning.txt 的外部资源。在这个命令中,我们使用 Unix 的“管道”("|")将一个命令的输出连接到另一个命令的输入。我们通过将专门的、单一功能的命令组合成一个管道来读取、清理、过滤、转换和写入数据。这种方法被称为“管道与过滤器”模型,而 Spring Integration 允许您使用这个同样的简单模型来思考更大的问题。在 Spring Integration 中,通道是管道,端点是过滤器。

Spring Integration 自带“电池”(功能完备)

Spring Integration 支持许多不同的用例。以下是一些常见的用例:
  • 转换数据:转换器端点可以改变有效载荷的类型,从一个类变为另一个类,或删除、添加或更改消息头。
  • 路由数据:路由器端点提供自定义路由逻辑——也许来自输入通道的数据应该被发送到多个输出通道?
  • 过滤数据:也许输入数据不适合继续处理,应该被剔除。您可以使用过滤器端点来有条件地阻止数据继续处理。
  • 将不同的系统适配到 Spring Integration:适配器提供了数据进入和离开 Spring Integration 解决方案的能力。
  • 分割数据:当有效载荷太大,或需要将其分割成更小的数据块时。例如,一个文件可能需要按行分割。
  • 聚合数据:这是分割功能的逆操作:聚合器等待消息一个接一个地到达,并收集它们直到满足某个条件。然后,它将所有聚合的消息作为一个消息发送出去。
这些是 Spring Integration 开发人员工具箱中的工具。Spring Integration 的核心就是一个 Maven 依赖项,其中包含了常见的抽象和接口。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

Spring Integration 提供了许多开箱即用的模块,每个模块都为特定类型的需求提供支持(例如:XML 处理或 Web 服务集成)。例如,文件命名空间提供了入站和出站适配器,分别用于从文件系统读取文件和向文件系统写入文件。file:inbound-channel-adapter 适配文件系统——它监控您指定的目录,一旦出现新文件,就会生成一个有效载荷类型为 java.io.File 的消息,包含新文件的有效载荷。一个常见的用例是将文件从 java.io.File 转换为文件内容的表示形式——可以是 String 或字节数组 (byte[])。由于这非常常见,文件命名空间提供了 file:file-to-string-transformer 端点以及 file:file-to-bytes-transformer。最后,一个常见的场景是人们希望将 String 或字节数组数据写入文件系统。为此,file 命名空间提供了 file:outbound-channel-adapter 适配器。这些都仅仅是基于文件 IO 的基本端点实现,例如适配器和转换器。

Spring Integration 附带了许多预打包的模块,这些模块在这些相同的端点类型方面支持广泛的功能。Spring Integration 2.0 中包含的一些模块包括文件 IO、JDBC、RSS/ATOM、FTP/FTPS、SFTP、TCP/IP、RMI、HTTP、JMX、电子邮件、IO 流、Twitter、XMPP、Web 服务、HTTP/REST、XML 和 JMS。虽然这个列表很广泛,但并不完整。还有更多支持正在开发中,或者作为其他 Spring 组合项目的一部分提供,包括 Gemfire 支持、AMQP 支持、基于 Activiti BPMN 2 业务流程管理项目的工作流程系统集成、基于 Ajax/Comet 的支持、Flex BlazeDS 基于消息的支持等等。

一个简单示例

让我们来看一个简单的示例——一个假设的在线零售商(“e-tailer”!)处理网站上的交易,并与几家第三方公司合作完成订单。特别是,我们将关注数据从网站购买点到航运公司的流动。这个集成由服务发送的 JMS 消息触发。航运公司与该在线零售商无关,并期望所有订单都通过其 Web 服务端点提交,因此必须使用触发的 JMS 消息来调用 Web 服务端点。最后,所有提交给第三方的作业(本例中是航运公司)都需要审计,因此信息应记录在数据库中用于报告的审计表中。
Spring Integration Example Process 示例集成解决方案的图示 - 此图由 SpringSource Tool Suite 生成

处理 JMS 消息:第一步是接收来自 JMS 代理的消息。我们将使用 Spring Integration 入站适配器——一个接收来自 JMS 代理消息的端点——以简单、声明式的方式将 JMS 代理连接到我们的应用代码。

Spring Integration 允许您处理封装在 org.springframework.integration.core.Message 中的单独数据块。数据不会凭空出现,它必须来自某个地方。要将数据进出 Spring Integration,您分别使用入站和出站适配器。适配器是一种特殊类型的端点,它会告诉您在外部系统中发生了什么有趣的事情,并告诉您在外部系统中发生了什么。由于适配器只接收数据或发送数据,它们是单向的


    <int-jms:message-driven-channel-adapter
      channel="partnerNotifications"
      connection-factory="connectionFactory"
      transaction-manager="jmsTransactionManager"
      acknowledge="transacted"
       destination-name="${jms.partnernotifications.destination}"
    />

    <int:channel id="partnerNotifications"/>

我们不会详细介绍 connectionFactoryjmsTransactionManager bean 的具体细节,因为它们是标准的 Spring JMS 相关 bean,可以在源代码中查看。无论何时消息出现在 JMS 代理上(在 JMS destination 中),Spring Integration 都会在名为 partnerNotifications 的通道上发布一个 org.springframework.integration.Message,供其消费。

调用合作伙伴 Web 服务:消息有效载荷是一个字符串形式的 XML 文档。我们稍后会介绍它是如何形成的,但现在只需知道它已经采用了电子零售商合作伙伴可以使用的常见 XML 格式。下一步是向航运公司发起 Web 服务调用。流程如下:消息从 JMS 适配器进入,其有效载荷用作 Web 服务调用的有效载荷。它在从 JMS 适配器接收后,但在作为 Web 服务调用发送之前,被放入 partnerNotifications 通道中排队。

这代表了另一种类型的端点,类似于适配器,称为网关。网关是双向的,处理请求/回复场景。出站网关向外部系统发送请求,并等待外部系统的回复,该回复作为入站消息发送给您(请求者)。Web 服务调用对每个请求产生一个响应。回复数据作为 Spring Integration 消息发送到 partnerXmlShippingReplies 通道。

更新审计表:一旦我们收到来自 Web 服务调用的成功回复,我们需要更新我们的审计表。这个审计表是一个面向数据仓库的表;它包含非范式化的记录,可用于跟踪订单在履行系统中的进度,并用于驱动报告和分析。来自 Web 服务的回复消息是一个简单的 XML 文档,其中包含我们需要输入审计表的信息。

消息有两个重要部分:它的头和它的有效载荷。在 Spring Integration 中,有效载荷可以是任何类型的对象。消息头基本上是 java.util.Map 中的一系列键/值对。头的键是 String,但值可以是任何类型。头通常携带关于有效载荷的元数据,端点在处理有效载荷时可以依赖这些元数据。

在我们的示例中,我们需要提取关键的 XML 属性,并将其作为消息头提供,以便输入到 JDBC 查询中更新表。我们将使用 Spring Integration 的 xml:xpath-header-enricher 来评估针对 Web 服务响应的 XPath 表达式,并将解析的表达式提取为消息头的值。在下面的示例中,我们创建了三个头:customerIdpurchaseIddate


    <int-xml:xpath-header-enricher input-channel="partnerXmlShippingReplies" output-channel="partnerShippingReplies">
        <int-xml:header name="purchaseId" xpath-expression="//@purchase-id"/>
        <int-xml:header name="customerId" xpath-expression="//@customer-id"/>
        <int-xml:header name="date" xpath-expression="//@confirmation-date"/>
    </int-xml:xpath-header-enricher>

   <int:channel id="partnerShippingReplies"/>

xml:xpath-header-enricher 是一个转换器端点:消息作为包含 XML 文档有效载荷和头的 Message 进入,然后作为包含相同 XML 文档有效载荷和三个新头(以及已有的头)的 Message 离开。在此示例中,输出被发布到 partnerShippingReplies 通道。

接下来,我们将消息和新头发送到 jdbc:outbound-channel-adapter,在那里它将被用于执行 JDBC 插入操作。


    <int-jdbc:outbound-channel-adapter
        data-source="dataSource"
        channel="partnerShippingReplies" >
        <int-jdbc:query>
INSERT INTO purchase_fulfillment_log(
    PURCHASE_ID, CUSTOMER_ID, EVENT_DATE, EVENT)
VALUES( :headers[purchaseId], :headers[customerId],
               :headers[date], 'SHIPPED' )
        </int-jdbc:query>
    </int-jdbc:outbound-channel-adapter>

这是一个完整的、可工作的集成。我们使用基于 Java 的配置来配置所有必要的 bean,但无需 Java 来处理任何业务逻辑本身——所有实际的处理逻辑都存在于大约 25 行的 Spring Integration 命名空间元素中,并依赖于您很可能已经熟悉的技术。例如,Spring Integration JMS 适配器建立在核心 Spring 中可用的 JMS 支持之上。出站 Web 服务网关建立在 Spring Web Services 栈之上(就像出站 HTTP 网关建立在 RestTemplate 之上一样)。XML 支持通常建立在 Spring OXM 支持之上。最后,出站 JDBC 适配器建立在 Spring JDBC 支持之上(例如,在示例中,我们可以提供 Spring 的 JdbcTemplate 实例而不是 javax.sql.DataSource)。

Spring Integration 中的错误处理

事务 在示例中,我们从 JMS 消费消息,通过 Web 服务发送消息,然后转换响应并将其写入数据库。这里有很多活动部件,如果出现问题,您应该了解处理错误的各种机制。对于 JMS 和 JDBC,通常的直觉是使用事务在发生某种故障时进行回滚。在示例中使用的 jms:message-driven-channel-adapter 中,transaction-manager 引用是可选的,但在使用时会像您期望的那样工作:在与事务相同的线程中发生的任何异常都会导致 JMS 消息接收回滚。线程本地事务扩展到在同一线程中执行的所有 Spring Integration 组件,在本解决方案中,这涵盖了所有内容。

要尝试它,关闭数据库,然后再次运行解决方案——它将尝试从 JMS 代理消费消息,调用 Web 服务,转换回复,然后访问数据库,在那里它会遇到异常,这将导致 JMS 接收操作回滚并最终在 JMS 代理中重新排队(在 ActiveMQ 中,默认情况下,消息最终会进入 ActiveMQ.DLQ 队列,即死信队列)。

在架构中构建一致性 因此,事务是处理错误条件的一种方式,但它们对不使用事务的资源(如 Web 服务)帮助不大。一种无需事务的方式是在实现层面解决问题。例如,Web 服务调用或数据库调用可以设计成幂等的:如果可能,具有相同输入的多次调用应该产生相同的输出,且没有副作用。例如,使用相同的值更新数据库中的单行是幂等的:您可以运行相同的语句 100 次,最坏的情况是您会得到与运行一次更新相同(正确)的结果。处理错误的另一种方式是实现补偿逻辑。如果消息的生产者和消费者在同一个线程中,那么如果出现问题,正常的 Java 错误处理逻辑适用:会抛出异常,并且发送方可以(适当地)处理它。然而,如果生产者和消费者在不同的线程中,那么正常的规则就不适用。在这些情况下,没有消费者可以将异常反馈给发送方。默认情况下,Spring Integration 会捕获异常,并将其作为包含异常作为有效载荷的消息转发到一个已知、自动创建的通道,称为 errorChannel。您可以在代码中通过指定一个键为 MessageHeaders.ERROR_CHANNEL 的消息头来指定应将错误转发到的备用通道。这取决于您从该通道消费消息并适当地对其作出反应。

Spring Integration 组件模型

到目前为止,我们所有构建都依赖于满足我们目标的开箱即用支持。您会发现大多数时候情况都是如此——Spring Integration 确实是一个很好的工具箱。就像一盒乐高积木一样,您可以将模块组合成看似无限的组合来解决大多数问题。然而,这并不意味着所有问题都已解决。例如,Spring Integration 不可能知道您的领域模型的具体细节,它也无法知道您的业务逻辑和服务的具体细节。有时您想直接使用 Java 并插入正确的行为。

为了支持自定义逻辑,Spring Integration 支持所有核心组件(transformerssplittersroutersadaptersaggregators 等)的通用可插拔(在核心命名空间中)实现,并期望您提供自定义的 Java 逻辑来完成组件的任务。此外,Spring Integration 提供了一个 service-activator 组件,这是一个“逃生舱”组件:您可以使用它来插入自定义的 Java 处理逻辑,无论此代码的目的是什么。您可以将 service-activator 用于任何目的——也许您只是想使用 iText 将 PDF 写入文件系统,或者执行某种内在的业务计算?所有这些自定义组件都允许 POJO 实现。它们都通过调用您指定的 bean 上的方法来工作。对于方法的形状和形式没有严格的规定,因为 Spring Integration 非常灵活。然而,通常情况下,您应该想象组件将位于其他组件之间,因此必须既接受入站消息作为参数,又生成出站消息作为其返回类型。消息进,消息出。

方法规范通常会根据组件类型略有不同:转换器可能接收一种类型的消息,并生成一种完全不同的、经过转换类型的出站消息(这是很自然的!)。分割器接收单个消息,并生成消息集合作为其返回类型。聚合器反向工作:它接收消息集合作为输入参数,并返回单个聚合消息作为其输出。这个列表还在继续,当然详细信息可以在文档中找到。对于我们的示例,让我们看一下 service-activator 来演示方法映射的灵活性。要使用 service-activator,我们必须首先在 XML DSL 中声明它,如下所示:


 <service-activator
      input-channel = "in"
      ref = "helloWorldServiceActivator" method = "sayHello"
      output-channel = "out" />

这个例子是典型的:输入通道产生一个消息,组件处理它,并将处理结果通过输出通道发送出去。然而,这里的处理逻辑需要您自己提供。ref 属性指定应该使用哪个 bean 来转换入站消息。它还规定了要使用哪个方法。如果该 bean 只有一个方法,或者如果该 bean 有多个方法但只有一个方法带有 Spring Integration 的 @ServiceActivator 注解,那么就没有必要像这里这样在 XML 中规定一个特定的方法。让我们看看这种 service-activator 的初步实现。


@Component
public class HelloWorldServiceActivator {

 public Message<String> sayHello( Message<String> inboundMessage) {
 Map<String,Object> headers = inboundMessage.getHeaders();
 String payload = inboundMessage.getPayload();
 System.out.println( "In the sayHello method, printing out the  "+
 "payload of the inbound message: "+payload + ". Also, there are " +
 headers.size() + " headers." ) ;
 return MessageBuilder.withPayload( inboundMessage.getPayload() )
           .copyHeadersIfAbsent( inboundMessage.getHeaders() )
           .build();
 }
}

这个 bean 很简单:它是一个简单的 POJO,声明了一个方法,该方法接收一个 Spring Integration 消息并将其输入作为输出返回。然而,如果您不想依赖 Spring Integration 的消息类型,也没有问题。Spring Integration 具有一些智能启发式方法,并且在许多情况下可以自动为您做正确的事情。让我们稍微修改一下该方法的代码,使其处理有效载荷,而不是 Message 包装类:


  public String sayHello( String inboundPayload) {
    //  ... same as before
  }

这段代码与之前的工作方式相同,只是现在它仅依赖于 Message 有效载荷的类型。消息头会自动为您复制。如果您愿意,您也可以声明一个位置让 Spring Integration 放置消息头(消息头存储在 java.util.Map<String,Object> 中),就像这样:


  public String sayHello( Map<String,Object> headers, String inboundPayload) {
    // ... same as before
  }

如果您想对放置位置进行更多控制,也许是为了避免歧义,可以使用 Spring Integration 的注解。让我们使用注解来重写上一个示例:


  public String sayHello( @Headers Map<String,Object> headers, @Payload String inboundPayload) {
    // ... same as before
  }

这里还有一个特别有用的注解——@Header——它告诉 Spring Integration 您只想将某个特定头的值注入到您的方法中:


  public String sayHello( @Header("id") UUID idHeaderValue, @Payload String inboundPayload) {
    // ... same as before
  }

这降低了代码的复杂性,并允许您编写富有表现力的集成代码。所有其他组件都支持这种动态映射能力,就像 @ServiceActivatorservice-activator 一样:@Transformertransformer@Splittersplitter@Aggregatoraggregator@Routerrouter 等等。

从您的服务中使用 Spring Integration

Spring Integration 解决方案不一定总是由适配器启动。您可以从 Java 代码启动它们。前面的示例是由 JMS 消息启动的——处理在消耗 JMS 消息时开始。

让我们看看该交换的另一端——JMS 消息的生产者。在前面的示例中,我们查看了处理客户在在线电子零售商处结账时发送的消息。使用 Spring Integration API 在本节中,我们将看看如何从购物车中生成该消息,将其转换为 XML,然后将其发送到 JMS 代理,前面的示例正在那里等待处理。


  @Autowired @Qualifier("partnerNotifications")
  private MessageChannel messageChannel ;

  @Override
  @Transactional
  public void checkout(long purchaseId) {
    Purchase purchase = getPurchaseById(purchaseId);

    if (purchase.isFrozen())
      throw new RuntimeException("you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");

    Date purchasedDate = new Date();
    Set<LineItem> lis = purchase.getLineItems();
    for (LineItem lineItem : lis) {
      lineItem.setPurchasedDate(purchasedDate);
      entityManager.merge(lineItem);
    }
    purchase.setFrozen(true);
    entityManager.merge(purchase);

    Message<Purchase> msg = MessageBuilder.withPayload(purchase).build();
    this.messageChannel.send(msg);
  }

大部分代码是典型的服务层代码——我们感兴趣的唯一部分是 MessageChannel 的使用。MessageChannel 是我们在 XML 中配置的各种 Spring Integration 通道类型的运行时基类型。一旦您获得了对 MessageChannel 的引用,只需通过它发送消息即可。您可以像使用低级别的 JMS 和 AMQP API 发送和接收消息一样,直接使用 MessageChannel 与之交互来发送和接收消息。

Spring Integration 中的 Message 对象是不可变的——您不会直接创建 Message 对象。相反,请使用 MessageBuilder 类及其静态方法来构建 Message。MessageBuilder 类包含基于现有有效载荷和基于现有 Headers map 创建新 Messages 的方法。API 非常流畅——方法调用可以链式进行。在此示例中,我们使用 MessageBuilder 类根据 Purchase 对象(购物车领域模型中本地的 JPA 实体)构建一个 Message。

通过使用 Spring Integration,我们给自己留了一些间接性:我们可以稍后在 Spring Integration 中自由地改变消息的流向。服务代码无需更改,因为它只与 Spring Integration 接口。

JMS 消费者(我们在第一个示例中建立的那个)速度较慢,因为它需要进行代价高昂的 Web 服务调用。通过将通知发送到 JMS,然后让另一个进程外的集成处理 Web 服务调用,我们获得了两样东西:结账服务更快,而且慢速处理可以独立于服务进行扩展。例如,结账服务可以在 Web 应用程序中部署,每台机器一个。另一方面,调用 Web 服务的通知处理只是一个简单的 main(String[]) 类,可以为了满足负载而在同一台机器上运行多次而不会出现问题。

总结

我们探索了广阔的集成世界,并学习了如何使用 Spring Integration 以一种清晰、灵活的方式将不同的系统连接起来。我们探讨了 Spring Integration 如何受益于其位于核心 Spring 框架之上的位置——对于希望解决集成问题的 Spring 开发人员来说,这是自然的下一步。在这篇帖子中,我们还探讨了 Spring Integration 友好的基于 Spring XML DSL 的编程模型,以及其基于 POJO 的编程模型。用户可以在我们基于 Git 的社区示例项目中找到本文以及所有其他“绿豆”帖子的源代码,位于Spring Integration 入门文件夹下。

获取 Spring 邮件列表

订阅 Spring 邮件列表,保持联系

订阅

先行一步

VMware 提供培训和认证,助您快速进步。

了解更多

获取支持

Tanzu Spring 在一份简单订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区所有即将举办的活动。

查看全部