一个简单的Groovy DSL,用于构建RabbitMQ AMQP应用程序

工程 | Jon Brisbin | 2011年6月1日 | ...

异步应用程序在开发过程中有时会遇到挑战,因为您通常需要两个独立的组件来查看完整的消息发布和消耗生命周期。经常会出现您编写了一个可以把消息转储到System.out或日志文件中的消费者,只是为了确保您的发布者正在做正确的事情。如果能在单个组件中模拟消息的发布和消耗交互,您就可以真正看到正在发生的事情,那将非常方便。

RabbitMQ Groovy DSL旨在通过提供一个非常简洁易用的DSL语言来创建消息消费者和生产者,从而帮助您快速模拟组件之间的消息交互,而无需编写任何样板代码。

使用Exchange

RabbitMQ DSL的顶层节点是exchange节点。除了设置一个在其作用域内的节点继承的名称之外,它还在您的代理中声明了exchange。


mq.exchange(name: "myexchange") {
  
}

默认情况下,它将声明一个direct exchange。其他exchange类型通过type属性支持。


mq.exchange(name: "myexchange", type: "topic") {
  
}

当您在exchange节点的作用域内使用queueconsumepublish节点(我们稍后会讨论)时,您的exchange名称将从该节点“继承”,因此您无需重复它。

使用Queue

在您的模拟应用程序中向发送和接收消息迈进的下一步是声明一个队列,您的消息将被发送到其中。您可以使用queue节点来完成此操作。


mq.exchange(name: "myexchange") {

  queue(name: "myqueue", routingKey: "test") {
    
  }
  
}

exchange节点的作用域内声明此队列也会将其绑定到包含的exchange。routingKey属性的值将用于声明此绑定。

此示例使用了一个命名队列,但您也可以通过将name属性设置为null来获得一个匿名的、服务器生成的队列。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
  }
  
}

此匿名队列的名称在内部跟踪,因此只要您在该节点的作用域内声明您的消费者和发布者,您就不需要知道它是什么。但是,如果您想编写一些需要匿名队列名称的帮助函数,只需将您的节点设置为一个变量。该变量的值将是一个Spring AMQP Queue对象,它有一个名为(讽刺的是)name的属性。


mq.exchange(name: "myexchange") {

  Q = queue(name: null, routingKey: "test") {
    
  }
  
  println "queue name is: ${Q.name}"
  
}

创建Queue消费者

要处理传入的消息,您需要声明一个消费者。DSL对于如何将代码附加到接收到消息时执行的代码非常灵活。在底层,consume只是一个Spring AMQP SimpleMessageListenerContainer),而代表它的consume节点有两种不同的形式。

使用Groovy闭包

声明消费者的最简单方法就是使用闭包作为接收到消息时执行的方法。此闭包的唯一参数将是一个Spring AMQP Message对象。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
    }
    
  }
  
}

使用事件

RabbitMQ Groovy DSL实际上功能齐全,可以编写完整的生产应用程序,尽管本文主要关注于模拟应用程序。DSL的一个特性是事件的概念。事件在消息生命周期的特定固定时间点(发布消息之前和之后,以及发生错误时)分发,并且可以处理自定义事件作为消息消费者。

要声明事件处理程序,您可以使用on节点(根据约定,您可能希望将其放在源文件的顶部)。


mq.on   error: { err -> err.printStackTrace() },	
      myevent: { msg -> println "myevent: ${new String(msg.body)}" }

这声明了两个事件处理程序:一个用于处理发生的任何异常,另一个用于在我们接收到消息时委托给它。由于在这种情况下我们所做的只是将消息打印到System.out,因此我们可以轻松地在消费者之间共享代码。

为了让我们的消费者在接收到消息时使用此事件处理程序,我们使用consume节点的onmessage属性。


mq.on   error: { err -> err.printStackTrace() },	
      myevent: { msg -> println "myevent: ${new String(msg.body)}" }

mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume onmessage: "myevent"
    
  }
  
}

使用闭包、MessageListener或POJO

但是,您可以将onmessage属性设置为比字符串更多的内容。为了灵活性,您可以将其设置为以下之一:

继续监听消息

除非您的闭包或事件处理程序返回false或null值,否则您的消费者将继续监听消息。要保持您的消费者处于活动状态并等待消息,只需返回true或非null值。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
      // Keep listening for messages and don't exit
      return true
    }
    
  }
  
}

如果从闭包返回false或null,消费者将退出。


mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
      // I'm done with you, please exit
      return false
    }
    
  }
  
}

命令行执行器使用引用计数系统来确定是否有消费者仍然处于活动状态。从标准的Groovy闭包返回false或null将告诉调用者停止消费者的内部MessageListenerContainer。然而,使用MessageListener实现时需要注意的一点是,您必须自己关闭消费者。

consume DSL节点将返回一个特殊的Consumer对象,该对象公开了一个名为shutdown的方法,该方法负责关闭MessageListenerContainer。如果您自己实现MessageListener,则需要在希望消费者退出的任何时候调用此方法,否则系统将不知道何时完成,并且永远不会为您关闭消费者。

consume节点设置为一个变量,并在其上调用shutdown方法。


mq.exchange(name: "myexchange") {

	def consumer
	def listener = [
    onMessage: { msg ->
      println "Invoked from a standard MessageListener"
      consumer?.shutdown()
    }
  ] as MessageListener

	queue(name: null, routingKey: "test.key") {
		consumer = consume onmessage: listener
	}
  
}

发布消息

使用DSL发布消息与消耗它们一样简单。publish基本上有两种变体。


mq.exchange(name: "myexchange") {

  // Return a String, a byte array, or an instance of a Spring AMQP Message
	publish(routingKey: "test.key") {
		"this is from a publish"
	}

  // Write raw bytes to a ByteArrayOutputStream
	publish(routingKey: "test2.key", myHeaderValue: "customHeader", contentType: "text/plain") { out ->
		out.write("these are test bytes".bytes)
	}
  
}

在第一个示例中,我们返回一个字符串(我们也可以使用byte[]),用作消息的正文。在第二个示例中,我们设置了标准的邮件头(在本例中是contentType)以及自定义应用程序邮件头,并且我们可以写入一个传递给我们的闭包的ByteArrayOutputStream

将所有内容整合在一起

请注意,您不必将发布和消耗分成两个独立的文件。您可以将这两个功能放在一起,以便更好地直观地了解您最初要通过消息传递完成什么。


mq.on error: { err -> err.printStackTrace() }

mq.exchange(name: "myexchange") {

  queue(name: null, routingKey: "test") {
    consume { msg ->
      // Handle the message body here, which will always be a byte array
      String bodyAsString = new String(msg.body)
      println "msg body: ${bodyAsString}"
    }    
  }

	publish(routingKey: "test") {
		"this is from a publish"
	}
  
}

虽然我们专注于模拟那些以后可能使用纯Java(甚至完全是另一种语言)更健壮地构建的应用程序,但RabbitMQ DSL也适用于编写简单的维护应用程序,或任何需要消息消费和发布但又不想花费大量精力编写完整消息应用程序的消息传递应用程序。

我从哪里获取它?

RabbitMQ DSL可在GitHub上找到,并获得Apache许可。安装说明在README中。

感谢Joris Kuipers的最新贡献,您可以在Eclipse和STS中使用包含的 rabbitmq.dsld 来获得 一些IDE的自动完成支持

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有