消息驱动 POJO!

工程 | Mark Fisher | 2006年8月11日 | ...

在所有新的 Spring 2.0 功能和改进中,我必须承认消息驱动 POJO 是我个人最喜欢的功能之一。我有一种感觉,许多其他 Spring 用户也会有同感。

我在这里提供一个快速介绍。还有很多内容要展示,我会在后续帖子中继续。不过,就目前而言——这应该能为您提供足够的信息,让您开始使用一些真正基于 POJO 的异步 JMS!我希望您和我一样对此感到兴奋 ;)

先决条件

您需要在类路径中包含以下 JAR 文件。我还列出了我正在使用的版本(任何 spring-2.x 版本都应该可以。事实上,我大约在 2 分钟前才将 RC3 放在那里)

  • activemq-core-3.2.2.jar
  • concurrent-1.3.4.jar
  • geronimo-spec-j2ee-managment-1.0-rc4.jar
  • commmons-logging-1.0.4.jar
  • log4j-1.2.9.jar
  • jms-1.1.jar
  • spring-2.0-rc3.jar

设置环境

首先,我们需要设置环境。我将使用 ActiveMQ,但更改提供商的影响将仅限于此文件中的修改。我将此文件命名为“shared-context.xml”,因为正如您很快将看到的那样,我将为 JMS 通信的两端导入这些 Bean 定义。以下是“共享”Bean 定义:连接工厂和两个队列(一个用于请求,一个用于回复)


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
	
    <bean id="requestQueue" class="org.activemq.message.ActiveMQQueue">
        <constructor-arg value="requestQueue"/>
    </bean>
 
    <bean id="replyQueue" class="org.activemq.message.ActiveMQQueue">
        <constructor-arg value="replyQueue"/>
    </bean>
 
    <bean id="connectionFactory" class="org.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://:61616"/>
    </bean>
 
</beans>

正如您所见,我将在 TCP 上运行 ActiveMQ(我只是从发行版的 bin 目录中运行“activemq”)。也可以嵌入式运行(使用“vm://”),或者您可以运行 org.activemq.broker.impl.Main 类的 main 方法。如果您想获取发行版,请访问:http://www.activemq.org

示例域

我在这里故意保持简单——主要目标是演示各个部分是如何组合在一起的。但我最想指出的一件事是,我“域”中的这些类是 POJO。您将完全看不到 Spring 或 JMS 的依赖项。

最终,我们将接受用户的输入(通过 stdin 的“name”),并将其转换为某个未指定的事件的“注册请求”。消息将异步发送,但我们将有另一个队列来处理回复。ReplyNotifier 将然后将确认(或“未确认”消息)写入 stdout。

顺便说一句,我正在一个“blog.mdp”包中创建所有这些类。第一个类是 RegistrationRequest


package blog.mdp;

import java.io.Serializable;

public class RegistrationRequest implements Serializable {

    private static final long serialVersionUID = -6097635701783502292L;

    private String name;
	
    public RegistrationRequest(String name) {
        this.name = name;
    }
	
    public String getName() {
        return name;
    }
}

接下来是 RegistrationReply


package blog.mdp;

import java.io.Serializable;

public class RegistrationReply implements Serializable {

    private static final long serialVersionUID = -2119692510721245260L;

    private String name;
    private int confirmationId;
	
    public RegistrationReply(String name, int confirmationId) {
        this.name = name;
        this.confirmationId = confirmationId;
    }
	
    public String toString() {
        return (confirmationId >= 0) 
                ? name + ": Confirmed #" + confirmationId 
                : name + ": Not Confirmed";
    }
}

以及 RegistrationService


package blog.mdp;

import java.util.HashMap;
import java.util.Map;

public class RegistrationService {
	
    private Map registrations = new HashMap();
    private int counter = 100;

    public RegistrationReply processRequest(RegistrationRequest request) {
        int id = counter++;
        if (id % 5 == 0) {
            id = -1;
        }
        else {
            registrations.put(new Integer(id), request);
        }
        return new RegistrationReply(request.getName(), id);
    }
}

正如您所见,这只是提供一个示例。实际上,注册映射可能会做一些事情。另外,您可以看到 20% 的注册尝试将被拒绝(给定一个 -1 的 confirmationId)——这不是处理注册请求的实用方法,但它会为回复消息提供一些变化。同样,重要的是这个服务类与 Spring 或 JMS 没有任何联系。尽管如此,正如您稍后将看到的,它将处理通过 JMS 发送的消息的有效负载。换句话说,这个 RegistrationService 就是 消息驱动 POJO

最后,创建一个简单的类来记录回复消息


package blog.mdp;

public class ReplyNotifier {

    public void notify(RegistrationReply reply) {
        System.out.println(reply);
    }
}

配置消息驱动 POJO

现在是最重要的一部分。我们如何使用 Spring 配置 POJO 服务,以便它可以接收 JMS 消息?答案是 2 个 bean 定义(嗯,如果算上服务本身,就是 3 个)。在这个下一个 bean 定义文件中,请注意实际接收消息并启用异步 监听器 使用的“container”。容器需要知道它从中接收消息的 connectionFactorydestination。有多种类型的容器可用,但这超出了本博客的范围。有关更多信息,请阅读参考文档:消息监听器容器

在这种情况下,“listener”是 Spring 的 MessageListenerAdapter 的实例。它有一个指向 delegate(POJO 服务)和处理程序方法的名称的引用。在这种情况下,我们还提供了一个 defaultResponseDestination。对于返回 void 的方法,您显然不需要这样做。同样(在生产应用程序中可能更常见),您可以省略它,而选择设置传入的 JMS 消息的“reply-to”属性。

既然我们已经讨论了各种参与者,以下是 bean 定义(我将此文件命名为“server-context.xml”)


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
	
    <import resource="shared-context.xml"/>
	
    <bean id="registrationService" class="blog.mdp.RegistrationService"/>
	
    <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="registrationService"/>
        <property name="defaultListenerMethod" value="processRequest"/>
        <property name="defaultResponseDestination" ref="replyQueue"/>
    </bean>
	
    <bean id="container" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="listener"/>
        <property name="destination" ref="requestQueue"/>
    </bean>
	
</beans>

这里的最后一步是为运行服务提供一个引导机制,因为这是一个简单的独立示例。我创建了一个微不足道的 main 方法来启动一个具有相关 bean 定义的 ApplicationContext,然后阻塞


package blog.mdp;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class RegistrationServiceRunner {
	
    public static void main(String[] args) throws IOException {
        new ClassPathXmlApplicationContext("/blog/mdp/server-context.xml");
        System.in.read();
    }
}

配置客户端

在“客户端”方面,我们将发送注册请求并记录回复。首先,我将列出 bean 定义。在上一节之后,您应该理解“container”和“listener”的作用。在这种情况下,delegateReplyNotifier,并且由于它返回 void,因此它本身不发送回复(因此,不存在“defaultResponseDestination”属性)。我将此文件命名为“client-context.xml”

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans.xsd">
	
    <import resource="shared-context.xml"/>
	
    <bean id="replyNotifier" class="blog.mdp.ReplyNotifier"/>
	
    <bean id="listener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="replyNotifier"/>
        <property name="defaultListenerMethod" value="notify"/>
    </bean>
	
    <bean id="container" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageListener" ref="listener"/>
        <property name="destination" ref="replyQueue"/>
    </bean>
	
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="requestQueue"/>
    </bean>
	
</beans>

那里还定义了另一个 bean——Spring 的“jmsTemplate”的实例。我们将使用它将注册请求消息发送到其 defaultDestination。使用 Spring 提供的简单的 convertAndSend(..) 方法,发送 JMS 消息非常简单。我创建了一个类,该类接受用户输入,然后使用此“jmsTemplate”发送消息


package blog.mdp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

public class RegistrationConsole {
	
    public static void main(String[] args) throws IOException {
        ApplicationContext context = new ClassPathXmlApplicationContext("/blog/mdp/client-context.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate");
		
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));		
		
        for (;;) {
            System.out.print("To Register, Enter Name: ");
            String name = reader.readLine();
            RegistrationRequest request = new RegistrationRequest(name);
            jmsTemplate.convertAndSend(request);
        }
    }
}

运行示例

现在是激动人心的部分。启动 ActiveMQ broker(如“设置环境”部分中简要讨论的)。运行 RegistrationServiceRunner 的 main(..) 方法。运行 RegistrationConsole 的 main(..) 方法。输入一个名字,您应该在同一个控制台中看到回复。

更多资源

希望这足以让您了解 Spring 的新消息驱动 POJO 支持。但是,正如我提到的,还有很多内容——不同的容器类型、事务支持、消费者线程配置、可插拔的消息转换策略等等。请继续关注 Interface21 Team Blog 以获取有关这些功能的更多示例和信息。在此期间,您可以查看 Spring 参考文档关于 JMS。另外,请务必访问 Spring 支持论坛的“Remoting and JMS”部分,开始探索这个激动人心的新功能。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有