快人一步
VMware 提供培训和认证,助您加速进步。
了解更多如果您听过 Oleg Zhurakousky 在 Spring One Platform 2019 大会上关于 Spring Cloud Stream & Functions 的演讲,或者阅读过他最近关于 Simplified Spring Cloud Stream 和 Functional Spring Cloud Stream 的博客文章,您可能会想问:“等等!Spring Integration 支持怎么了?我现在如何处理我的 @ServiceActivator
或 IntegrationFlow
?我过去常常将 Sink.input()
作为通道来使用一些 Spring Integration 逻辑消费 binder 目标!”正如 Oleg 在他的博客文章中提到的,使用现有的 @EnableBinding
等方式仍然可能,但我们正逐渐远离这种模型,那么在函数式 Spring Cloud Stream 的世界中,我们如何才能继续受益于 Spring Integration 的所有特性呢?
在这篇博客文章中,我将围绕 Spring Integration 进一步阐述 Spring Cloud Stream 的函数式特性,以及它在现代基于函数的流中的重要性!
是的,我们确实可以构建一个简单的 Function
桥,它可以调用 MessageChannel.send()
,但我们也可以使用 Spring Integration 中的 Messaging Gateway 抽象来实现,如下所示:
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
鉴于生成的 bean 是 java.util.function.Function
的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选。Spring Cloud Stream 利用其泛型输入/输出参数类型来执行适当的负载转换。此外,headers 从 binder 传递到下游 integration flow 并返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定的注解来连接这种 gateway 与我们的 flow (样板代码)。
借助 Spring Integration 的 Java DSL,我们可以进一步减少样板代码,同时获得使用函数式 Spring Cloud Stream 的优势。我们需要的是与 gateway
类似的方法,但采用 DSL 风格。Oleg 博客文章中的 uppercase
示例如果使用 Spring Integration 将会如下所示:
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
虽然使用 Spring Integration 实现大小写转换用例可能显得有些简单,但想象一下,我们需要执行一些复杂的逻辑,比如 split
(拆分)、带有并行调用外部服务的 scatter-gather
(分散-聚集),然后 aggregate
(聚合)、进行审计,最后才从我们的函数返回结果到输出目标。所有这些以及更多功能都可以使用 Spring Integration、其 EIP 支持、Java DSL 抽象以及当然还有前面提到的函数包装器来实现。
java.util.function.Consumer
和 java.util.function.Supplier
接口也可以以类似的方式使用,根据它们的契约在 gateway 代理周围应用适当的逻辑。
您可以在 Spring Integration 的参考手册中找到有关函数支持的更多信息。
我们之前展示的所有内容都涉及命令式函数,它们是按事件触发的。而响应式函数则是通过将整个事件流作为一个 Flux
传递给函数来一次性触发的。Spring Integration 中的 Reactive Streams 支持帮助您编写响应式 Spring Integration flow,这些 flow 可以作为 Spring Cloud Stream 中的函数暴露出来。
以下示例展示了如何围绕响应式 Spring Integration 调用构建一个响应式函数包装器:
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
虽然通过 RSocket 实现 uppercase
仍然显得简单,但此示例的目的是让您了解如何使用 Spring Integration 处理更复杂的用例。
在这里,我们获得一个传递给函数的 Flux
,并将其传播到 RSocket requester,用于 request channel
交互模型。结果 Flux
通过 Spring Integration 内部的 replyChannel
header 传递回函数返回。
另一个响应式示例可能类似于将数据从推送模型转移到拉取模型。换句话说,将事件流表示为 Supplier
:
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
这样,传入的 HTTP 请求会进入一个源 Flux
,供输出 binder 目标下游拉取,同时遵守背压(back-pressure)和其他 Reactive Streams 要求。
有关 Spring Integration 中 Reactive Streams 支持的更多信息,请参阅参考手册。
Spring Integration 仍然是 Spring Cloud Stream 微服务开发的重要组成部分。它的函数式支持使得属于企业集成模式类别的复杂用例能够作为 Java 函数暴露出来,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用这个基础,Spring Cloud Stream App Starters 最终将被函数实现所取代。
请随时提供任何反馈!
附注:对于那些对 Kotlin 感到期待的人,我想分享一个最近启动的Spring Integration Kotlin DSL 项目。