Spring Cloud Stream 2.0 - 内容类型协商与转换

工程 | Oleg Zhurakousky | 2018年2月26日 | ...

这是为准备 Spring Cloud Stream 2.0.0.RELEASE 而发布的一系列预发布博文中的第一篇。

序言

Spring Cloud Stream 2.0 对基于通道的绑定器的内容类型协商进行了全面重构,以提高性能、灵活性,最重要的是一致性。接下来的博文将重点介绍一些已完成的关键点、预期以及它将如何帮助您。

引言

数据转换是任何消息驱动的微服务架构的核心功能之一。在 Spring Cloud Stream 中,此类数据表示为 Spring Message

在消息流(一个流)的各个点,消息可能需要在到达目的地之前被转换为所需的形状/大小。这需要有两个原因:

1. 将传入消息的内容线上传输格式转换为与应用程序提供的处理程序的签名匹配。2. 将传出消息的内容转换为下一个处理程序的签名(如果存在内部流)或转换回线上传输格式。

线上传输格式通常是 byte[],由绑定器实现控制。

在 Spring Cloud Stream Message 中,转换是通过 org.springframework.messaging.converter.MessageConverter 抽象实现的。

以下一系列步骤展示了典型的消息流以及 Message 经历的转换,使用了 Spring Cloud Stream 的 Processor 合约来描述,本质上涵盖了入站出站内容转换的需求。

1. 从绑定器接收一个线上传输格式的 Spring Message。2. 确保 Spring Message 中设置了输入 contentType 头。3. 将 Spring Message 线上传输格式转换为应用程序提供的 MessageHandler 的签名。4. 调用应用程序提供的 MessageHandler。5. 将 MessageHandler 的返回值转换回 Spring Message。6. 确保 Spring Message 中设置了输出 contentType 头。7. 将 Spring Message 转换回线上传输格式。 8. 将线上传输格式的 Spring Message 发送回绑定器。

虽然以上对典型消息流中的主要状态变化进行了全面总结,但细节往往是关键,让我们更仔细地查看每个步骤。

详情

  1. 传入消息由绑定器接收,并以线上传输格式发送到绑定器的输入通道(例如,Processor.INPUT')。
  2. 内部输入通道已预先配置了一个通道拦截器,用于在传入消息不包含 contentType 头时注入该头,仅当传入消息没有 contentType 头时才执行此操作。这是必需的,以确保在需要时,下游消息转换可以考虑 contentType(稍后详细介绍)。注入的 contentType 来自为每个单独的目标绑定设置的内容类型,默认为 application/json

例如,'spring.cloud.stream.bindings.myInput.content-type=text/plain' 为 'myInput'(入站)目标绑定设置了 'text/plain' 的内容类型。这意味着每条传入消息都将被注入 'contentType=text/plain' 头,除非消息本身已包含 'contentType' 头。 换句话说,由头提供的 contentType 优先于按绑定设置的 contentType。3. 现在,借助 HandlerMethodArgumentResolvers 和预配置或用户提供的 MessageConverters,传入消息将被转换为应用程序提供的 MessageHandler 的签名(例如,public Text process(Foo foo){..})。此类处理程序方法通常使用 @StreamListener@ServiceActivator@Transformer 等注解。在此过程中,某些转换器可能需要 contentType,而步骤 2 中的操作可确保此类消息始终通过其 contentType 头可用。当然,如果此类方法以 Message 作为其输入参数,则不会执行任何转换。4. 调用处理程序方法,成功后,将开始从处理程序方法的返回值创建传出消息的过程(假定处理程序方法不是 void)。5. 处理程序方法返回的值将被转换回 Spring Message,仅当返回值本身不是 Message 时才进行此操作。这意味着将创建一个新的 Spring Message,其载荷是处理程序的返回值。传入消息的头将被复制到新的传出消息中,但会剥离任何由'SpringIntegrationProperties.messageHandlerNotPropagatedHeaders'标识的头。默认情况下,只有一个头设置在此处——contentType。这意味着新创建的传出消息没有设置 contentType 头。这是为了确保 contentType 可以随着实际数据的应用程序级转换而演变。注意:仅当处理程序方法返回非 Message 对象时,才会剥离 contentType 消息将被发送到绑定器的输出通道。6. 与绑定器的输入通道类似,绑定器的输出通道(例如,Processor.OUTPUT)也预先配置了通道拦截器。在这里,我们可以选择性地将 contentType 头注入传出消息,为将传出消息的内容转换回线上传输格式做准备。我们来看仅有的两种可能场景:a. *传出 Message 有一个设置好的 contentType*。由于头设置的 contentType 优先于任何其他 contentType,因此不会执行任何 contentType 注入,并且头设置的 contentType 的值将在转换回线上传输格式时使用。b. *传出 Message 没有设置 contentType*。绑定 contentType(默认值或提供的值)将被注入为头,并将在转换回线上传输格式时使用。7. 使用可用的 MessageConverters 之一将消息转换为线上传输格式。8. 转换后的消息被发送回绑定器,保留注入的或存在的 contentType 头。换句话说,传出消息将始终具有 contentType 头。

自定义

以上涵盖了默认的开箱即用行为。但这可能不够,那么我们是否可以,如果可以,如何自定义? 2.0 版本中内容类型协商改进的目标不仅是为了回答这类问题,而且是为了确保答案是一致的——入站和出站通道拦截器用于转换到/从线上传输格式的 'MessageConverters' 是与 'HandlerMethodArgumentResolvers' 用于转换到/从强类型的 'MessageConverters' 相同的

要添加自定义MessageConverter,只需创建一个 org.springframework.messaging.converter.MessageConverter 的实现,将其配置为 @Bean,并为该 Bean 添加 @StreamMessageConverter 注解,它将被添加为现有MessageConverters堆栈中的第一个转换器,本质上优先于现有的MessageConverters

总结

希望现在应该很清楚,任何和所有内容类型转换都是由 MessageConverters 完成的。虽然 MessageConverters 的实现各不相同,但大多数都利用 contentType 头和目标类型(targetClass),这使得它们能够执行类内转换以及到/从线上传输格式的转换。目前有一组预配置的 MessageConverters 来支持大多数用例,因此对于大多数典型数据类型(例如,json、文本等),最终用户实际上不需要做任何事情。尽管如此,了解现在的工作方式与如何自定义仍然是有益的——自定义现有的和/或引入新的 MessageConverter 实现

结论

我们目前正在更新文档,其中将包含有关此以及 2.0 版本中进行的其他许多主题的更多详细信息和示例。此预发布博文系列的主要目标是提高大家的认识,促进“试用”并征求反馈。因此;Spring Cloud Stream 2.0.0.RC1 可在 此处 获取。

我们鼓励您使用以下任一方式提供反馈:

祝您使用愉快!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有