领先一步
VMware 提供培训和认证,助您加速进步。
了解更多如果你听过 Oleg Zhurakousky 在 Spring One Platform 2019 上关于 Spring Cloud Stream & Functions 的演讲,或者读过他最近关于 简化 Spring Cloud Stream 和 函数式 Spring Cloud Stream 的博文,你可能会说:“等等!Spring Integration 支持怎么了?我的 @ServiceActivator 或 IntegrationFlow 现在该怎么办?我以前把 Sink.input() 作为通道来消费绑定器目的地,并带有 Spring Integration 逻辑!” 正如 Oleg 在他的博文中所提到的,这仍然可以通过现有的 @EnableBinding 等方式实现,但我们正在摆脱这种模式,那么我们如何在函数式 Spring Cloud Stream 的世界中仍然受益于 Spring Integration 的所有功能呢?
在这篇博文中,我将结合 Spring Integration 及其在现代基于函数的流中的重要性,对 Spring Cloud Stream 的函数式特性进行阐述!
是的,我们确实可以创建一个简单的 Function 桥接,它会调用 MessageChannel.send(),但我们也可以使用 Spring Integration 中的 Messaging Gateway 抽象来实现,如下所示
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
鉴于生成的 bean 是 java.util.function.Function 的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选对象。Spring Cloud Stream 会使用其泛型输入/输出参数类型在之前和之后执行适当的有效负载转换。此外,头信息会从绑定器传递到下游集成流,然后再返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定注解来将此类网关与我们的流连接起来(样板代码)。
使用 Spring Integration 的 Java DSL,我们可以进一步消除更多的样板代码,同时受益于使用函数式 Spring Cloud Stream。我们需要的是相同的 gateway 方法,但采用 DSL 风格。Oleg 博文中的 uppercase 示例使用 Spring Integration 将如下所示
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
用 Spring Integration 实现大小写转换用例虽然很傻,但想象一下我们需要进行一些复杂逻辑,例如 split、scatter-gather(并行调用外部服务),然后 aggregate、进行一些审计,最后才将函数结果返回到输出目的地。所有这些以及更多都可以通过 Spring Integration 及其 EIP 支持、Java DSL 抽象以及当然是前面提到的函数包装器来实现。
java.util.function.Consumer 和 java.util.function.Supplier 接口可以以类似的方式使用,在其周围的网关代理中根据其契约提供适当的逻辑。
您可以在 Spring Integration 参考手册中查看有关函数支持的更多信息。
我们之前展示的所有内容都与命令式函数有关,它们是按事件触发的。响应式函数仅通过将整个事件流作为 Flux 传递给函数来触发一次。Spring Integration 中的响应式流支持可帮助您编写响应式 Spring Integration 流,这些流可以作为 Spring Cloud Stream 中的函数公开。
以下示例展示了如何围绕响应式 Spring Integration 调用构建响应式函数包装器
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
虽然在 RSocket 上实现 uppercase 仍然很愚蠢,但此示例的目标是让您了解如何使用 Spring Integration 解决更复杂的用例。
在这里,我们将一个 Flux 传递给一个函数,并将其传播到 RSocket 请求者以进行 request channel 交互模型。结果 Flux 通过 Spring Integration 内部的 replyChannel 头传回函数返回。
另一个响应式示例可能类似于将数据从推模型传输到拉模型。换句话说,将事件流表示为 Supplier
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
这样,传入的 HTTP 请求会进入一个源 Flux,供输出绑定器目的地向下游拉取,同时遵守背压和其他响应式流要求。
有关 Spring Integration 中响应式流支持的更多信息,请参阅参考手册。
Spring Integration 仍然是 Spring Cloud Stream 微服务开发的重要组成部分。它的函数式支持允许将属于企业集成模式类别的复杂用例公开为 Java 函数,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用这个基础,Spring Cloud Stream App Starters 最终将被函数实现所取代。
请随时提供任何反馈!
附言:对于那些不耐烦的 Kotlin 用户,我想分享一个最近启动的 Spring Integration Kotlin DSL 项目。