RabbitMQ 的性能和可伸缩性路由拓扑

工程 | Helena Edelson | 2011年4月1日 | ...

为高度可伸缩系统设计良好的路由拓扑可能像绘制图表一样。需要考虑很多事情,例如问题本身、环境的限制、消息实现的限制以及性能策略。我们经常遇到的问题是,路由缺乏灵活性和表达能力来满足我们的需求。这正是 RabbitMQ 的突出之处。

基本概念

任何熟悉通用消息传递的人都知道将消息从 A 路由到 B 的概念。路由可以是简单的,也可以是相当复杂的,在为可伸缩、复杂系统设计路由拓扑时,它必须是优雅的。保持简洁和解耦,组件可以在不同负载下良好地进行流量控制。这可以表示为简单的映射或复杂的图。在其最简单的形式下,路由拓扑可以表示为节点,例如层级节点。

Hierarchical nodes in message routing topology

对于 RabbitMQ 或 AMQP 的新手(请注意,Rabbit 支持多种协议,包括 STOMP、HTTP、HTTPS、XMPP 和 SMTP),以下是一些基本组件描述:
  • 交换机 (Exchange) 服务器内的实体,接收来自生产者应用的消息,并可选择将这些消息路由到服务器内的消息队列。
  • 交换机类型 (Exchange type) 某种特定交换机模型的算法和实现。与“交换机实例”相对,后者是服务器内接收和路由消息的实体。
  • 消息队列 (Message queue) 一个命名实体,存储消息并将其转发给消费者应用。
  • 绑定 (Binding) 创建消息队列和交换机之间关系的实体。
  • 路由键 (Routing key) 交换机可用于决定如何路由特定消息的虚拟地址。
对于点对点路由,路由键通常是消息队列的名称。对于主题发布/订阅路由,路由键通常是层级结构的。

api.agents.agent-{id}.operations.{operationName}

在更复杂的情况下,路由键可以与基于消息头字段和/或其内容的路由结合使用。交换机检查消息的属性、头字段、消息体内容以及可能来自其他来源的数据,然后决定如何路由消息。基于上述路由键思想的绑定模式可能看起来像 api.agents..operations.,其中我们将交换机 E1 与队列 Q1 使用绑定模式 api.agents..operations. 进行绑定,以便发送到 E1 的任何消息如果其路由键匹配绑定模式,就会路由到 Q1

Rabbit Broker 的结构与 JMS Broker 不同。每个 RabbitMQ 服务器至少包含一个节点 (broker),或者更典型的是集群中的多个节点。每个节点都有一个默认的虚拟主机 "/",并且可以创建更多虚拟主机,例如 "/develoment"。Rabbit 虚拟主机类似于 Tomcat 的虚拟主机,将 Broker 数据划分为子集。在这些虚拟主机内部是交换机和队列。用户使用其凭据连接时,是连接到 Rabbit 节点上的虚拟主机。

这里我们连接到 Rabbit 节点,声明要发布到的交换机、要消费的队列、绑定模式,然后发布一些消息,使用 RabbitMQ Java 客户端 API

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

对“已着陆”火箭的简单消费可能看起来像这样:


 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
 channel.basicConsume(rocketQueue, false, queueingConsumer);

 int landed = 0;
 while (landed < launched) {
     QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     String rocketLanded = new String(delivery.getBody());

     if (rocketLanded.equalsIgnoreCase("Alderaan")) {
         System.out.println("That's no moon, that's a space station.");
     }
     landed++;
 }

问题

在考虑哪种路由策略在性能本身也可以改进的可伸缩环境中表现最佳时,有很多选项。消息传递的一个优点是提供了各种配置,并能够找出既能解决当前需求又能满足增长需求的正确配置。

为了保持简单,我们考虑两种策略:

  1. 使用层级路由键的高度分区路由,较少的主题交换机。
  2. 数量更多的直接交换机和队列,路由分区少得多。
每种场景都遵循这个用例:每个必须扩展的应用既是生产者又是消费者。

从何处开始

在深入研究一种能够随着时间推移清晰高效地扩展的路由解决方案之前,最好先评估你的环境及其组件。例如,什么有助于扩展?一般来说,解耦、分布式、异步性、并行性、抽象和间接性等等。然后考虑哪些元素是当前或潜在的瓶颈。一个基本原则是,高流量/大容量路径需要更高效的吞吐量,否则你的分发会面临瓶颈的风险。一个练习是按照流量或作为热力图来对它们进行排名。接下来,你能否对你的流量进行分类——是否存在总体模式、主题或类似消息类型,以及它们之间的关系是什么?现在开始考虑整合,如何以及在哪里可以提高效率,并应用经过验证的模式来解决这些热点问题,为了扩展而解耦,并提高性能。

常规路由注意事项

所有交换机类型的行为都不同。以下是一些通用规则:
  • 如果应用图中的路由键是有限域,那么许多扇出交换机可能是合适的选择(每个路由键与一个交换机 1:1 映射)。
  • 如果路由键数量可能无限,请考虑主题交换机。
  • 对于主题路由,性能随着绑定数量的增加而降低。
  • 扇出交换机非常快,因为它们没有路由需要处理,但如果绑定到大量队列,情况就会改变。
  • 直接交换机是主题交换机的一种更快形式,前提是你不需要使用通配符。
  • 对超过 100,000 个队列进行故障排除可能比拥有更多绑定、更少交换机和队列的拓扑更繁琐。
  • 数量非常多的交换机和队列会占用更多内存,这可能很重要,但这确实取决于具体情况。
自 RabbitMQ 2.4.0(2011年3月23日发布)起,新的主题路由算法优化可在峰值时比之前的主题算法快 60 倍。因此,一个建议是使用更少的交换机和队列,而使用更多的路由,因为现在时间增加是最小的。

性能

什么是廉价的?

就内存成本而言,是交换机和绑定。RabbitMQ 构建于 Erlang 之上,其中每个节点 (broker) 都是一个进程,每个队列也是一个进程。默认情况下,Erlang VM 进程限制设置为 1M,可以提高。然而,出于可伸缩性考虑,交换机不是一个进程,它只是 RabbitMQ 内置的 Mnesia 数据库中的一行。在集群中,声明一个交换机会使其出现在集群的所有节点上,而声明一个队列只会在其中一个节点上创建。这解释了为什么交换机在节点重启或在集群中创建节点后仍然存在,而队列不会。

要警惕绑定流失。在策略二中,如果你创建许多新队列及其绑定,每当消费者连接时,你可能会遇到问题。例如,给定有大量消息发布的交换机 E1...En,每当消费者 Cm 连接时,它就会从其自己的队列创建到所有 E1...En 的绑定,这可能会导致问题,具体取决于连接速率。

为了缓解绑定流失,考虑使用交换机到交换机绑定,这是 2.3.1 版本的新功能。每个消费者可以拥有自己的二级交换机 Ym,该交换机必须是非自动删除的。然后将所有 E1...En 绑定到 Ym。这样,这些绑定始终存在。在此场景中,每当消费者 Cm 连接时,它只需声明其队列并将该队列绑定到 Ym。如果 Ym 是一个扇出交换机,它将非常快,并将绑定流失率降低到每次连接 1 个绑定,而不是可能每次连接 n 个绑定。

Exchange-to-Exchange Binding

用例

交换机到交换机可伸缩用例

考虑一个具有自主代理的服务器应用。每个代理都在一个虚拟机上,该虚拟机是弹性扩展系统的一部分。当每个代理启动时,它会向服务器发送一条在线消息,随后是许多其他消息,例如认证和数据传输。如果我们有 1,000 个代理,每个代理声明 50 个直接交换机、队列和绑定,那么每个代理必须知道服务器的队列,以便履行队列声明 (queue.declare) 操作上的绑定契约。这不是一个可伸缩的解决方案。

现在考虑创建共享主题交换机:一个用于代理到服务器的路径,另一个用于服务器到代理的路径,第三个用于处理未认证的代理,仅路由到不需要安全认证的队列。现在我们使用绑定模式、消息路由键进行分区,并为每个服务器设置一组供所有连接到它的代理共享。然后,以最简单的形式,当每个代理上线时,它会声明一个私有交换机和队列,并将其交换机绑定到共享主题交换机。

我们的关系现在通过交换机到交换机的映射来表达,这减少了流失率并解耦了代理与“知道”服务器队列的依赖关系。使用此模式,系统变得清晰、解耦且可伸缩。

弹性扩展用例

让我们将前一个场景进一步推进。我们已经在场景二(许多直接路由)的基础上使用了主题发布/订阅路由。现在假设系统要求增加到数据中心中扩展我们的服务器应用集群,拥有 50,000 或更多代理。我们如何应对不同的负载?

认证客户端交换机将消息从代理路由到服务器。它处理所有发布消息到单消费者队列的操作,包括那些产生消息频率最高的队列。根据当前拓扑,对于 10,000 个客户端,这可能是一个潜在的瓶颈,每分钟大约产生 60,000 条消息,即每天 86,400,000 条消息。这很容易解决,根据你的配置(例如是否持久化消息),RabbitMQ 每天可以处理超过 10 亿条消息。

我们的服务器应用正在运行一个 RabbitMQ 集群。记住,在集群中,声明一个交换机会使其出现在所有节点上,而声明一个队列只会在其中一个节点上创建,因此我们必须配置一个解决方案。

生产者和消费者之间的负载均衡

为了有效处理随着更多客户端应用(代理)上线而可能产生的极高负载,我们可以通过几种方式修改此拓扑。首先,对上述配置进行优化,以在 Rabbit 集群中实现消息的负载均衡。我们可以为 Rabbit 集群中的每个节点创建一个队列。如果我们有四个节点,对于每个高流量队列,我们可以为该操作创建 hfq.{0,1,2,3}。现在每个代理可以随机选择一个节点(通过 0 到 3 之间的一个数字,或者更复杂的轮询实现)进行发布。使用 RabbitMQ 有 RPC 调用,或者你可以使用 Rabbit 管理插件 获取节点数量,这可以在你的轮询算法中使用。

带轮询分发的 Worker 队列

Worker 队列(或任务队列)通常用于有效地在多个 worker 之间分发耗时的任务,并轻松实现工作并行化。此外,此拓扑适用于无需执行资源密集型任务并等待其完成的情况。运行多个 worker 队列可以将这些任务分发到它们之间。

对于 Worker 队列,Rabbit 默认使用轮询分发方法,将每条消息按顺序发送给下一个消费者。每个消费者接收到的消息数量大致相同。如果你声明一个队列并启动 3 个竞争性消费者,将它们绑定到交换机,并发送 20,000 条消息,那么消息 0 将路由到第一个消费者,消息 1 到第二个,消息 2 到第三个,依此类推。如果我们开始积压任务,我们可以简单地增加更多 worker,从而使系统轻松扩展。

性能

内存

上述两种选项都不会必然导致 RabbitMQ 的高负载。创建的交换机和队列数量没有硬性限制,在一个 Broker 上运行 100,000 个队列是没问题的。通过适当的调优和足够的内存,你可以运行超过一百万个队列。

RabbitMQ 会动态地将消息推送到磁盘以释放 RAM,因此队列的内存占用不依赖于其内容。队列空闲 10 秒或更长时间后,它会“休眠”,这会触发该队列的 GC。因此,队列所需的内存量可以显著缩小。例如,1000 个空的、空闲的队列可能占用 10MB 的 RAM。当它们全部活跃时(即使为空),当然,根据内存碎片情况,它们可能会占用更多内存。强迫它们再次进入休眠状态以测试行为很困难,因为 Erlang VM 不会立即将内存返还给操作系统。

但是,你可以观察到一个休眠且内存非常碎片化的大进程,因为回收的内存量足以迫使 VM 将内存返还给 OS。如果你运行一个测试,该测试稳步增加 Rabbit 的内存占用,你可以观察到休眠对空闲进程的影响,因为它降低了内存使用量的增长速度。

Erlang 是一个多线程 VM,它利用多核优势。它向开发者提供了“绿色线程”,被称为“进程”,因为与线程不同,它们概念上不共享地址空间。这里有一篇关于 Erlang VM 和进程 的有趣自述。

事务

发布 10,000 条消息的事务可能需要长达四分钟。RabbitMQ 的一个新功能叫做 发布者确认 (Publisher Confirms),比相同但采用事务的代码快 100 倍以上。如果你没有明确要求实现事务,但确实需要验证,可以考虑这个选项。

要点总结

以下是一些最终要点,可帮助你从实现中获得最大的性能收益:
  • 新的主题路由算法优化在峰值时快 60 倍。
  • 使用通配符的绑定模式,其中 '*' 匹配单个词,比 '#' 匹配零个或多个词要快得多。通配符 '#' 在路由表中处理所需的时间比 '*' 长。
  • 交换机到交换机绑定提高了解耦性,增加了拓扑灵活性,减少了绑定流失,并有助于提高性能。
  • 交换机和绑定非常轻量级。
  • RabbitMQ 发布者确认比 AMQP 事务快 100 倍以上。
  • 队列空闲 >=10 秒后会“休眠”,触发队列的 GC,从而显著减少该队列所需的内存。
  • Worker 队列有助于并行化和分发工作负载。
  • 在 Rabbit 集群中分发 Worker 队列有助于扩展。
  • 对你的拓扑进行负载均衡。
这绝不是关于这个主题的论文,确实还有许多其他模式、拓扑和性能细节需要考虑。策略,一如既往,取决于许多因素,但我希望这些内容足够有帮助,或至少能引导读者朝着正确的方向思考。

获取

RabbitMQ GitHub 源代码 RabbitMQ 二进制下载和插件 Erlang 下载 适用于 Java 和 .NET 的 Spring AMQP API 用于监控 RabbitMQ 的 Hyperic Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>

订阅 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

保持领先

VMware 提供培训和认证,助力你快速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区所有即将到来的活动。

查看全部