package com.example.messagingrabbitmq;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
RabbitMQ 消息传递
本指南将引导您创建一个 Spring Boot 应用程序,该应用程序可以向 RabbitMQ AMQP 服务器发布消息和订阅消息。
您将构建什么
您将构建一个应用程序,该应用程序使用 Spring AMQP 的 RabbitTemplate
发布消息,并使用 MessageListenerAdapter
在 POJO 上订阅消息。
您需要什么
-
约 15 分钟
-
常用的文本编辑器或 IDE
-
Java 17 或更高版本
如何完成本指南
要在您的本地环境中查看最终结果,您可以执行以下任一操作:
-
下载并解压本指南的源代码仓库
-
使用 Git 克隆仓库:
git clone https://github.com/spring-guides/gs-messaging-rabbitmq.git
-
Fork 仓库,这样您可以通过提交 pull request 来请求更改本指南
设置 RabbitMQ Broker
在构建消息传递应用程序之前,您需要设置一个服务器来处理接收和发送消息。本指南假设您使用Spring Boot Docker Compose 支持。这种方法的前提是您的开发机器上有一个可用的 Docker 环境,例如 Docker Desktop。添加一个依赖项 spring-boot-docker-compose
,它会执行以下操作:
-
在您的工作目录中搜索
compose.yml
和其他常见的 compose 文件名 -
使用找到的
compose.yml
调用docker compose up
-
为每个支持的容器创建服务连接 bean
-
在应用程序关闭时调用
docker compose stop
要使用 Docker Compose 支持,您只需遵循本指南即可。根据您引入的依赖项,Spring Boot 会找到正确的 compose.yml
文件并在您运行应用程序时启动您的 Docker 容器。
如果您选择自己运行 RabbitMQ 服务器而不是使用 Spring Boot Docker Compose 支持,您有几种选择:
-
下载服务器并手动运行它
-
如果您使用 Mac,可以使用 Homebrew 安装
-
使用
docker-compose up
手动运行compose.yaml
文件
如果您选择这些替代方法中的任何一种,则应从 Maven 或 Gradle 构建文件中删除 spring-boot-docker-compose
依赖项。您还需要向 application.properties
文件添加配置,如准备构建应用程序一节中更详细地描述的那样。如前所述,本指南假设您在 Spring Boot 中使用 Docker Compose 支持,因此目前不需要对 application.properties
进行额外更改。
从 Spring Initializr 开始
您可以使用这个预初始化的项目,然后点击 Generate 下载 ZIP 文件。这个项目已经配置好,适合本指南中的示例。
手动初始化项目
-
导航到 start.spring.io。这项服务会引入应用程序所需的所有依赖项,并为您完成大部分设置。
-
选择 Gradle 或 Maven 以及您想使用的语言。本指南假设您选择了 Java。
-
点击 Dependencies,然后选择 Spring for RabbitMQ 和 Docker Compose Support。
-
点击 Generate。
-
下载生成的 ZIP 文件,这是一个包含根据您的选择配置好的应用程序的压缩包。
如果您的 IDE 集成了 Spring Initializr,您可以直接在 IDE 中完成此过程。 |
创建 RabbitMQ 消息接收器
对于任何基于消息传递的应用程序,您都需要创建一个接收器来响应已发布的消息。以下列表(来自 src/main/java/com/example/messagingrabbitmq/Receiver.java
)展示了如何实现:
`Receiver` 是一个 POJO,它定义了一个用于接收消息的方法。当您注册它来接收消息时,您可以给它起任何您想要的名字。
为了方便起见,这个 POJO 还带有一个 CountDownLatch 。这让它可以发出消息已被接收的信号。这通常不是您会在生产应用程序中实现的功能。 |
注册监听器并发送消息
Spring AMQP 的 RabbitTemplate
提供了使用 RabbitMQ 发送和接收消息所需的一切。但是,您需要:
-
配置消息监听器容器。
-
声明队列、交换机以及它们之间的绑定。
-
配置一个组件来发送一些消息以测试监听器。
Spring Boot 会自动创建一个连接工厂和一个 RabbitTemplate,从而减少您需要编写的代码量。 |
您将使用 RabbitTemplate
发送消息,并将 Receiver
注册到消息监听器容器以接收消息。连接工厂驱动两者,让它们能够连接到 RabbitMQ 服务器。以下列表(来自 src/main/java/com/example/messagingrabbitmq/MessagingRabbitmqApplication.java
)展示了如何创建应用程序类:
package com.example.messagingrabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class MessagingRabbitmqApplication {
static final String topicExchangeName = "spring-boot-exchange";
static final String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(MessagingRabbitmqApplication.class, args).close();
}
}
`@SpringBootApplication` 注解提供了许多好处,如参考文档中所述。
在 listenerAdapter()
方法中定义的 bean 会被注册为容器中(在 container()
中定义)的消息监听器。它监听 spring-boot
队列上的消息。由于 Receiver
类是一个 POJO,它需要被包装在 MessageListenerAdapter
中,您可以在其中指定它调用 receiveMessage
方法。
JMS 队列和 AMQP 队列有不同的语义。例如,JMS 将排队的消息只发送给一个消费者。虽然 AMQP 队列也做同样的事情,但 AMQP 生产者不直接将消息发送到队列。相反,消息被发送到交换机,交换机可以将其路由到单个队列,或者扇出到多个队列,模拟 JMS 主题的概念。 |
消息监听器容器和接收器 bean 是监听消息所需的全部。要发送消息,您还需要一个 Rabbit 模板。
`queue()` 方法创建一个 AMQP 队列。`exchange()` 方法创建一个主题交换机。`binding()` 方法将这两者绑定在一起,定义了当 `RabbitTemplate` 发布到交换机时发生的行为。
Spring AMQP 要求将 Queue 、TopicExchange 和 Binding 声明为顶级 Spring bean,以便正确设置。 |
在本例中,我们使用一个主题交换机,队列通过路由键 foo.bar.#
进行绑定,这意味着任何使用以 foo.bar.
开头的路由键发送的消息都将被路由到该队列。
发送测试消息
在此示例中,测试消息由 CommandLineRunner
发送,它还会等待接收器中的闩锁,并关闭应用程序上下文。以下列表(来自 src/main/java/com.example.messagingrabbitmq/Runner.java
)展示了它是如何工作的:
package com.example.messagingrabbitmq;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(MessagingRabbitmqApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
注意,模板将消息路由到使用路由键 foo.bar.baz
的交换机,这与绑定匹配。
在测试中,您可以 mock 掉 runner,以便在隔离环境中测试接收器。
运行应用程序
`main()` 方法通过创建一个 Spring 应用程序上下文来启动该过程。这将启动消息监听器容器,它开始监听消息。有一个 `Runner` bean 会自动运行。它从应用程序上下文中检索 `RabbitTemplate`,并在 `spring-boot` 队列上发送一条“Hello from RabbitMQ!”消息。最后,它关闭 Spring 应用程序上下文,应用程序结束。
您可以通过您的 IDE 运行 main 方法。请注意,如果您从解决方案仓库克隆了项目,您的 IDE 可能会在错误的位置查找 `compose.yaml` 文件。您可以配置您的 IDE 以在正确的位置查找,或者您可以使用命令行运行应用程序。`./gradlew bootRun` 和 `./mvnw spring-boot:run` 命令将启动应用程序并自动找到 `compose.yaml` 文件。
准备构建应用程序
要在没有 Spring Boot Docker Compose 支持的情况下运行代码,您需要一个在本地运行的 RabbitMQ 版本来连接。为此,您可以使用 Docker Compose,但首先必须对 `compose.yaml` 文件进行两处更改。首先,将 `compose.yaml` 中的 `ports` 条目修改为 `'5672:5672'`。其次,添加一个 `container_name`。
`compose.yaml` 文件现在应如下所示:
services: rabbitmq: container_name: 'guide-rabbit' image: 'rabbitmq:latest' environment: - 'RABBITMQ_DEFAULT_PASS=secret' - 'RABBITMQ_DEFAULT_USER=myuser' ports: - '5672:5672'
您现在可以运行 `docker-compose up` 来启动 RabbitMQ 服务。现在您应该有一个外部 RabbitMQ 服务器,随时可以接受请求。
此外,您需要告诉 Spring 如何连接到 RabbitMQ 服务器(使用 Spring Boot Docker Compose 支持时,这会自动处理)。将以下代码添加到 `src/main/resources` 下的新 `application.properties` 文件中:
spring.rabbitmq.password=secret spring.rabbitmq.username=myuser
构建应用程序
本节介绍运行本指南的不同方法:
无论您选择哪种方式运行应用程序,输出应该都是相同的。
要运行应用程序,您可以将应用程序打包成可执行的 jar 文件。`./gradlew clean build` 命令会将应用程序编译成可执行的 jar。然后您可以使用 `java -jar build/libs/messaging-rabbitmq-0.0.1-SNAPSHOT.jar` 命令来运行该 jar 文件。
或者,如果您有可用的 Docker 环境,可以使用 buildpacks 直接从 Maven 或 Gradle 插件创建 Docker 镜像。借助Cloud Native Buildpacks,您可以创建可在任何地方运行的 Docker 兼容镜像。Spring Boot 直接为 Maven 和 Gradle 提供了 buildpack 支持。这意味着您只需输入一个命令,即可快速将一个合适的镜像构建到本地运行的 Docker daemon 中。要使用 Cloud Native Buildpacks 创建 Docker 镜像,请运行 `./gradlew bootBuildImage` 命令。在 Docker 环境启用后,您可以使用 `docker run --network container:guide-rabbit docker.io/library/messaging-rabbitmq:0.0.1-SNAPSHOT` 命令运行应用程序。
`--network` 标志告诉 Docker 将我们的指南容器连接到外部容器正在使用的现有网络。您可以在Docker 文档中找到更多信息。 |
无论您选择哪种方式构建和运行应用程序,您应该会看到以下输出:
Sending message...
Received <Hello from RabbitMQ!>
总结
恭喜!您刚刚使用 Spring 和 RabbitMQ 开发了一个简单的发布-订阅应用程序。除了本指南涵盖的内容之外,您还可以通过Spring 和 RabbitMQ 做更多事情,但本指南应该提供了一个良好的开端。
另请参阅
以下指南也可能对您有帮助:
想撰写新指南或对现有指南贡献力量?请查看我们的贡献指南。
所有指南的代码均采用 ASLv2 许可发布,文字内容采用 知识共享署名-禁止演绎 3.0 (CC BY-ND 3.0) 许可发布。 |