Green Beans:企业消息传递和 Spring 入门

工程 | Josh Long | 2011 年 1 月 26 日 | ...

在本文中,我们将介绍消息传递的核心概念,以及 Spring 框架及其姐妹项目提供的丰富消息传递支持。

什么是消息传递?为了最好地解释这一点,我将转述 Gregor Hohpe 和 Bobby Woolf 的开创性著作《企业集成模式》(Addison Wesley, 2004)中提供的示例。当你打电话时,你会尝试将信息传达给另一方。只有当对方在你打电话时能够接听,这才会起作用。由于并不总能接到电话,因此我们使用语音邮箱来排队消息。呼叫者将消息留在语音邮箱中,然后被呼叫者可以稍后异步地检索消息(或实际上,许多消息)。

在这个例子中,语音邮箱位于双方之间。它存储消息,然后在被呼叫者(接收者)检索时将其传递。在企业消息传递领域,情况非常相似:一方将消息发送到消息代理(也称为消息导向中间件 - MOM),而另一方——当该方能够——在稍后接收或显式查询消息代理中的任何消息。

这正是类比停止有用的地方。与语音邮箱相比,消息代理具有许多选项。消息代理非常适合提供附加服务,例如路由,并保证消息传递。消息代理可以针对不同的用例进行优化,例如,你可以权衡速度与持久性。消息代理可能会将消息持久化到外部存储以确保持久性,尽管这通常是可以通过配置来切换以提高速度的选项。

在语音邮箱示例中,消息由一方发送,然后传递给另一方——通信是*点对点*的。消息代理支持这一点,以及另一种称为*发布/订阅*的通信类型,其中消息会传递给多个客户端。

消息代理的一个常见用途是解决两个不同系统之间的集成问题。发送到消息代理的数据通常是发送方和接收方都通用的格式。两个系统在使用消息代理时需要就数据合同达成一致。消息通常包含消息正文,其中存储了消息本身的内容,以及消息头,它们是键/值对,提供了有关消息正文的元数据,可用于帮助消息的消费者处理消息。消息头可以是任何你想要的内容,但它们通常与消息本身或消息的处理器相关。

Java Message Service

Java 消息服务 (JMS) API 规定了与消息代理交互的客户端接口。每个消息代理都提供自己的 API 实现,非常类似于 JDBC 驱动程序对 JDBC API 的作用。这意味着 JMS 客户端通常应该使用与服务器相同版本的客户端。有许多优秀的 JMS 代理实现可供选择。其中一个原因是消息传递一直是应用程序开发的重要组成部分,并且今天仍然如此。自 1.1 版本以来,JMS 一直是 J2EE(现为 Java EE)规范的一部分。在过去十年的大部分时间里,JMS 规范都停留在 1.1 版本。

在 JMS 中,客户端使用 javax.jms.ConnectionFactory 创建 javax.jms.Connection。然后可以使用 Connection 创建 javax.jms.SessionSession 代表客户端与代理的交互,并允许发送和接收消息以及其他不太明显的。操作。

该接口上最有用的方法与创建 javax.jms.Destination 的消息生产者和消息消费者有关。Destination 映射了消息代理上“地址”的 JMS 概念。它还映射了代理存储消息的位置。在 JMS 中,消息从同一位置发送、存储和消费,所有这些都由 javax.jms.Destination 实例表示。

[caption id="attachment_7506" align="alignnone" width="573" caption="上方,蓝色元素代表生产者和消费者。橙色元素代表代理中缓冲消息的目标。在 JMS 中,这些是主题或队列。"][/caption]

Destination 是一个接口,有两个更具体的子接口 javax.jms.Queuejavax.jms.TopicQueue 代表标准的队列,这是之前描述的点对点构造。Topic 提供发布/订阅消息传递,并将一条消息传递给多个接收者。

要将消息发送到 Destination,您必须创建一个 javax.jms.MessageProducer。然后可以使用 MessageProducer 发送 javax.jms.Message

JMS 支持两种不同的接收消息的机制。第一种方式是使用 javax.jmx.MessageConsumer#receive() 方法来请求消息,该方法以*同步*方式返回 Destination 中的单个消息;默认情况下,该方法会阻塞直到收到消息。而不是使用 MessageConsumer,客户端可以通过调用 javax.jms.Session#setMessageListener(MessageListener) 来安装 javax.jms.MessageListenerMessageListener 是一个接口,只有一个方法 public void onMessage(javax.jms.Message),每当 Destination 上有可供消费的 javax.jms.Message 时,就会调用该方法。MessageListener 提供*异步*消息处理:当消息到达时,它们会被处理。

JMS API 还有很多内容需要学习,但这些类和概念将在我们讨论 Spring 对 JMS 消息传递的支持时对您最有帮助。第一层支持是 org.springframework.jms.core.JmsTemplate,它提供了简化方法,将我们刚刚讨论的内容压缩成单行代码。JmsTemplate 需要一个 javax.jms.ConnectionFactory 实例来执行其工作。JmsTemplate 可以为您完成许多工作。例如,要发送消息,JmsTemplate 会建立一个 javax.jms.Session,设置一个 javax.jms.MessageConsumerjavax.jms.MessageProducer,设置所有事务的机制,并为您提供当前 javax.jms.Session 的引用,以便您可以创建您选择的消息并发送它。所有这些错误处理和构造逻辑,轻松就能节省几十行代码。一旦消息发送完毕,它会销毁或关闭大部分这些对象。这是应用程序服务器(如 Java EE 服务器)中的标准做法,因为 ConnectionFactory 实例由服务器创建、由服务器管理并进行池化。它们在使用后会缓存这些实例。在这些环境中关闭资源只是将它们返回到池中。因此,JmsTemplate 在标准情况下会做正确的事情,假设 ConnectionFactory 缓存或池化实例。

在应用程序服务器等托管环境中,您通常需要从 JNDI 获取 javax.jms.ConnectionFactory。您可以使用 Spring 来为您查找该引用并配置 JmsTemplate。在我们的示例中,我们希望更宽松地操作,因此我们将使用独立的 ActiveMQ 消息代理。ActiveMQ 是一个流行的、开源消息代理。要使用它,请下载它,然后在 bin 文件夹中运行适合您操作系统的启动脚本。在您的应用程序中,您需要客户端库来连接到相应版本的 ActiveMQ。在撰写本文时,ActiveMQ 的最新版本是 5.4.2。如果您使用 Maven,请将以下依赖项添加到您的 Maven pom 文件中



            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>${activemq.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-optional</artifactId>
                <version>${activemq.version}</version>
            </dependency>

请确保为 ${activemq.version} 创建一个 Maven 属性,或者手动将字符串替换为相应的版本。还有一个 activemq-all 依赖项,但它会拉下许多可能不必要的 jar。对于我们的应用程序,上述两个依赖项就足够了。

将 Spring 与 JMS 结合使用

让我们检查一个基本 JMS 应用程序的配置。首先,让我们检查基本的 Spring XML 配置



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       ">

    <context:property-placeholder location="jms.properties"/>
    <context:component-scan base-package="org.springsource.greenbeans.examples.jms.core"/>
    <context:component-scan base-package="org.springsource.greenbeans.examples.jms.jmstemplate"/>
    <tx:annotation-driven transaction-manager="jmsTransactionManager"/>

</beans>

您可以看到 XML 主要设置了属性占位符解析并启用了类路径扫描。最有趣的部分是元素,它告诉 Spring 启用所有带有 @Transactional 注释的方法上的事务。该元素引用了 Spring 上下文中的另一个 bean,jmsTransactionManager,它是在以下 Java 配置类中定义的。



package org.springsource.greenbeans.examples.jms.core;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*
import org.springframework.jms.connection.*
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

@Configuration
public class JmsConfiguration {

  @Value("${broker.url}")
  private String brokerUrl;

  @Bean
  public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(this.brokerUrl);
    return new CachingConnectionFactory(activeMQConnectionFactory);
  }

  @Bean
  public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
  }

  @Bean
  public JmsTemplate jmsTemplate() {
    return new JmsTemplate(this.connectionFactory());
  }
}

配置相当简单。首先,我们定义一个 ActiveMQConnectionFactory 实例,然后将其交给 Spring 框架的 CachingConnectionFactory 实例。有些代理提供自己的缓存 ConnectionFactory 实现。但是,如果您自己的代理没有,那么您总是可以使用 Spring Caching ConnectionFactory 实现来提高速度。

接下来,我们有一个 JmsTransactionManager,它提供 JMS 本地事务。在 JMS 中,事务回滚只有两种结果:发送操作失败时,消息未被发送;接收操作失败时,消息会被重新排队到消息代理。后一种情况可能很复杂。

如果您收到一条消息,然后在处理过程中遇到错误,并且假设您保持事务打开,那么事务将被回滚,消息将被返回到代理。一旦进入代理,会发生什么取决于代理和您的配置。通常,消息将被立即重新传递。然而,这并非总是期望的行为。因此,大多数(如果不是全部)代理都支持某种形式的死信队列,无法传递的消息将被发送到其中。您可以按需处理此队列中的消息——也许当这种错误情况发生时,某个监控工具会叫醒某人。然而,大多数代理提供更多的控制。可能可以设置关于错误消息路由的规则。例如,代理可能会尝试立即重新传递消息,然后,如果再次失败,它可能会等待一段时间再试,如果仍然失败,则等待更长时间。这通常称为退避期。也许在达到某个阈值后,消息将被发送到死信队列,或被完全丢弃。无论如何,请检查您的代理文档。

最后,我们通过提供对 ConnectionFactory 的引用来创建一个 JmsTemplate

让我们看看 JmsTemplate 的实际应用。为了保持示例简单,我们首先讨论在一个名为 Producer 的类中如何发送消息。消息传递的一个常见用途是向一个(或多个)不同系统发送通知,作为同步机制,以便感兴趣的系统拥有某个数据项的最新版本。在此示例中,我们假设有一个简单的 Customer POJO,其中包含标准字段:firstNamelastNameemailid


package org.springsource.greenbeans.examples.jms.core;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*
import org.springframework.beans.factory.annotation.*
import org.springframework.jms.core.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*;


@Component
public class Producer {

  @Value("${jms.customer.destination}")
  private String customerDestination;

  @Autowired
  private JmsTemplate jmsTemplate;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void sendCustomerUpdate(final Customer customer) throws Exception {
    this.jmsTemplate.send(this.customerDestination, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
           log.info("Sending customer data " + ToStringBuilder.reflectionToString(customer));
           MapMessage mapMessage = session.createMapMessage();
           mapMessage.setLong("id", customer.getId());
           mapMessage.setString("firstName", customer.getFirstName());
           mapMessage.setString("lastName", customer.getLastName());
           mapMessage.setString("email", customer.getEmail());
      }
    });
  }
}

在类中,我们看到一个 sendCustomerUpdate 方法,它以 Customer 引用作为其参数。使用 JmsTemplate 的 send 方法——它接受两个参数:第一个是目标名称("customers")的字符串,第二个是 Spring 框架类 MessageCreator 的引用——我们使用传递到我们实现的 createMessage(javax.jms.Session) 方法的 javax.jms.Session 引用来构建一个 JMS 消息。JMS 中有许多类型的消息可以创建:javax.jms.TextMessagejavax.jms.ObjectMessagejavax.jms.MapMessage 等。ObjectMessage 的作用正如您所期望的那样——它允许您将序列化的对象作为 JMS 消息的有效负载进行传输。通常,应避免这样做。序列化的数据类型将消息的生产者和消费者与相同的 API 合约耦合在一起,这在并非总是可行的。即使能够保证在消息交换的双方都可用且类版本相同的类型,与更灵活的其他选项相比,这样做通常效率低下。相反,优先选择分解——您可以使用 javax.jms.TextMessage 将对象封送为 XML 或 JSON 字符串。或者,使用 javax.jms.MapMessage 发送对象的构成部分,而不是对象本身,它只是一个具有已知键/值对的消息,就像 java.util.Map 一样。这就是我们在这里采用的方法。所有 JVM 都拥有 intslongsStrings 等,并且可以反序列化以这种方式传输的数据。

现在我们来看一下 JMS 中接收消息。第一种方法是同步地请求它们,一次一个。



package org.springsource.greenbeans.examples.jms.jmstemplate;

import org.apache.commons.logging.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*

@Component
public class RawJmsTemplatePollingMessageConsumer {

  @Autowired
  protected JmsTemplate jmsTemplate;

  @Value("${jms.customer.destination}")
  private String destination;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void receiveAndProcessCustomerUpdates() throws Exception {
    Message message = this.jmsTemplate.receive(this.destination);
    if (message instanceof MapMessage) {

      MapMessage mapMessage = (MapMessage) message ;
      String firstName = mapMessage.getString("firstName");
      String lastName = mapMessage.getString("lastName");
      String email = mapMessage.getString("email");
      Long id = mapMessage.getLong("id");

      Customer customer = new Customer(id, firstName, lastName, email );

      log.info("receiving customer message: " + customer);

    }
  }
}

此示例使用 JmsTemplate 实例在有可用消息时接收一条新消息,然后将其转换为 Customer 对象,步骤与发送消息时相反,并将转换后的对象有益地写入日志。如果不得不重复多次,这种 JMS 消息的打包和解包会变得很繁琐。通常,将此类逻辑提取到单独的类中是有价值的。Spring JMS 层次结构支持使用 MessageConverter 层次结构的实例,让您覆盖对象的序列化方式。默认的 SimpleMessageConverter 在未另行指定时生效,并且在大多数情况下都做得很好,所以我们在这里不覆盖它。但是,如果我们决定要将对象作为 XML 传输,我们可以利用 MarshallingMessageConverter,它利用 Spring 框架的 OXM(对象-XML 封送)支持。最后,请注意 receiveAndProcessCustomerUpdates 方法装饰了 @Transactional 注释。如果在接收消息时发生任何错误,并且抛出了 Exception,Spring 将回滚接收操作并将消息返回给代理。

监听使其更简单

这个例子足够简单,但有一些限制。首先,我们的代码与 JMS 和 Spring API 紧密耦合。其次,这只处理一条消息,并且仅在调用方法时才处理。实现者有责任确保调用该方法。通常,实现者希望消息在到达后尽快处理,异步处理。一个自然的下一步可能是从一个无限循环中连续调用 receive 方法,以确保队列中的所有消息都尽快得到处理。在此之后,为了实现更高的吞吐量,特别是对于长时间运行的任务,并确保队列始终被清空,您可以添加线程,以便始终运行多个循环。这些都是逻辑上的下一步,但仅仅为了接收和处理消息,它们也需要大量的工作。实际上,这里的唯一业务逻辑是处理消息有效负载并对其执行某些操作的代码。

Spring 框架提供了现成的解决方案,并且使用起来很简单!Spring 框架中有两种实现可以提供此功能,适用于不同的情况。它们都基于 AbstractJmsListeningContainer 类。如果您愿意,您可以直接使用此层次结构,但碰巧的是,使用 Spring 的 JMS 命名空间配置它还有一种更简单的方式。

让我们回顾一下我们之前的 Spring XML 配置,添加 http://www.springframework.org/schema/jms 命名空间,然后进行相应的配置。



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jms="http://www.springframework.org/schema/jms"
       ...
       xsi:schemaLocation="…  http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
       ….

    <jms:listener-container  connection-factory="connectionFactory" acknowledge="auto" transaction-manager="jmsTransactionManager">
        <jms:listener destination="${jms.customer.destination}" ref="messageListenerContainerConsumer" method="receiveMessage" />
    </jms:listener-container>

</beans>

我们只摘录了已添加到配置文件中的部分。<jms:listener-container> 元素需要对连接工厂和正在使用的事务管理器进行引用。请注意,Spring 消息监听器提供自己的缓存,因此您应该在此处使用普通的 ConnectionFactoryCachingConnectionFactory 在这里是多余的,不应该使用。在 元素中,您可以配置任意多的 <jms:listener> 元素,每个元素指定目标(javax.jms.Destination)实例的名称以及将接收新消息的 Spring Bean 的引用。可选地,您可以配置在引用的 Spring Bean 中应调用哪个方法。如果 Spring Bean 实现 javax.jms.MessageListener 或 Spring 自己的 SessionAwareMessageListener 接口之一,则会调用每个接口上的唯一方法,并将 javax.jms.Message 传递给它,无需指定方法。如果配置了方法,则该方法应以与 javax.jms.Message 的有效负载相同类型的对象作为其参数。对于我们的示例,由于我们期望接收 javax.jms.MapMessage 实例,这将是一个 java.util.Map 实例。

修改后的代码是


package org.springsource.greenbeans.examples.jms.messagelistenercontainer;

import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;

import java.util.Map;

@Component
public class MessageListenerContainerConsumer {

  private Log log = LogFactory.getLog(getClass());

  public void receiveMessage(Map<String, Object> message) throws Exception {
    String firstName = (String) message.get("firstName");
    String lastName = (String) message.get("lastName");
    String email = (String) message.get("email");
    Long id = (Long) message.get("id");
    Customer customer = new Customer(id, firstName, lastName, email);
    log.info("receiving customer message: " + customer);
  }
}

不算差吧?您的代码不了解 JMS,甚至几乎不了解 Spring(除了 @Component 注释)。当然,您也可以使用 XML 或 Java 配置来配置此 bean,这样可以避免此依赖关系。)此外,您的代码更容易理解。所有相同的规则都适用——例如,接收过程中抛出的异常将触发回滚。您可以通过在 XML <jms:listener-container > 元素中指定所需的监听器数量来提高并发性。您还可以控制使用的是哪种事务管理类型。

AMQP

虽然 JMS 是一个非常强大的选项,但它并非没有局限性。客户端与代理版本耦合,并且随着部署的系统和代理的标志日升级,很快就会变得很麻烦。JMS 本质上是 Java 中心化的。客户端使用 Java 语言驱动程序连接到给定的代理。消息传递的本质是集成,我们不能总是假定我们正在与其他 Java 客户端集成,尤其是在这样一个拥有众多不同平台的时代。虽然一些 JMS 消息代理(甚至是开源的)可以扩展到极高的吞吐量,但确实存在更快的消息传递选项,如果您的场景需要,那么至少值得研究替代方案。JMS 是一个不错的 API,但没有人会称之为最好的 API。因此,虽然许多消息代理支持 JMS,但它们也支持其专有 API 或更强大、更具表现力的替代 API。例如,JMS 在消息发送后缺乏路由功能。

一个符合这些挑战的流行选项是 AMQP 标准。AMQP(高级消息队列协议)是一个标准,最初是为了应对摩根大通银行在关键任务应用程序中面临的挑战而诞生的。从他们工作的开端出现了一个规范,围绕这个规范最终形成了一个工作组,今天包括许多公司,如高盛、Progress Software、微软、Novell、Red Hat、WS02、高盛、美国银行、巴克莱、思科、瑞士信贷、德国证券交易所系统,当然还有 VMware 的 SpringSource 部门。SpringSource 特别开发了最流行的基于 AMQP 的消息代理实现 RabbitMQ。

RabbitMQ 是一个 开源消息代理。它易于安装,特别是如果您运行的是许多拥有现成 RabbitMQ 包管理器的系统。RabbitMQ 用 Erlang 语言编写。通常,实现细节不应很重要,但这个细节特别重要,因为它关乎 RabbitMQ 的速度。您知道,Erlang 是一种轻量级语言,最初部署在关键任务电话系统中。Erlang 具有非常轻量级、直观的线程模型,使得 Erlang 程序能够实现比 JVM 目前能够实现的高得多的并发性。此外,Erlang 的线程模型与其网络模型无缝融合。这意味着扩展到多个线程或多台机器基本上是按相同的方式完成的。所有这些都意味着 RabbitMQ 速度很快。非常快,而且它对错误具有弹性,这也是 Ericsson 等公司享有九个九(99.9999999%)可用性的原因之一。

AMQP 是一个线路协议(类似于 HTTP),而不是一个 API。这使得它与语言无关(事实上,有数十种针对不同语言和平台的已知客户端),并且意味着 RabbitMQ 在您通常不认为会关心消息代理的各种工具中都得到了支持,例如 WireShark,一个网络流量监控工具。从概念上讲,任何 AMQP 客户端都应该能够与任何其他 AMQP 实现进行通信。

深入了解 AMQP 代理

AMQP 规范在客户端和服务器端都指定了所有构造,以及例行的管理选项。在 AMQP 中,客户端与服务器建立连接。客户端可以向交换机发送消息。交换机将消息路由到代理内的队列,或完全阻止它们。交换机是无状态的网关,而队列实际队列化并存储消息。

客户端可以从队列中消费消息。交换机和队列之间没有关系:您可以创建任意多的队列,并将其中一个或多个绑定到交换机。交换机和队列之间的关系称为绑定。如果消息中的路由键与绑定匹配,交换机会将最多一个消息副本传递给队列。这一点很重要,因为我之前说过,可以为单个队列指定多个交换机和绑定。多个匹配不会产生多个消息。交换机决定什么构成匹配。有几个众所周知的交换机,它们指定不同的匹配算法。

  • 扇形交换机:扇形交换机将收到的所有消息路由到绑定到该交换机的每个队列(这与 javax.jms.Topic 最相似,用于发布-订阅式消息传递)
  • 直连交换机:当路由键(消息的常见头)与绑定键相同时进行匹配(这与 javax.jms.Queue 最相似,用于点对点消息传递)
  • 主题交换机:主题交换机在 JMS 中没有 API 特定等价物。它最类似于某些消息代理中的分层主题。主题交换机将路由键头与使用特殊语法来允许通配符的交换机绑定进行匹配。例如,绑定键可能指定以下内容:“.years.#”。此通配符将匹配任意一个词,后跟一个点(“.”),然后是“years”,再后跟一个点(“.”),最后是零个或多个词。因此,“taxes.years.2011”会匹配,而“taxes.years,”也会匹配,但“years.2322”不会匹配。
  • 头交换机:匹配头键或头键值对的存在。
[caption id="attachment_7484" align="alignnone" width="698" caption="上面,蓝色的圆圈是生产者和消费者,绿色的元素是交换机,橙色的元素是存储消息的 AMQP 队列。它们只是消息的命名存储,与 JMS 意义上的队列没有关系。"][/caption]

规范还允许添加特殊交换机。例如,RabbitMQ 添加了插件交换机,这基本上是第三方(或 RabbitMQ 本身)提供额外功能的扩展点。这催生了越来越多的可安装插件,它们可以处理从发送 XMPP 消息、处理复制到显示管理 Web UI 等各种事情。

将 Spring 与 AMQP 结合使用

我们将研究 Spring AMQP 的使用,它是 Spring 产品组合项目,允许您使用 RabbitMQ 来完成规范要求的所有工作,以及更多高级的 RabbitMQ 特定操作。

让我们开始构建我们的示例——设计基本上与我们的 JMS 示例相同——使用 RabbitMQ 和 Spring AMQP 客户端。您首先需要的是适当的依赖项。如果您使用 Maven,请将以下依赖项添加到您的 pom.xml 文件中


            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>${com.rabbitmq.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>${spring.amqp.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>${spring.amqp.version}</version>
            </dependency>

请务必为 ${spring.amqp.version}${com.rabbitmq.version} 属性占位符创建 Maven 属性,或直接将它们替换为适当的版本。在撰写本文时,${spring.amqp.version}1.0.0.M2${com.rabbitmq.version}2.1.0。就像我们在上一个示例中所做的那样,我们安装了一个简单的 Spring XML 配置文件来引导所有其他内容。唯一不同的是从 <tx:annotation-driven> 元素中引用的事务管理器实现的名称、扫描的包以及加载的属性文件的名称!所以,让我们不要在这里浪费太多时间,直接进入我们基于 AMQP 的示例的配置。


package org.springsource.greenbeans.examples.amqp.core;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.*
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;

@Configuration
@SuppressWarnings("unused")
public class AmqpConfiguration {

  @Value("${broker.url}")
  private String brokerUrl;

  @Value("${broker.username}")
  private String username;

  @Value("${broker.password}")
  private String password;

  @Value("${amqp.customer.queue}")
  private String customerQueueName;

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory());
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    return rabbitTemplate;
  }

  @Bean
  public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(this.singleConnectionFactory());
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new JsonMessageConverter();
  }

  @Bean
  public ConnectionFactory singleConnectionFactory() {
    SingleConnectionFactory connectionFactory = new SingleConnectionFactory(this.brokerUrl);
    connectionFactory.setUsername(this.username);
    connectionFactory.setPassword(this.password);
    return connectionFactory;
  }

  @Bean
  public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(this.rabbitTemplate());
  }

  @Bean
  public Queue customerQueue() {
    Queue q = new Queue(this.customerQueueName);
    amqpAdmin().declareQueue(q);
    return q;
  }

  @Bean
  public DirectExchange customerExchange() {
    DirectExchange directExchange = new DirectExchange(customerQueueName);
    this.amqpAdmin().declareExchange(directExchange);
    return directExchange ;
  }

  @Bean
  public Binding marketDataBinding() {
    return BindingBuilder.from(customerQueue()).to(customerExchange()).with(this.customerQueueName);
  }
}

正如您所见,这里的处理过程比我们的 JmsTemplate 要多一些,但不要担心,主要部分在形式和功能上与其 JMS 对应项相同。其余的只是细节。首先,我们配置了通常的组件——TransactionManagerRabbitTransactionManager)、ConnectionFactory 实例和 RabbitTemplate。其中大部分应该是相当容易理解的。

让我们深入研究那些不匹配的领域。第一个细微之处是我们为 RabbitTemplate 配置了对 JsonMessageConverter 的引用。请记住:AMQP 是语言和平台无关的。从 Java 发送到 AMQP 代理的消息很可能被 .NET、Python 或 PHP 上的客户端消费。当消息被打包并通过网络发送时,有效负载以字节流的形式传输。消息的接收者需要能够将这些字节还原成接收者平台可读的内容。如果消息使用了 Java 对象,那么这些字节将是序列化的 Java 对象,并且只有另一端具有相同类的 Java 客户端才能反序列化它。因此,正如在 Spring JMS 支持中一样,Spring AMQP 提供了一个 MessageConverter 层次结构。Spring AMQP 层次结构有一个 MarshallingMessageConverter,还有一个 SimpleMessageConverter,此外,它还有一个 JsonMessageConverter(目前是 Spring AMQP 项目特有的),它将对象转换为 JSON(JavaScript 对象表示法,可被所有主流语言和平台解析,并且比 XML 更简洁/不易出错)或从 JSON 转换回来。在 JMS 中,智能序列化是效率和设计的问题,但在 AMQP 中,这是一个更紧迫的问题,所以请注意配置的 MessageConverter

您会在配置中发现四个在 JMS 示例中没有对应项的对象。第一个是 AmqpAdmin。AMQP 在协议级别定义了用于创建应用程序所需的所有内容(包括交换机、队列和绑定)的命令。在 Spring AMQP API 中,AmqpAdmin 是访问这些命令的关键接口。

customerQueue 方法中,我们配置了一个 AMQP 队列,在 customerExchange 方法中配置了一个 DirectExchange。最后,我们使用 Spring AMQP 的流式 BindingBuilder API 将队列连接到我们的交换机。在我们这个特定的示例中——我们发送一个路由键为 "customers" 的消息到一个名为 "customers" 的队列。在我们的特定示例中,除了队列之外,我们不需要声明任何东西就可以使其工作,因为默认的无名交换机会被激活,并仅根据路由键路由消息。然而,即使它有点多余,了解它是如何完成的也很有用。我们使用 AmqpAdmin 实例来 "声明" 这些构造。这些对象是幂等的。您可以声明它们一百万次,如果它们已经存在,那么除了其中一次声明之外,任何其他声明都不会发生什么,所以重复调用应用程序启动时的调用是无害的。此外,如果这些构造被设置为持久化,那么每次都不需要声明它们。

让我们看看如何发送消息。



package org.springsource.greenbeans.examples.amqp.core;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;

@Component
public class Producer {

  @Value("${amqp.customer.exchange}")
  private String exchange;
  
  @Value("${amqp.customer.queue}")
  private String routingKey;

  @Autowired
  private RabbitTemplate rabbitTemplate;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void sendCustomerUpdate(Customer customer) {
    log.info("sending customer update " + ToStringBuilder.reflectionToString(customer));
    this.rabbitTemplate.convertAndSend(this.exchange , this.routingKey, customer);
  }
}

在这个类中,我们使用 RabbitTemplate 发送消息并将其转换为 JSON。我们指定了我们想要使用的 routingKey 以及应该使用哪个交换机(两者都是 "customers",与我们在配置中设置的绑定类型一致)。我们已将该类配置为使用 @Transactional,因此任何发送消息的失败都会与使用 JMS 发生失败时的行为相同。

现在,让我们看看使用 AMQP 接收消息的选项。



package org.springsource.greenbeans.examples.amqp.amqptemplate;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;

@Component
public class RawAmqpTemplatePollingMessageConsumer {

  @Autowired
  protected RabbitTemplate amqpTemplate;

  @Value("${amqp.customer.queue}")
  private String queue;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void receiveAndProcessCustomerUpdates() throws Exception {
    Customer msg = (Customer)this.amqpTemplate.receiveAndConvert(this.queue);
    log.info("receiving customer message: " + ToStringBuilder.reflectionToString(  msg));
  }
}

不出所料,这看起来几乎(除了 RabbitTemplate)与第一个同步 JMS 示例相同。我们省去了一些在第一个示例中需要处理的转换逻辑,但除此之外,基本上是相同的。如果消息接收时发生事务回滚,消息将返回到队列末尾,最终会被重新传递。

Spring AMQP 也支持异步消息接收,就像 Spring JMS 支持一样。然而,由于 Spring AMQP 项目仍然是一个初生的项目,因此没有等效的命名空间支持。所以,我们需要自己配置对象。在您的配置中添加以下内容。



  @Autowired
  private MessageListenerContainerConsumer messageListenerContainerConsumer;

  @Bean
  public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setTransactionManager(this.rabbitTransactionManager());
    container.setConnectionFactory(singleConnectionFactory());
    container.setQueueName(this.customerQueueName);

    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
           this.messageListenerContainerConsumer, this.jsonMessageConverter());
    container.setMessageListener(messageListenerAdapter);
    return container;
  }

此配置注入了将执行处理的组件(见下文,messageListenerContainerConsumer 实例通过组件扫描被拾取并自动注册到 Spring,这就是为什么我们在这里自动注入它的原因),然后配置一个 SimpleMessageListenerContainer 实例,该实例将负责接收消息、管理事务以及在将消息传递给 POJO 之前进行转换。

POJO 本身看起来像这样



package org.springsource.greenbeans.examples.amqp.messagelistenercontainer;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;

@Component
public class MessageListenerContainerConsumer {
  
  private Log log = LogFactory.getLog(getClass() );
  
  public void handleMessage(Customer cu){
    log.info("Received customer " + ToStringBuilder.reflectionToString(cu)) ;
  }
}

这个类比其他类更能受益于消息转换器。在这里,我们可以声明一个接受 Customer 类型参数的方法,而 MessageListenerContainer 知道如何将其转换并传递给 handleMessage 方法。然而,所有相同的规则都适用。异常将触发回滚等。

总结

在本文中,我们探讨了开发人员今天希望在应用程序中集成企业消息传递的两个选项,使用了 Spring 框架。我们介绍了 Java 消息服务 (JMS) API 和高级消息队列协议 (AMQP) 来处理企业消息。我们使用核心 Spring 框架和 Spring AMQP 项目提供了同步和异步示例。我们讨论了消息传递如何帮助扩展应用程序以及它如何成为集成应用程序的便捷方式。我希望这将有助于您更轻松地理解使用企业消息传递时可能做出的选择,以及 Spring 如何让您更容易为您的应用程序做出正确的选择。一如既往,本文的代码可以在我们的 Spring Samples 存储库中找到。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有