先行一步
VMware 提供培训和认证,助您快速进步。
了解更多应用并非孤立存在。它们需要与其客户及其他应用通信。应用集成就是为了实现这种通信。集成让应用能够相互共享服务和数据,而且,集成也常常帮助应用连接其用户。
Spring Integration 提供了一个构建集成解决方案的框架,以促进此类解决方案的实现。Spring Integration 解决方案描述了数据在管道中的流动方式。数据作为消息进入处理管道。消息通过命名管道(称为通道)向前移动,将数据路由到不同的组件(称为端点)。您可以将任意数量的端点和通道串联起来。
这个模型在简单性上与 Unix 命令行非常相似。例如,以下 Unix 命令行咒语:
cat spring.txt |grep spring |cut -f1 -d, | sort | uniq > spring_cleaning.txt
在这个命令中,来自外部资源(名为 spring.txt
的文件)的数据被传递给 grep
命令行。grep
筛选结果并找到所有匹配单词“spring”的行,并将其作为输出发送。grep
命令的输出作为 cut
命令的输入发送,后者将解析结果并按逗号 (",") 分割每一行,仅保留第一列数据(第一个逗号后的所有内容都被丢弃)。这种接力持续到最后,最终输出被写回另一个名为 spring_cleaning.txt
的外部资源。在这个命令中,我们使用 Unix 的“管道”("|")将一个命令的输出连接到另一个命令的输入。我们通过将专门的、单一功能的命令组合成一个管道来读取、清理、过滤、转换和写入数据。这种方法被称为“管道与过滤器”模型,而 Spring Integration 允许您使用这个同样的简单模型来思考更大的问题。在 Spring Integration 中,通道是管道,端点是过滤器。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
Spring Integration 提供了许多开箱即用的模块,每个模块都为特定类型的需求提供支持(例如:XML 处理或 Web 服务集成)。例如,文件命名空间提供了入站和出站适配器,分别用于从文件系统读取文件和向文件系统写入文件。file:inbound-channel-adapter
适配文件系统——它监控您指定的目录,一旦出现新文件,就会生成一个有效载荷类型为 java.io.File
的消息,包含新文件的有效载荷。一个常见的用例是将文件从 java.io.File
转换为文件内容的表示形式——可以是 String
或字节数组 (byte[]
)。由于这非常常见,文件命名空间提供了 file:file-to-string-transformer
端点以及 file:file-to-bytes-transformer
。最后,一个常见的场景是人们希望将 String
或字节数组数据写入文件系统。为此,file
命名空间提供了 file:outbound-channel-adapter
适配器。这些都仅仅是基于文件 IO 的基本端点实现,例如适配器和转换器。
Spring Integration 附带了许多预打包的模块,这些模块在这些相同的端点类型方面支持广泛的功能。Spring Integration 2.0 中包含的一些模块包括文件 IO、JDBC、RSS/ATOM、FTP/FTPS、SFTP、TCP/IP、RMI、HTTP、JMX、电子邮件、IO 流、Twitter、XMPP、Web 服务、HTTP/REST、XML 和 JMS。虽然这个列表很广泛,但并不完整。还有更多支持正在开发中,或者作为其他 Spring 组合项目的一部分提供,包括 Gemfire 支持、AMQP 支持、基于 Activiti BPMN 2 业务流程管理项目的工作流程系统集成、基于 Ajax/Comet 的支持、Flex BlazeDS 基于消息的支持等等。
处理 JMS 消息:第一步是接收来自 JMS 代理的消息。我们将使用 Spring Integration 入站适配器——一个接收来自 JMS 代理消息的端点——以简单、声明式的方式将 JMS 代理连接到我们的应用代码。
Spring Integration 允许您处理封装在 org.springframework.integration.core.Message
中的单独数据块。数据不会凭空出现,它必须来自某个地方。要将数据进出 Spring Integration,您分别使用入站和出站适配器。适配器是一种特殊类型的端点,它会告诉您在外部系统中发生了什么有趣的事情,并告诉您在外部系统中发生了什么。由于适配器只接收数据或发送数据,它们是单向的。
<int-jms:message-driven-channel-adapter
channel="partnerNotifications"
connection-factory="connectionFactory"
transaction-manager="jmsTransactionManager"
acknowledge="transacted"
destination-name="${jms.partnernotifications.destination}"
/>
<int:channel id="partnerNotifications"/>
我们不会详细介绍 connectionFactory
和 jmsTransactionManager
bean 的具体细节,因为它们是标准的 Spring JMS 相关 bean,可以在源代码中查看。无论何时消息出现在 JMS 代理上(在 JMS destination
中),Spring Integration 都会在名为 partnerNotifications
的通道上发布一个 org.springframework.integration.Message
,供其消费。
调用合作伙伴 Web 服务:消息有效载荷是一个字符串形式的 XML 文档。我们稍后会介绍它是如何形成的,但现在只需知道它已经采用了电子零售商合作伙伴可以使用的常见 XML 格式。下一步是向航运公司发起 Web 服务调用。流程如下:消息从 JMS 适配器进入,其有效载荷用作 Web 服务调用的有效载荷。它在从 JMS 适配器接收后,但在作为 Web 服务调用发送之前,被放入 partnerNotifications
通道中排队。
这代表了另一种类型的端点,类似于适配器,称为网关。网关是双向的,处理请求/回复场景。出站网关向外部系统发送请求,并等待外部系统的回复,该回复作为入站消息发送给您(请求者)。Web 服务调用对每个请求产生一个响应。回复数据作为 Spring Integration 消息发送到 partnerXmlShippingReplies
通道。
更新审计表:一旦我们收到来自 Web 服务调用的成功回复,我们需要更新我们的审计表。这个审计表是一个面向数据仓库的表;它包含非范式化的记录,可用于跟踪订单在履行系统中的进度,并用于驱动报告和分析。来自 Web 服务的回复消息是一个简单的 XML 文档,其中包含我们需要输入审计表的信息。
消息有两个重要部分:它的头和它的有效载荷。在 Spring Integration 中,有效载荷可以是任何类型的对象。消息头基本上是 java.util.Map
中的一系列键/值对。头的键是 String
,但值可以是任何类型。头通常携带关于有效载荷的元数据,端点在处理有效载荷时可以依赖这些元数据。
在我们的示例中,我们需要提取关键的 XML 属性,并将其作为消息头提供,以便输入到 JDBC 查询中更新表。我们将使用 Spring Integration 的 xml:xpath-header-enricher
来评估针对 Web 服务响应的 XPath 表达式,并将解析的表达式提取为消息头的值。在下面的示例中,我们创建了三个头:customerId
、purchaseId
和 date
。
<int-xml:xpath-header-enricher input-channel="partnerXmlShippingReplies" output-channel="partnerShippingReplies">
<int-xml:header name="purchaseId" xpath-expression="//@purchase-id"/>
<int-xml:header name="customerId" xpath-expression="//@customer-id"/>
<int-xml:header name="date" xpath-expression="//@confirmation-date"/>
</int-xml:xpath-header-enricher>
<int:channel id="partnerShippingReplies"/>
xml:xpath-header-enricher
是一个转换器端点:消息作为包含 XML 文档有效载荷和头的 Message
进入,然后作为包含相同 XML 文档有效载荷和三个新头(以及已有的头)的 Message
离开。在此示例中,输出被发布到 partnerShippingReplies
通道。
接下来,我们将消息和新头发送到 jdbc:outbound-channel-adapter
,在那里它将被用于执行 JDBC 插入操作。
<int-jdbc:outbound-channel-adapter
data-source="dataSource"
channel="partnerShippingReplies" >
<int-jdbc:query>
INSERT INTO purchase_fulfillment_log(
PURCHASE_ID, CUSTOMER_ID, EVENT_DATE, EVENT)
VALUES( :headers[purchaseId], :headers[customerId],
:headers[date], 'SHIPPED' )
</int-jdbc:query>
</int-jdbc:outbound-channel-adapter>
这是一个完整的、可工作的集成。我们使用基于 Java 的配置来配置所有必要的 bean,但无需 Java 来处理任何业务逻辑本身——所有实际的处理逻辑都存在于大约 25 行的 Spring Integration 命名空间元素中,并依赖于您很可能已经熟悉的技术。例如,Spring Integration JMS 适配器建立在核心 Spring 中可用的 JMS 支持之上。出站 Web 服务网关建立在 Spring Web Services 栈之上(就像出站 HTTP 网关建立在 RestTemplate
之上一样)。XML 支持通常建立在 Spring OXM 支持之上。最后,出站 JDBC 适配器建立在 Spring JDBC 支持之上(例如,在示例中,我们可以提供 Spring 的 JdbcTemplate 实例而不是 javax.sql.DataSource
)。
jms:message-driven-channel-adapter
中,transaction-manager
引用是可选的,但在使用时会像您期望的那样工作:在与事务相同的线程中发生的任何异常都会导致 JMS 消息接收回滚。线程本地事务扩展到在同一线程中执行的所有 Spring Integration 组件,在本解决方案中,这涵盖了所有内容。要尝试它,关闭数据库,然后再次运行解决方案——它将尝试从 JMS 代理消费消息,调用 Web 服务,转换回复,然后访问数据库,在那里它会遇到异常,这将导致 JMS 接收操作回滚并最终在 JMS 代理中重新排队(在 ActiveMQ 中,默认情况下,消息最终会进入 ActiveMQ.DLQ
队列,即死信队列)。
在架构中构建一致性 因此,事务是处理错误条件的一种方式,但它们对不使用事务的资源(如 Web 服务)帮助不大。一种无需事务的方式是在实现层面解决问题。例如,Web 服务调用或数据库调用可以设计成幂等的:如果可能,具有相同输入的多次调用应该产生相同的输出,且没有副作用。例如,使用相同的值更新数据库中的单行是幂等的:您可以运行相同的语句 100 次,最坏的情况是您会得到与运行一次更新相同(正确)的结果。处理错误的另一种方式是实现补偿逻辑。如果消息的生产者和消费者在同一个线程中,那么如果出现问题,正常的 Java 错误处理逻辑适用:会抛出异常,并且发送方可以(适当地)处理它。然而,如果生产者和消费者在不同的线程中,那么正常的规则就不适用。在这些情况下,没有消费者可以将异常反馈给发送方。默认情况下,Spring Integration 会捕获异常,并将其作为包含异常作为有效载荷的消息转发到一个已知、自动创建的通道,称为 errorChannel
。您可以在代码中通过指定一个键为 MessageHeaders.ERROR_CHANNEL
的消息头来指定应将错误转发到的备用通道。这取决于您从该通道消费消息并适当地对其作出反应。
为了支持自定义逻辑,Spring Integration 支持所有核心组件(transformers
、splitters
、routers
、adapters
、aggregators
等)的通用可插拔(在核心命名空间中)实现,并期望您提供自定义的 Java 逻辑来完成组件的任务。此外,Spring Integration 提供了一个 service-activator
组件,这是一个“逃生舱”组件:您可以使用它来插入自定义的 Java 处理逻辑,无论此代码的目的是什么。您可以将 service-activator
用于任何目的——也许您只是想使用 iText 将 PDF 写入文件系统,或者执行某种内在的业务计算?所有这些自定义组件都允许 POJO 实现。它们都通过调用您指定的 bean 上的方法来工作。对于方法的形状和形式没有严格的规定,因为 Spring Integration 非常灵活。然而,通常情况下,您应该想象组件将位于其他组件之间,因此必须既接受入站消息作为参数,又生成出站消息作为其返回类型。消息进,消息出。
方法规范通常会根据组件类型略有不同:转换器可能接收一种类型的消息,并生成一种完全不同的、经过转换类型的出站消息(这是很自然的!)。分割器接收单个消息,并生成消息集合作为其返回类型。聚合器反向工作:它接收消息集合作为输入参数,并返回单个聚合消息作为其输出。这个列表还在继续,当然详细信息可以在文档中找到。对于我们的示例,让我们看一下 service-activator
来演示方法映射的灵活性。要使用 service-activator
,我们必须首先在 XML DSL 中声明它,如下所示:
<service-activator
input-channel = "in"
ref = "helloWorldServiceActivator" method = "sayHello"
output-channel = "out" />
这个例子是典型的:输入通道产生一个消息,组件处理它,并将处理结果通过输出通道发送出去。然而,这里的处理逻辑需要您自己提供。ref
属性指定应该使用哪个 bean 来转换入站消息。它还规定了要使用哪个方法。如果该 bean 只有一个方法,或者如果该 bean 有多个方法但只有一个方法带有 Spring Integration 的 @ServiceActivator
注解,那么就没有必要像这里这样在 XML 中规定一个特定的方法。让我们看看这种 service-activator
的初步实现。
@Component
public class HelloWorldServiceActivator {
public Message<String> sayHello( Message<String> inboundMessage) {
Map<String,Object> headers = inboundMessage.getHeaders();
String payload = inboundMessage.getPayload();
System.out.println( "In the sayHello method, printing out the "+
"payload of the inbound message: "+payload + ". Also, there are " +
headers.size() + " headers." ) ;
return MessageBuilder.withPayload( inboundMessage.getPayload() )
.copyHeadersIfAbsent( inboundMessage.getHeaders() )
.build();
}
}
这个 bean 很简单:它是一个简单的 POJO,声明了一个方法,该方法接收一个 Spring Integration 消息并将其输入作为输出返回。然而,如果您不想依赖 Spring Integration 的消息类型,也没有问题。Spring Integration 具有一些智能启发式方法,并且在许多情况下可以自动为您做正确的事情。让我们稍微修改一下该方法的代码,使其处理有效载荷,而不是 Message 包装类:
public String sayHello( String inboundPayload) {
// ... same as before
}
这段代码与之前的工作方式相同,只是现在它仅依赖于 Message
有效载荷的类型。消息头会自动为您复制。如果您愿意,您也可以声明一个位置让 Spring Integration 放置消息头(消息头存储在 java.util.Map<String,Object>
中),就像这样:
public String sayHello( Map<String,Object> headers, String inboundPayload) {
// ... same as before
}
如果您想对放置位置进行更多控制,也许是为了避免歧义,可以使用 Spring Integration 的注解。让我们使用注解来重写上一个示例:
public String sayHello( @Headers Map<String,Object> headers, @Payload String inboundPayload) {
// ... same as before
}
这里还有一个特别有用的注解——@Header
——它告诉 Spring Integration 您只想将某个特定头的值注入到您的方法中:
public String sayHello( @Header("id") UUID idHeaderValue, @Payload String inboundPayload) {
// ... same as before
}
这降低了代码的复杂性,并允许您编写富有表现力的集成代码。所有其他组件都支持这种动态映射能力,就像 @ServiceActivator
和 service-activator
一样:@Transformer
和 transformer
、@Splitter
和 splitter
、@Aggregator
和 aggregator
、@Router
和 router
等等。
让我们看看该交换的另一端——JMS 消息的生产者。在前面的示例中,我们查看了处理客户在在线电子零售商处结账时发送的消息。使用 Spring Integration API 在本节中,我们将看看如何从购物车中生成该消息,将其转换为 XML,然后将其发送到 JMS 代理,前面的示例正在那里等待处理。
@Autowired @Qualifier("partnerNotifications")
private MessageChannel messageChannel ;
@Override
@Transactional
public void checkout(long purchaseId) {
Purchase purchase = getPurchaseById(purchaseId);
if (purchase.isFrozen())
throw new RuntimeException("you can't check out Purchase(#" + purchase.getId() + ") that's already been checked out!");
Date purchasedDate = new Date();
Set<LineItem> lis = purchase.getLineItems();
for (LineItem lineItem : lis) {
lineItem.setPurchasedDate(purchasedDate);
entityManager.merge(lineItem);
}
purchase.setFrozen(true);
entityManager.merge(purchase);
Message<Purchase> msg = MessageBuilder.withPayload(purchase).build();
this.messageChannel.send(msg);
}
大部分代码是典型的服务层代码——我们感兴趣的唯一部分是 MessageChannel
的使用。MessageChannel
是我们在 XML 中配置的各种 Spring Integration 通道类型的运行时基类型。一旦您获得了对 MessageChannel
的引用,只需通过它发送消息即可。您可以像使用低级别的 JMS 和 AMQP API 发送和接收消息一样,直接使用 MessageChannel
与之交互来发送和接收消息。
Spring Integration 中的 Message
对象是不可变的——您不会直接创建 Message
对象。相反,请使用 MessageBuilder
类及其静态方法来构建 Message。MessageBuilder
类包含基于现有有效载荷和基于现有 Headers map 创建新 Messages 的方法。API 非常流畅——方法调用可以链式进行。在此示例中,我们使用 MessageBuilder
类根据 Purchase
对象(购物车领域模型中本地的 JPA 实体)构建一个 Message。
通过使用 Spring Integration,我们给自己留了一些间接性:我们可以稍后在 Spring Integration 中自由地改变消息的流向。服务代码无需更改,因为它只与 Spring Integration 接口。
JMS 消费者(我们在第一个示例中建立的那个)速度较慢,因为它需要进行代价高昂的 Web 服务调用。通过将通知发送到 JMS,然后让另一个进程外的集成处理 Web 服务调用,我们获得了两样东西:结账服务更快,而且慢速处理可以独立于服务进行扩展。例如,结账服务可以在 Web 应用程序中部署,每台机器一个。另一方面,调用 Web 服务的通知处理只是一个简单的 main(String[])
类,可以为了满足负载而在同一台机器上运行多次而不会出现问题。