领先一步
VMware 提供培训和认证,助你加速进步。
了解更多开发异步应用程序有时会带来挑战,因为你通常需要两个独立的组件才能看到完整的消息发布和消费生命周期。你经常会编写一个消费者,它可以将消息转储到 System.out 或日志文件,只是为了确保你的发布者正在做正确的事情。如果你能在一个组件中模拟消息的发布和消费交互,从而实际看到发生了什么,那将会非常方便。
RabbitMQ Groovy DSL 旨在通过提供一种非常简洁易用的 DSL 语言来创建消息消费者和生产者,从而帮助解决这个问题,让你无需编写任何样板代码即可快速模拟组件之间的消息交互。
RabbitMQ DSL 中的顶级节点是 exchange
节点。除了设置一个由其作用域内的节点继承的名称外,它还在你的代理中声明了该 exchange。
mq.exchange(name: "myexchange") {
}
默认情况下,它会声明一个 direct
exchange。通过 type
属性支持其他 exchange 类型。
mq.exchange(name: "myexchange", type: "topic") {
}
每当你使用 queue
、consume
或 publish
节点(我们稍后会讨论)在 exchange 节点的作用域内时,你的 exchange 名称将从该节点“继承”过来,这样你就无需重复它。
在你的模拟应用程序中发送和接收消息的下一步是声明一个用于接收消息的队列。你可以使用 queue
节点来完成此操作。
mq.exchange(name: "myexchange") {
queue(name: "myqueue", routingKey: "test") {
}
}
在 exchange
节点的作用域内声明此队列也会使其绑定到外围的 exchange。routingKey
属性的值将用于声明此绑定。
这个例子使用了命名队列,但你也可以通过将 name
属性设置为 null 来获得一个匿名的、服务器生成的队列。
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
}
}
这个匿名队列的名称在内部被跟踪,因此只要你在该节点的作用域内声明消费者和发布者,你就不需要知道它的名称。但是,如果你想编写一些需要匿名队列名称的辅助函数,只需将你的节点设置为一个变量。该变量的值将是一个 Spring AMQP Queue 对象,该对象有一个属性,说起来很讽刺,也叫 name
。
mq.exchange(name: "myexchange") {
Q = queue(name: null, routingKey: "test") {
}
println "queue name is: ${Q.name}"
}
为了处理接收到的消息,你需要声明一个消费者。消费者的 DSL 在如何附加在接收到消息时执行的代码方面非常灵活。本质上,consume
只是一个 Spring AMQP SimpleMessageListenerContainer,表示它的 consume
节点有几种不同的形式。
声明消费者最简单的方法就是使用 Closure 作为接收到消息时要执行的方法。这个 Closure 的唯一参数将是一个 Spring AMQP Message 对象。
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
consume { msg ->
// Handle the message body here, which will always be a byte array
String bodyAsString = new String(msg.body)
println "msg body: ${bodyAsString}"
}
}
}
RabbitMQ Groovy DSL 实际上功能齐全,足以编写一个完整的生产应用程序,尽管我们在本文中只关注模拟应用程序。DSL 的一个特性是事件的概念。事件在消息生命周期的特定固定时间(发布消息之前和之后以及发生错误时)被分派,自定义事件可以作为消息消费者来处理。
要声明事件处理器,你可以使用 on
节点(按照惯例,你可能希望它位于源文件的顶部)。
mq.on error: { err -> err.printStackTrace() },
myevent: { msg -> println "myevent: ${new String(msg.body)}" }
这声明了两个事件处理器:一个用于处理发生的任何异常,另一个用于在我们接收到消息时委托处理。由于在这种情况下我们所做的只是将消息打印到 System.out,因此我们可以轻松地在消费者之间共享代码。
为了告诉我们的消费者在接收到消息时使用此事件处理器,我们使用 consume
节点的 onmessage
属性。
mq.on error: { err -> err.printStackTrace() },
myevent: { msg -> println "myevent: ${new String(msg.body)}" }
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
consume onmessage: "myevent"
}
}
但是,你可以将 onmessage 属性设置为不仅仅是一个 String。为了灵活性,你可以将其设置为以下之一:
除非你从你的 Closure 或事件处理器中返回 false
或 null 值,否则你的消费者将继续监听消息。要保持你的消费者处于活动状态并等待消息,只需返回 true
或非 null。
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
consume { msg ->
// Handle the message body here, which will always be a byte array
String bodyAsString = new String(msg.body)
println "msg body: ${bodyAsString}"
// Keep listening for messages and don't exit
return true
}
}
}
如果你从你的 Closure 中返回 false
或 null,消费者将退出。
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
consume { msg ->
// Handle the message body here, which will always be a byte array
String bodyAsString = new String(msg.body)
println "msg body: ${bodyAsString}"
// I'm done with you, please exit
return false
}
}
}
命令行执行器使用引用计数系统来确定是否有任何消费者仍在活动。从标准 Groovy Closures 返回 false
或 null 将告诉调用者停止消费者的内部 MessageListenerContainer
。然而,在使用 MessageListener
实现时需要注意的一点是,你必须自己关闭消费者。
consume
DSL 节点将返回一个特殊的 Consumer
对象,该对象暴露了一个名为 shutdown
的方法,负责关闭 MessageListenerContainer
。如果你自己实现 MessageListener
,则需要在消费者退出时调用此方法,否则系统将不知道何时完成,并且不会为你关闭消费者。
将 consume
节点设置为一个变量,并在其上调用 shutdown
方法。
mq.exchange(name: "myexchange") {
def consumer
def listener = [
onMessage: { msg ->
println "Invoked from a standard MessageListener"
consumer?.shutdown()
}
] as MessageListener
queue(name: null, routingKey: "test.key") {
consumer = consume onmessage: listener
}
}
使用 DSL 发布消息就像消费消息一样容易。publish
节点基本上有两种变体。
mq.exchange(name: "myexchange") {
// Return a String, a byte array, or an instance of a Spring AMQP Message
publish(routingKey: "test.key") {
"this is from a publish"
}
// Write raw bytes to a ByteArrayOutputStream
publish(routingKey: "test2.key", myHeaderValue: "customHeader", contentType: "text/plain") { out ->
out.write("these are test bytes".bytes)
}
}
在第一个示例中,我们传回一个 String(也可以使用 byte[]
)作为消息体。在第二个示例中,我们设置了标准消息头(此处为 contentType
)以及自定义应用程序头,并且可以写入传递给我们的 Closure 的 ByteArrayOutputStream
。
请注意,你无需将发布和消费分成两个独立的源文件。你可以将这两个功能放在一起,以便更好地了解你最初想通过消息传递实现的目标。
mq.on error: { err -> err.printStackTrace() }
mq.exchange(name: "myexchange") {
queue(name: null, routingKey: "test") {
consume { msg ->
// Handle the message body here, which will always be a byte array
String bodyAsString = new String(msg.body)
println "msg body: ${bodyAsString}"
}
}
publish(routingKey: "test") {
"this is from a publish"
}
}
尽管我们主要关注模拟应用程序,这些应用程序以后可以使用纯 Java(甚至完全使用另一种语言)更健壮地构建出来,但 RabbitMQ DSL 也非常适合编写简单的维护应用程序,或任何需要消息消费和发布但又不想花费大量精力编写一个完整的消息应用程序的场景。
RabbitMQ DSL 在 GitHub 上提供,并采用 Apache 许可证。安装说明在 README 中。
感谢 Joris Kuipers 最近的贡献,你可以使用随附的 rabbitmq.dsld 在 Eclipse 和 STS 中获得 一些 IDE 代码完成支持。