领先一步
VMware 提供培训和认证,助您快速提升。
了解更多在本文中,我们将介绍消息传递的核心概念,以及 Spring Framework 及其姊妹项目为各种类型的消息传递提供的丰富支持。
什么是消息传递?为了最好地解释这一点,我将改述 Gregor Hohpe 和 Bobby Woolf 所著的开创性书籍《企业集成模式》(Addison Wesley,2004 年)中提供的示例。当你拨打电话时,你试图将信息传达给另一方。这只有在拨打电话时对方有空的情况下才有效。因为并非总是能够接听电话,所以我们使用语音信箱来排队消息。呼叫者在语音信箱中留言,被呼叫者随后可以稍后异步检索消息(或者确实是许多消息)。
在该示例中,语音信箱位于双方中间。它存储消息,然后在被呼叫者(即接收者)检索时将其传递。在企业消息传递的世界中,事情的运作方式非常相似:一方将消息发送到消息代理(也称为面向消息的中间件——MOM),而另一方——当该方可以时——接收或显式查询消息代理中的任何消息。
这里的类比就不再适用了。消息代理与语音信箱不同,它们有很多选项。消息代理非常适合提供额外服务,例如路由,并对消息传递提供保证。消息代理可以针对不同的用例进行优化,例如,你可以牺牲速度来换取持久性。消息代理可能会将消息持久化到外部存储以确保持久性,尽管这通常是一个可以在速度方面进行切换的配置。
在语音信箱的示例中,一条消息由一方发送,然后传递给另一方——这种通信是点对点的。消息代理支持这种方式,也支持另一种称为发布-订阅的通信类型,其中消息被传递给多个客户端。
消息代理的一个常见用途是解决两个不同系统之间的集成问题。发送到消息代理的数据通常采用发送者和接收者都认可的格式。两个系统要使用消息代理唯一需要达成一致的就是数据契约。消息通常有一个消息体,其中存储消息本身的内容,以及消息头,消息头是键/值对,提供关于消息体本身的一些元数据,这些元数据可用于帮助消息的消费者处理消息。消息头可以是任何你喜欢的内容,但它们通常与消息本身或消息的处理器相关。
Java 消息服务 (JMS) API 指定了与消息代理交互的客户端接口。每个消息代理提供自己的 API 实现,非常类似于 JDBC 驱动程序对 JDBC API 所做的事情。这意味着 JMS 客户端通常应使用与服务器相同版本的客户端。有许多优秀的 JMS 代理实现可供选择。其中一个原因是消息传递始终是应用程序开发的重要组成部分,并且在今天变得更加重要。自 1.1 版本以来,JMS 一直是 J2EE(现在是 Java EE)规范的一部分。在过去的大部分十年中,JMS 规范一直停留在 1.1 版本。
在 JMS 中,客户端使用一个 javax.jms.ConnectionFactory
来创建一个 javax.jms.Connection
。然后可以使用 Connection
来创建一个 javax.jms.Session
。Session
代表客户端与代理的交互,允许发送和接收消息以及其他不太明显的操作。
接口上最有用的方法涉及到消息生产者和消息消费者的创建,它们用于向 javax.jms.Destination
发送和从其接收消息。Destination
映射了 JMS 中消息代理上的“地址”概念。它也映射了代理存储消息的位置概念。在 JMS 中,消息发送到、存储在以及从同一个地方消费,所有这些都由一个 javax.jms.Destination
实例表示。
Destination
是一个接口,它有两个更具体的子接口:javax.jms.Queue
和 javax.jms.Topic
。Queue
代表一个标准队列,它是一个之前描述的点对点结构。Topic
提供发布-订阅消息传递,并且可以将单个消息传递给多个接收者。
要将消息发送到 Destination
,必须创建一个 javax.jms.MessageProducer
。然后可以使用 MessageProducer
来发送 javax.jms.Message
消息。
JMS 支持两种不同的接收消息机制。第一种方式是请求消息,使用 javax.jmx.MessageConsumer#receive()
方法,该方法以同步方式从 Destination
返回单个消息;默认情况下,该方法会阻塞直到收到消息。客户端可以使用 javax.jms.Session#setMessageListener(MessageListener)
方法来安装 javax.jms.MessageListener
,而不是使用 MessageConsumer
。MessageListener
是一个接口,只有一个方法 public void onMessage(javax.jms.Message)
,只要 Destination
上有可供消费的 javax.jms.Message
,就会调用该方法。MessageListener
提供异步消息处理:消息到达时即被处理。
JMS API 中还有更多内容需要学习,但这些类和概念在我们讨论 Spring 对 JMS 消息传递的支持时对你帮助最大。第一层支持是 org.springframework.jms.core.JmsTemplate
,它提供了简化方法,将我们刚刚讨论的内容简化为一行代码。JmsTemplate
需要一个 javax.jms.ConnectionFactory
实例来执行其工作。JmsTemplate
可以为你完成很多工作。例如,发送消息时,JmsTemplate
会建立一个 javax.jms.Session
,设置一个 javax.jms.MessageConsumer
或 javax.jms.MessageProducer
,设置所有用于事务的机制,并为你提供当前 javax.jms.Session
的引用,以便你可以创建所需的消息并发送它。有了所有的错误处理和构建逻辑,这很容易节省几十行代码。一旦你的消息发送出去,它就会销毁或关闭大部分这些对象。这在应用服务器(如 Java EE 服务器)中是标准做法,因为 ConnectionFactory
实例由服务器创建,由服务器管理,并进行池化。它们在使用后缓存这些实例。在这些环境中关闭资源只是将它们返回到连接池。因此,在标准情况下,假设 ConnectionFactory
缓存或池化实例,JmsTemplate
会做正确的事情。
在像应用服务器这样的受管环境中,你通常需要从 JNDI 获取 javax.jms.ConnectionFactory
。你可以使用 Spring 来查找该引用并配置一个 JmsTemplate
。在我们的示例中,我们希望操作更松散,所以我们将使用独立的 ActiveMQ 消息代理。ActiveMQ 是一个流行的开源消息代理。要使用它,请下载它,然后在 bin 文件夹中运行适合你操作系统的启动脚本。在你的应用程序中,你需要客户端库来连接相应版本的 ActiveMQ。在撰写本文时,ActiveMQ 的最新版本是 5.4.2。如果你正在使用 Maven,请将以下依赖项添加到你的 Maven pom 文件中
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>${activemq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-optional</artifactId>
<version>${activemq.version}</version>
</dependency>
请确保为 ${activemq.version}
创建一个 Maven 属性,或者手动将字符串替换为相应的版本。还有一个 activemq-all
依赖项,但它会引入许多可能不必要的 jar 包。对于我们的应用程序,上面提到的两个依赖项就足够了。
让我们看看一个基本的 JMS 应用程序的配置。首先,让我们看看基本的 Spring XML 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
">
<context:property-placeholder location="jms.properties"/>
<context:component-scan base-package="org.springsource.greenbeans.examples.jms.core"/>
<context:component-scan base-package="org.springsource.greenbeans.examples.jms.jmstemplate"/>
<tx:annotation-driven transaction-manager="jmsTransactionManager"/>
</beans>
你可以看到 XML 主要设置了属性占位符解析并启用了类路径扫描。最有趣的部分是@Transactional
注解的方法启用事务。该元素引用了 Spring 上下文中的另一个 bean:jmsTransactionManager
,它在以下 Java 配置类中定义。
package org.springsource.greenbeans.examples.jms.core;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*
import org.springframework.jms.connection.*
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
@Configuration
public class JmsConfiguration {
@Value("${broker.url}")
private String brokerUrl;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(this.brokerUrl);
return new CachingConnectionFactory(activeMQConnectionFactory);
}
@Bean
public JmsTransactionManager jmsTransactionManager() {
return new JmsTransactionManager(this.connectionFactory());
}
@Bean
public JmsTemplate jmsTemplate() {
return new JmsTemplate(this.connectionFactory());
}
}
配置相当简单。首先,我们定义一个 ActiveMQConnectionFactory
实例,然后将其提供给 Spring Framework 的 CachingConnectionFactory
实例。有些代理提供自己的缓存 ConnectionFactory
实现。但是,如果你的代理没有,那么你可以始终使用 Spring 的 Caching ConnectionFactory 实现来获得速度提升。
接下来,我们有一个 JmsTransactionManager
,它提供 JMS 本地事务。在 JMS 中,事务回滚只有两种结果:发送操作失败时,消息未发送;接收操作失败时,消息会被消息代理重新排队。后一种情况可能很复杂。
如果你收到一条消息,然后在处理过程中遇到错误,并且假设你保持了事务开放,那么事务会回滚,消息会返回到代理。消息一旦回到代理会发生什么取决于代理和你的配置。通常,消息会立即被重新投递。然而,这并非总是期望的行为。因此,大多数(如果不是全部)代理都支持某种概念上的死信队列,无法投递的消息会被发送到该队列。该队列中的消息可以按你希望的任何方式处理——也许某个监控工具可以在出现这种错误情况时唤醒某人。然而,大多数代理提供了更多控制。可能可以在错误消息的路由上设置规则。例如,代理可能会立即尝试重新投递消息,然后如果再次失败,它可能会等待一段时间再试,如果还是失败,则会等待更长时间。这通常称为退避(backing off)周期。也许达到某个阈值后,消息可能会被投递到死信队列,或直接丢弃。无论如何,请查阅你的代理文档。
最后,我们通过向 JmsTemplate
提供 ConnectionFactory
的引用来创建它。
让我们看看 JmsTemplate
的实际应用。为了保持示例简单,我们首先讨论如何在恰当命名的 Producer
类中发送消息。消息传递的一个常见用途是将通知发送给一个(或许多)不同的系统,作为同步机制,以便感兴趣的系统拥有某些数据的最新版本。在这个例子中,我们假设有一个简单的 Customer
POJO,包含标准字段:firstName
、lastName
、email
和 id
。
package org.springsource.greenbeans.examples.jms.core;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*
import org.springframework.beans.factory.annotation.*
import org.springframework.jms.core.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*;
@Component
public class Producer {
@Value("${jms.customer.destination}")
private String customerDestination;
@Autowired
private JmsTemplate jmsTemplate;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void sendCustomerUpdate(final Customer customer) throws Exception {
this.jmsTemplate.send(this.customerDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
log.info("Sending customer data " + ToStringBuilder.reflectionToString(customer));
MapMessage mapMessage = session.createMapMessage();
mapMessage.setLong("id", customer.getId());
mapMessage.setString("firstName", customer.getFirstName());
mapMessage.setString("lastName", customer.getLastName());
mapMessage.setString("email", customer.getEmail());
}
});
}
}
在该类中,我们看到一个 sendCustomerUpdate
方法,它接受一个 Customer 引用作为参数。使用 JmsTemplate
的 send 方法——它接受两个参数:第一个是目的地名称的字符串(“customers”),第二个是 Spring Framework 类 MessageCreator
的引用——我们使用传递到我们实现的 createMessage(javax.jms.Session)
方法中的 javax.jms.Session
引用来构建一个 JMS 消息。在 JMS 中可以创建许多类型的消息:javax.jms.TextMessage
、javax.jms.ObjectMessage
、javax.jms.MapMessage
等。ObjectMessage
正如你所期望的那样——它允许你将序列化对象作为 JMS 消息的有效载荷进行传输。通常应避免这样做。序列化的数据类型会将消息的生产者和消费者都耦合到相同的 API 契约,这可能并非总是可行。即使在消息交换的两侧确保类型可用且类版本相同是可行的,与其他更灵活的选项相比,这样做通常效率低下。相反,更倾向于分解——也许你可以使用 javax.jms.TextMessage
将对象编组为 XML 或 JSON 字符串。或者,使用 javax.jms.MapMessage
发送对象的组成部分(原始类型),而不是对象本身,javax.jms.MapMessage
只是一个包含已知键/值对的消息,就像 java.util.Map
一样。这是我们在这里采用的方法。所有 JVM 都支持 int
、long
、String
等原始类型,并且可以以这种方式反序列化传输的数据。
现在让我们看看在 JMS 中接收消息。第一种方法是同步请求消息,一次一个。
package org.springsource.greenbeans.examples.jms.jmstemplate;
import org.apache.commons.logging.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*
@Component
public class RawJmsTemplatePollingMessageConsumer {
@Autowired
protected JmsTemplate jmsTemplate;
@Value("${jms.customer.destination}")
private String destination;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void receiveAndProcessCustomerUpdates() throws Exception {
Message message = this.jmsTemplate.receive(this.destination);
if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message ;
String firstName = mapMessage.getString("firstName");
String lastName = mapMessage.getString("lastName");
String email = mapMessage.getString("email");
Long id = mapMessage.getLong("id");
Customer customer = new Customer(id, firstName, lastName, email );
log.info("receiving customer message: " + customer);
}
}
}
此示例使用 JmsTemplate
实例在有可用新消息时接收它,然后将其转换为 Customer 对象,其步骤与我们发送消息时的逆过程相同,转换后的对象被非常有用地写入日志。如果你需要重复进行这种对 JMS 消息的打包和解包,将会变得很繁琐。将这类逻辑提取到单独的类中通常很有价值。Spring JMS 层次结构支持使用 MessageConverter
层次结构的实例,让你能够覆盖对象的序列化方式。默认的 SimpleMessageConverter
在没有指定其他选项时生效,并且在大多数情况下表现良好,因此我们在这里不覆盖它。然而,如果我们决定以 XML 格式传输对象,我们可以利用 MarshallingMessageConverter
,它利用了 Spring Framework 的 OXM(对象到 XML 编组)支持。最后,请注意 receiveAndProcessCustomerUpdates
方法用 @Transactional
注解进行了修饰。如果在接收消息时出现任何问题,并且抛出了 Exception
,Spring 将回滚接收操作并将消息返回到代理。
这个例子足够简单,但有一些限制。首先,我们的代码与 JMS 和 Spring API 紧密耦合。其次,这只处理一条消息,而且仅在方法被调用时处理。确保方法被调用是实现者的责任。通常,实现者希望消息一到达就尽快异步处理。自然的下一步可能是在一个无限循环中连续调用接收方法,以确保队列中的所有消息都尽快得到处理。之后,为了对于长时间运行的任务实现更高的吞吐量并确保队列始终被清空,你可能会添加线程,以便多个循环始终运行。这些都是逻辑上的后续步骤,但仅仅为了接收和处理消息就需要大量工作。实际上,这里的唯一业务逻辑是接收消息有效载荷并对其进行处理的代码。
Spring Framework 提供了一个开箱即用的解决方案来解决这个问题,而且使用起来非常简单!Spring Framework 中有两种适合不同情况的实现提供了此功能。它们都基于 AbstractJmsListeningContainer
类。如果你愿意,可以直接使用这个层次结构,但实际上,使用 Spring 的 JMS 命名空间来配置它有更简单的方法。
让我们回顾一下之前的 Spring XML 配置,添加 http://www.springframework.org/schema/jms
命名空间,然后添加相应的配置。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jms="http://www.springframework.org/schema/jms"
...
xsi:schemaLocation="… http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
….
<jms:listener-container connection-factory="connectionFactory" acknowledge="auto" transaction-manager="jmsTransactionManager">
<jms:listener destination="${jms.customer.destination}" ref="messageListenerContainerConsumer" method="receiveMessage" />
</jms:listener-container>
</beans>
我们只摘录了已添加到配置文件中的部分。<jms:listener-container>
元素需要引用正在使用的连接工厂和事务管理器。请注意,Spring 消息监听器提供了自己的缓存机制,因此在这里应该使用常规的 ConnectionFactory
:CachingConnectionFactory
在这里是冗余的,不应使用。在元素中,你可以配置任意数量的 <jms:listener>
元素,每个元素指定一个 javax.jms.Destination
实例的名称以及一个将接收新消息的 Spring bean 的引用。此外,你还可以配置在引用的 bean 中应调用哪个方法。如果 Spring bean 实现了 javax.jms.MessageListener
或 Spring 自带的 SessionAwareMessageListener
接口之一,则将使用 javax.jms.Message
调用这些接口上的唯一方法,并且无需指定方法。如果配置了方法,该方法应接受与 javax.jms.Message
的有效载荷类型相同的对象作为参数。对于我们的示例,由于我们期望接收 javax.jms.MapMessage
实例,这将是一个 java.util.Map
实例。
修改后的代码如下
package org.springsource.greenbeans.examples.jms.messagelistenercontainer;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;
import java.util.Map;
@Component
public class MessageListenerContainerConsumer {
private Log log = LogFactory.getLog(getClass());
public void receiveMessage(Map<String, Object> message) throws Exception {
String firstName = (String) message.get("firstName");
String lastName = (String) message.get("lastName");
String email = (String) message.get("email");
Long id = (Long) message.get("id");
Customer customer = new Customer(id, firstName, lastName, email);
log.info("receiving customer message: " + customer);
}
}
不错吧?你的代码不关心 JMS,甚至几乎完全不关心 Spring(除了 @Component
注解。当然,你可以简单地使用 XML 或 Java 配置来配置这个 bean,这样也可以避免这个依赖)。此外,你的代码也更容易理解。所有相同的规则都适用——例如,接收期间抛出的异常将触发回滚。你可以使用 XML 的 <jms:listener-container >
元素来指定你想要的监听器数量,从而提高并发性。你还可以控制使用哪种类型的事务管理。
虽然 JMS 是一个非常强大的选项,但它并非没有局限性。客户端与代理版本耦合,安排已部署系统和代理的升级变得非常繁琐。JMS 本质上是以 Java 为中心的。客户端使用 Java 语言驱动程序连接到给定的代理。消息传递完全是为了集成,而在如此多不同平台的今天,我们不能总是假设我们只与其他 Java 客户端集成。虽然一些 JMS 消息代理(甚至是开源的)可以扩展到令人难以置信的吞吐量,但确实存在更快的消息传递选项,如果你的情况需要,至少值得研究替代方案。JMS 是一个好的 API,但没人会称它为最好的 API。因此,尽管许多消息代理支持 JMS,但它们也支持其专有 API 或更强大或更具表达力的替代 API。一个例子是 JMS 在消息发送后缺乏路由能力。
应对这些挑战的一个流行选项是 AMQP 标准。AMQP(高级消息队列协议)最初诞生于摩根大通银行在关键任务应用中面临的挑战。从他们最初的工作中产生了一项规范,围绕这项规范最终形成了一个工作组,如今该工作组包括众多公司,如高盛、Progress Software、微软、Novell、红帽、WS02、高盛、美国银行、巴克莱、思科、瑞士信贷、德意志交易所系统,当然还有 VMware 的 SpringSource 部门。特别是 SpringSource,开发了最流行的基于 AMQP 的消息代理实现 RabbitMQ。
RabbitMQ 是一个开源消息代理。它易于安装,尤其适用于你正在运行的许多系统,其包管理器已经提供了 RabbitMQ。RabbitMQ 使用 Erlang 语言编写。通常,实现细节并不重要,但这个特定的细节之所以突出,是因为 RabbitMQ 的速度。你看,Erlang 是一种轻量级语言,最初用于关键任务电话系统。Erlang 具有一个非常轻量级、直观的线程模型,这使得 Erlang 程序能够实现比当前 JVM 所能达到的更高的并发性。此外,Erlang 的线程模型与其网络模型无缝融合。这意味着扩展到多个线程或多台机器的方式基本相同。所有这一切都表明 RabbitMQ 速度很快。真的很快,并且它对错误具有弹性,这是爱立信等公司能够享受九个九(99.9999999%)可用性的原因之一。
AMQP 是一种线协议(如 HTTP),而不是 API。这使得它与语言无关(事实上,有数十种针对不同语言和平台的已知客户端),这意味着 RabbitMQ 在各种通常不关心消息代理的工具中得到支持,例如 WireShark,一个网络流量监控工具。从概念上讲,任何 AMQP 客户端都应该能够与任何其他 AMQP 实现进行通信。
AMQP 规范规定了客户端和服务器端的所有构造,以及例行管理选项。在 AMQP 中,客户端创建与服务器的连接。客户端可以将消息发送到交换器(exchanges)。交换器将消息路由到代理内部的队列,或者完全丢弃它们。交换器是无状态的守门人,而队列实际上是排队和存储消息的地方。
客户端可以从队列消费消息。交换器和队列之间没有固定的关系:你可以创建任意数量的队列,并将其中一个或多个绑定到一个交换器。交换器和队列之间的关系称为绑定(binding)。如果消息中的路由键匹配某个绑定,交换器将最多向队列投递一份消息副本。这一点很重要,因为我之前说过,可以为单个队列指定许多交换器和绑定。多次匹配不会产生多条消息。交换器决定什么是匹配。有几种知名的交换器,它们指定了不同的匹配算法。
javax.jms.Topic
,用于发布-订阅式消息传递)javax.jms.Queue
,用于点对点消息传递)该规范还允许添加特殊的交换器。例如,RabbitMQ 添加了插件交换器(plugin exchange),这基本上是为第三方(或者实际上是 RabbitMQ 本身)提供额外功能的扩展点。这促进了可安装插件列表的不断增长,这些插件涵盖了从发送 XMPP 消息、处理复制到显示用于管理的 Web UI 等各种功能。
我们将研究 Spring AMQP 的使用,它是 Spring 组合项目,让你能够使用 RabbitMQ 完成规范要求的所有事情,同时也能执行更高级的 RabbitMQ 特定操作。
让我们开始构建我们的示例——其设计基本上与 JMS 示例相同——使用 RabbitMQ 和 Spring AMQP 客户端。首先你需要相应的依赖项。如果你使用 Maven,请将以下依赖项添加到你的 pom.xml
文件中
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${com.rabbitmq.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
请确保为 ${spring.amqp.version}
和 ${com.rabbitmq.version}
属性占位符创建 Maven 属性,或者直接明确替换为相应的版本。在撰写本文时,${spring.amqp.version}
是 1.0.0.M2
,而 ${com.rabbitmq.version}
是 2.1.0
。正如我们在之前的示例中所做的那样,我们安装了一个简单的 Spring XML 配置文件来引导其他所有内容。唯一不同的是,从 <tx:annotation-driven>
元素引用事务管理器实现名称、扫描的包以及加载的属性文件的名称!所以,我们不要在设置上浪费太多时间,直接进入基于 AMQP 的示例配置。
package org.springsource.greenbeans.examples.amqp.core;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.*
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;
@Configuration
@SuppressWarnings("unused")
public class AmqpConfiguration {
@Value("${broker.url}")
private String brokerUrl;
@Value("${broker.username}")
private String username;
@Value("${broker.password}")
private String password;
@Value("${amqp.customer.queue}")
private String customerQueueName;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(this.singleConnectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new JsonMessageConverter();
}
@Bean
public ConnectionFactory singleConnectionFactory() {
SingleConnectionFactory connectionFactory = new SingleConnectionFactory(this.brokerUrl);
connectionFactory.setUsername(this.username);
connectionFactory.setPassword(this.password);
return connectionFactory;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(this.rabbitTemplate());
}
@Bean
public Queue customerQueue() {
Queue q = new Queue(this.customerQueueName);
amqpAdmin().declareQueue(q);
return q;
}
@Bean
public DirectExchange customerExchange() {
DirectExchange directExchange = new DirectExchange(customerQueueName);
this.amqpAdmin().declareExchange(directExchange);
return directExchange ;
}
@Bean
public Binding marketDataBinding() {
return BindingBuilder.from(customerQueue()).to(customerExchange()).with(this.customerQueueName);
}
}
正如你所见,这里的配置比我们的 JmsTemplate
多了一些,但别担心,主要部分在形式和功能上与其 JMS 对应部分相同。其余的只是细节。首先,我们配置了一些常见的组件——TransactionManager
(RabbitTransactionManager
)、ConnectionFactory
实例和 RabbitTemplate
。这些大部分应该是不言自明的。
让我们深入探讨一些不同之处。第一个细微之处是我们在 RabbitTemplate
上配置了对 JsonMessageConverter
的引用。记住:AMQP 与语言和平台无关。从 Java 发送到 AMQP 代理的消息很可能由 .NET 或 Python 或 PHP 上的客户端消费。当消息被打包并通过网络发送时,有效载荷是以字节流的形式传输的。消息接收者需要能够将这些字节恢复为可在接收者平台上读取的内容。如果消息使用的是 Java 对象,那么这些字节将是序列化的 Java 对象,只有另一侧具有相同类的 Java 客户端才能反序列化它。因此,就像 Spring JMS 支持一样,Spring AMQP 提供了一个 MessageConverter
层次结构。Spring AMQP 层次结构包含一个 MarshallingMessageConverter
,以及一个 SimpleMessageConverter
,此外,它还有一个 JsonMessageConverter
(目前 Spring AMQP 项目所独有),可以将对象转换为 JSON(一种所有主流语言和平台都能解析且比 XML 更简洁/宽松的 JavaScript 对象表示格式),也可以从 JSON 转换回来。在 JMS 中,智能序列化是一个效率和设计问题,但在 AMQP 中,这是一个更为紧迫的问题,因此请注意配置的 MessageConverter
。
你会发现配置中有四个对象在 JMS 示例中没有对应项。第一个是 AmqpAdmin
。AMQP 在协议级别定义了创建应用程序工作所需一切的命令,包括交换器、队列和绑定。在 Spring AMQP API 中,AmqpAdmin
是执行这些命令的关键接口。
在 customerQueue
方法中,我们配置了一个 AMQP 队列,并在 customerExchange
方法中配置了一个 DirectExchange
。最后,我们使用 Spring AMQP 流式 BindingBuilder
API 将队列连接到我们的交换器。在我们的特定示例中——我们将路由键为 “customers”
的消息发送到一个名为“customers”的队列。在我们的特定示例中,除了队列之外,我们不需要声明任何其他内容即可使其工作,因为无名交换器将自动生效并简单地基于路由键路由消息。然而,了解它是如何完成的还是很有用的,即使它有点冗余。我们使用 AmqpAdmin
实例来“声明”这些构造。这些对象是幂等的。你可以“声明”它们一百万次,如果它们已经存在,那么除了其中一次声明之外,其他什么都不会发生,因此在应用程序启动时重复调用是无害的。此外,如果这些构造被设置为持久化,那么甚至每次都不需要声明它们。
让我们看看如何发送消息。
package org.springsource.greenbeans.examples.amqp.core;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
@Component
public class Producer {
@Value("${amqp.customer.exchange}")
private String exchange;
@Value("${amqp.customer.queue}")
private String routingKey;
@Autowired
private RabbitTemplate rabbitTemplate;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void sendCustomerUpdate(Customer customer) {
log.info("sending customer update " + ToStringBuilder.reflectionToString(customer));
this.rabbitTemplate.convertAndSend(this.exchange , this.routingKey, customer);
}
}
在这个类中,我们使用 RabbitTemplate
来发送消息并将其转换为 JSON。我们指定要使用的 routingKey
和应该使用的交换器(两者都设置为“customers”,与我们在配置中设置的绑定类型保持一致)。我们将该类配置为使用 @Transactional
,因此任何发送消息失败的行为都将与使用 JMS 失败时相同。
现在,让我们看看使用 AMQP 接收消息的选项。
package org.springsource.greenbeans.examples.amqp.amqptemplate;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
@Component
public class RawAmqpTemplatePollingMessageConsumer {
@Autowired
protected RabbitTemplate amqpTemplate;
@Value("${amqp.customer.queue}")
private String queue;
private Log log = LogFactory.getLog(getClass());
@Transactional
public void receiveAndProcessCustomerUpdates() throws Exception {
Customer msg = (Customer)this.amqpTemplate.receiveAndConvert(this.queue);
log.info("receiving customer message: " + ToStringBuilder.reflectionToString( msg));
}
}
不出所料,这看起来与第一个同步 JMS 示例几乎完全相同(除了 RabbitTemplate
)。我们省去了一些在第一个示例中必须处理的转换逻辑,但除此之外,它基本上是相同的。如果在消息接收时发生事务回滚,消息将被返回到队列的末尾,最终会被重新投递。
Spring AMQP 也支持异步消息接收,就像 Spring JMS 支持一样。然而,由于 Spring AMQP 项目仍然是一个新生项目,目前没有等效的命名空间支持。因此,我们需要自己配置对象。将以下内容添加到你的配置中。
@Autowired
private MessageListenerContainerConsumer messageListenerContainerConsumer;
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setTransactionManager(this.rabbitTransactionManager());
container.setConnectionFactory(singleConnectionFactory());
container.setQueueName(this.customerQueueName);
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
this.messageListenerContainerConsumer, this.jsonMessageConverter());
container.setMessageListener(messageListenerAdapter);
return container;
}
此配置注入了将进行处理的组件(见下文,messageListenerContainerConsumer
实例通过组件扫描被发现并自动注册到 Spring,这就是我们在此自动装配它的原因),然后配置了一个 SimpleMessageListenerContainer
实例,该实例将负责接收消息、管理事务并在将接收到的消息交给 POJO 之前进行转换。
POJO 本身看起来像这样
package org.springsource.greenbeans.examples.amqp.messagelistenercontainer;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;
@Component
public class MessageListenerContainerConsumer {
private Log log = LogFactory.getLog(getClass() );
public void handleMessage(Customer cu){
log.info("Received customer " + ToStringBuilder.reflectionToString(cu)) ;
}
}
这个类比另一个类更能受益于消息转换器。在这里,我们能够声明一个接受 Customer 类型参数的方法,而 MessageListenerContainer
知道如何转换它,然后将其传递给 handleMessage
方法。然而,所有相同的规则都适用。异常会触发回滚等等。
在本文中,我们探讨了希望使用 Spring 框架在应用中集成企业消息传递的开发者目前可用的两种选项。我们介绍了用于处理企业消息的 Java 消息服务 (JMS) API 和高级消息队列协议 (AMQP)。我们使用核心 Spring 框架和 Spring AMQP 项目提供了同步和异步示例。我们探讨了消息传递如何帮助扩展应用以及如何成为集成应用的便捷方式。我希望这能帮助您更容易理解在使用企业消息传递时的可用选项,以及 Spring 如何让您更容易为您的应用做出正确的选择。一如既往,本文的代码可在我们的 Spring 示例 仓库中找到。