领先一步
VMware 提供培训和认证,助您加速进步。
了解更多随着新应用程序利用消息队列(如 RabbitMQ)的可扩展性优势以及云规模数据存储(如 Riak)的优势,两者结合并快速成为好友(那种真正面对面交流的朋友,而不是只在 Facebook 上联系的朋友)是不可避免的。
如今,我们编写的许多应用程序都会在同一个应用程序中包含这两个功能。我们经常希望在收到消息后更新数据,或者在数据更新后发送消息。两个新的实用工具促进了 RabbitMQ 和 Riak 的集成,使您可以在各自的服务器内直接完成其中任何一项操作。
实验性的 RabbitMQ Riak 自定义交换器的目的是将 AMQP 消息从代理发送到 Riak 集群。当然,您也可以简单地将一个消费者绑定到一个特定的交换器来自己完成。如果您正在进行任何类型的消息转换,那么您可能仍然需要在特殊的消费者中进行。但是,拥有一个专门用于此目的的交换器类型,可以在最小的配置或开销下,让您在消息的最终去向方面获得极大的灵活性。您可以在传递给交换器声明操作的参数中指定 Riak 服务器的 IP 地址和 Protocol Buffers 端口,或者可以将该信息作为 AMQP 消息头传递——或者两者都进行。您可以指定存储桶和键信息作为 AMQP 消息头,或者让它分别从交换器名称和路由键推断出来——或者您可以组合使用,例如覆盖消息将存储的存储桶名称。
消息被发送到 Riak 后,该交换器会调用主题交换器的路由逻辑,这意味着该交换器就像一个普通的主题交换器一样工作——除了它会将收到的所有消息存储在 Riak 中。在不久的将来,将支持 RabbitMQ 支持的所有交换器类型,而不仅仅是主题交换器类型。
在内部,该交换器使用连接池将消息发送到 Riak。要将池中可用客户端的数量从默认的五个扩展,只需在声明交换器时设置“maxclients”参数即可。
硬币的另一面是您可以安装到 Riak 服务器中的 postcommit hook(提交后钩子),它会在每次条目修改时将任何更新的 Riak 对象发送到 RabbitMQ 服务器。
要指定消息的发送位置,您可以包含多种不同的“X-Riak-Meta-”类型的元数据头在您的条目中。以下是可以在每个单独条目上或在存储桶中具有“AMQP-Meta”键的特殊文档上设置的可能选项的完整列表。这将提供一组默认的元数据头,如果实际条目上不存在这些头,就会检查它们。
识别的完整头部列表是
其中大多数选项都是不言自明的。需要注意的一个选项是“X-Riak-Meta-Amqp-Ignore”头部。通过将其设置为“true”,RabbitMQ postcommit hook 将会忽略对此条目的任何更新,而不会像平常一样发送消息。
这个概念当然很简单,但其影响是深远的。
处理 RabbitMQ 集群时的一个问题是它底层使用了 mnesia。在许多分布式设置中,这并不总是理想的。尤其是经常断开连接的 WAN 节点可能会因为无法与其他代理建立稳定连接而受到影响。
通过指定发送 Riak 更新的 RabbitMQ 服务器,实际上可以设置如下所示的场景:
此图中的两个 RabbitMQ 服务器并未集群。通过结合使用 RabbitMQ 的 Riak 交换器类型和 Riak 的 RabbitMQ postcommit hook,两个服务器上的消费者将以类似于 RabbitMQ 的 shovel 插件 的方式接收消息。
请记住,这种自定义交换器类型并未解决跨 WAN 节点通信的根本问题(或者在任何节点会临时出现或消失的场景中,例如动态扩展情况)。当然,凡事皆有权衡,您的体验可能会有所不同。目前,这两个实用工具都无法处理重试或重新发送。还没有官方路线图,但如果有的话,重试将是首要事项。
使用基于 Riak 的消息交换器的一个好处是您的消息都得到了存储。由于 Riak 是一个云规模的数据存储,您可以保存交换器收到的每条消息,而无需担心存储开销(只需添加更多 Riak 服务器即可增加容量)。这也意味着您可以编写一个简单的 Web 界面来显示这些消息,并且通过简单地更新消息,您可以重新发送任何(或所有!)它们。这可以成为重放一系列消息进行测试的好方法,或者如果您想将所有消息重新发送到其他地方,只需更改其中一个元数据头部指向不同的代理即可。
这只是一个全新的、但可能非常强大的工具的简要介绍。我敢肯定您已经有了如何将这种方法应用到您自己的问题领域的想法。我非常想听听您的计划!通过 Twitter(@j_brisbin)告诉我您正在做什么。