使用 RabbitMQ 和 Riak 进行数据事件处理

工程 | Jon Brisbin | 2011 年 4 月 21 日 | ...

随着新应用程序利用 RabbitMQ 等消息代理的可伸缩性优势以及 Riak 等云规模数据存储的优势,两者不可避免地会成为亲密伙伴(那种真正面对面交流的朋友,而不是只通过 Facebook 联系的朋友)。

我们发现自己现在编写的许多应用程序都在同一个应用程序中包含这两个功能。很多时候,我们希望根据消息的结果更新数据,或者根据更新的数据发送消息。两个促进 RabbitMQ 和 Riak 集成的新实用工具允许您直接在其各自的服务器内部执行这些操作。

RabbitMQ 自定义交换器

实验性的 RabbitMQ Riak 自定义交换器的目的是将 AMQP 消息从代理发送到 Riak 集群。当然,您也可以简单地将消费者绑定到特定交换器并自行完成。如果您正在进行任何形式的消息转换,那么您可能仍然需要在特殊的消费者中进行。但拥有一个专用于此目的的交换器类型,可以在最小配置或开销的情况下,让您对消息最终到达的位置拥有很大的灵活性。您可以在传递给交换器声明操作的参数中指定 Riak 服务器的主机和 Protocol Buffers 端口,或者您可以将该信息作为 AMQP 消息头传递——或者您可以两者都做。您可以将桶和键信息指定在 AMQP 消息头中,或者您可以让它分别从交换器名称和路由键推断出来——或者您可以通过覆盖(例如)存储消息的桶名称来结合使用这两种方法。

消息传递到 Riak 后,该交换器会调用主题交换器的路由逻辑,这意味着该交换器的工作方式就像普通主题交换器一样——唯一的区别是它将接收到的所有消息存储在 Riak 中。在不久的将来,将增加对 RabbitMQ 支持的所有交换器类型的支持,而不仅仅是主题交换器类型。

在内部,该交换器使用连接池向 Riak 发送消息。要将池中可用客户端的数量扩展到默认的五个以上,只需在声明交换器时设置“maxclients”参数即可。

RabbitMQ Riak postcommit Hook(后提交钩子)

另一方面是您可以安装到 Riak 服务器中的 postcommit hook(后提交钩子),它将在该条目被修改时向 RabbitMQ 服务器发送任何更新的 Riak 对象。

要指定将此消息发送到何处,您可以在条目上包含多种不同的元数据头,类型为“X-Riak-Meta-”。以下是可以设置在每个单独条目上,或者设置在该桶中键为“AMQP-Meta”的特殊文档上的所有可能选项的完整列表。如果实际条目上不存在任何元数据头,这将提供一组默认的元数据头供检查。

识别的完整头列表为:

  • X-Riak-Meta-Amqp-Exchange
  • X-Riak-Meta-Amqp-Routing-Key
  • X-Riak-Meta-Amqp-Host
  • X-Riak-Meta-Amqp-Port
  • X-Riak-Meta-Amqp-Vhost
  • X-Riak-Meta-Amqp-User
  • X-Riak-Meta-Amqp-Password
  • X-Riak-Meta-Amqp-Ignore

这些选项大多数都是自解释的。需要注意的一个选项是“X-Riak-Meta-Amqp-Ignore”头。通过将其设置为值“true”,RabbitMQ postcommit hook 将忽略对此条目的任何更新,并且不会像通常那样发送消息。

我能用它做什么?

当然,这个概念很简单,但其意义深远。

处理 RabbitMQ 集群时的一个问题是它在底层使用了 mnesia。在许多分布式设置中,这并非总是理想的。特别是间歇连接的 WAN 节点可能会因为与其他代理没有可靠连接而受影响。

通过指定将 Riak 更新发送到哪个 RabbitMQ 服务器,实际上可以建立如下图所示的场景

Riak Shovel Diagram

此图中的两个 RabbitMQ 服务器并未集群。结合使用 RabbitMQ 的 Riak 交换器类型和 Riak 的 RabbitMQ postcommit hook,两个服务器上的消费者都将以类似于 RabbitMQ 的 shovel 插件 的方式接收消息。

请记住,这种自定义交换器类型并未解决跨 WAN 与节点通信的底层问题(或,就此而言,在任何节点随意出现和消失的场景中,例如动态扩展情况)。当然,所有事物都有权衡,因此效果可能会因情况而异。目前,这两种实用工具都不处理重试或重新发送。尚未有官方路线图,但如果存在,重试将排在首位。

使用由 Riak 支持的消息交换器的一个好处是您的消息都存储着。由于 Riak 是一个云规模的数据存储,您可以保留您的交换器接收到的每一条消息,而无需担心存储开销(只需添加更多 Riak 服务器即可获得更大容量)。这也意味着您可以编写一个简单的 Web 界面来显示这些消息,并且只需更新消息,您就可以重新发送其中任何一条(或所有!)消息。这可能是重放一组消息进行测试的好方法,或者如果您想通过更改其中一个元数据头以指向不同的代理来将所有消息重新发送到其他地方。

这只是对一个全新但可能功能强大的工具的快速介绍。我相信您已经有了如何将此方法应用于您自己的问题领域的想法。我很想听听您打算如何做!发推文给我 (@j_brisbin),告诉我您在做什么。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部