保持领先
VMware 提供培训和认证,助力你快速进步。
了解更多为高度可伸缩系统设计良好的路由拓扑可能像绘制图表一样。需要考虑很多事情,例如问题本身、环境的限制、消息实现的限制以及性能策略。我们经常遇到的问题是,路由缺乏灵活性和表达能力来满足我们的需求。这正是 RabbitMQ 的突出之处。
api.agents.agent-{id}.operations.{operationName}
在更复杂的情况下,路由键可以与基于消息头字段和/或其内容的路由结合使用。交换机检查消息的属性、头字段、消息体内容以及可能来自其他来源的数据,然后决定如何路由消息。基于上述路由键思想的绑定模式可能看起来像 api.agents..operations.
,其中我们将交换机 E1
与队列 Q1
使用绑定模式 api.agents..operations.
进行绑定,以便发送到 E1
的任何消息如果其路由键匹配绑定模式,就会路由到 Q1
。
Rabbit Broker 的结构与 JMS Broker 不同。每个 RabbitMQ 服务器至少包含一个节点 (broker),或者更典型的是集群中的多个节点。每个节点都有一个默认的虚拟主机 "/",并且可以创建更多虚拟主机,例如 "/develoment"。Rabbit 虚拟主机类似于 Tomcat 的虚拟主机,将 Broker 数据划分为子集。在这些虚拟主机内部是交换机和队列。用户使用其凭据连接时,是连接到 Rabbit 节点上的虚拟主机。
这里我们连接到 Rabbit 节点,声明要发布到的交换机、要消费的队列、绑定模式,然后发布一些消息,使用 RabbitMQ Java 客户端 API。
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
对“已着陆”火箭的简单消费可能看起来像这样:
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
为了保持简单,我们考虑两种策略:
要警惕绑定流失。在策略二中,如果你创建许多新队列及其绑定,每当消费者连接时,你可能会遇到问题。例如,给定有大量消息发布的交换机 E1...En
,每当消费者 Cm
连接时,它就会从其自己的队列创建到所有 E1...En
的绑定,这可能会导致问题,具体取决于连接速率。
为了缓解绑定流失,考虑使用交换机到交换机绑定,这是 2.3.1 版本的新功能。每个消费者可以拥有自己的二级交换机 Ym
,该交换机必须是非自动删除的。然后将所有 E1...En
绑定到 Ym
。这样,这些绑定始终存在。在此场景中,每当消费者 Cm
连接时,它只需声明其队列并将该队列绑定到 Ym
。如果 Ym 是一个扇出交换机,它将非常快,并将绑定流失率降低到每次连接 1 个绑定,而不是可能每次连接 n 个绑定。
现在考虑创建共享主题交换机:一个用于代理到服务器的路径,另一个用于服务器到代理的路径,第三个用于处理未认证的代理,仅路由到不需要安全认证的队列。现在我们使用绑定模式、消息路由键进行分区,并为每个服务器设置一组供所有连接到它的代理共享。然后,以最简单的形式,当每个代理上线时,它会声明一个私有交换机和队列,并将其交换机绑定到共享主题交换机。
我们的关系现在通过交换机到交换机的映射来表达,这减少了流失率并解耦了代理与“知道”服务器队列的依赖关系。使用此模式,系统变得清晰、解耦且可伸缩。
认证客户端交换机将消息从代理路由到服务器。它处理所有发布消息到单消费者队列的操作,包括那些产生消息频率最高的队列。根据当前拓扑,对于 10,000 个客户端,这可能是一个潜在的瓶颈,每分钟大约产生 60,000 条消息,即每天 86,400,000 条消息。这很容易解决,根据你的配置(例如是否持久化消息),RabbitMQ 每天可以处理超过 10 亿条消息。
我们的服务器应用正在运行一个 RabbitMQ 集群。记住,在集群中,声明一个交换机会使其出现在所有节点上,而声明一个队列只会在其中一个节点上创建,因此我们必须配置一个解决方案。
RabbitMQ 会动态地将消息推送到磁盘以释放 RAM,因此队列的内存占用不依赖于其内容。队列空闲 10 秒或更长时间后,它会“休眠”,这会触发该队列的 GC。因此,队列所需的内存量可以显著缩小。例如,1000 个空的、空闲的队列可能占用 10MB 的 RAM。当它们全部活跃时(即使为空),当然,根据内存碎片情况,它们可能会占用更多内存。强迫它们再次进入休眠状态以测试行为很困难,因为 Erlang VM 不会立即将内存返还给操作系统。
但是,你可以观察到一个休眠且内存非常碎片化的大进程,因为回收的内存量足以迫使 VM 将内存返还给 OS。如果你运行一个测试,该测试稳步增加 Rabbit 的内存占用,你可以观察到休眠对空闲进程的影响,因为它降低了内存使用量的增长速度。
Erlang 是一个多线程 VM,它利用多核优势。它向开发者提供了“绿色线程”,被称为“进程”,因为与线程不同,它们概念上不共享地址空间。这里有一篇关于 Erlang VM 和进程 的有趣自述。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>