Spring Cloud Stream - 与 Spring Integration。

工程 | Artem Bilan | 2019 年 10 月 25 日 | ...

如果您听过 Oleg Zhurakousky 在 Spring One Platform 2019 大会上关于 Spring Cloud Stream & Functions 的演讲,或者阅读过他最近关于 Simplified Spring Cloud StreamFunctional Spring Cloud Stream 的博客文章,您可能会想问:“等等!Spring Integration 支持怎么了?我现在如何处理我的 @ServiceActivatorIntegrationFlow?我过去常常将 Sink.input() 作为通道来使用一些 Spring Integration 逻辑消费 binder 目标!”正如 Oleg 在他的博客文章中提到的,使用现有的 @EnableBinding 等方式仍然可能,但我们正逐渐远离这种模型,那么在函数式 Spring Cloud Stream 的世界中,我们如何才能继续受益于 Spring Integration 的所有特性呢?

在这篇博客文章中,我将围绕 Spring Integration 进一步阐述 Spring Cloud Stream 的函数式特性,以及它在现代基于函数的流中的重要性!

Spring Integration 作为函数?!

是的,我们确实可以构建一个简单的 Function 桥,它可以调用 MessageChannel.send(),但我们也可以使用 Spring Integration 中的 Messaging Gateway 抽象来实现,如下所示:

@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
                  extends Function<Message<InputData>, Message<OutputData>> { }

鉴于生成的 bean 是 java.util.function.Function 的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选。Spring Cloud Stream 利用其泛型输入/输出参数类型来执行适当的负载转换。此外,headers 从 binder 传递到下游 integration flow 并返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定的注解来连接这种 gateway 与我们的 flow (样板代码)。

借助 Spring Integration 的 Java DSL,我们可以进一步减少样板代码,同时获得使用函数式 Spring Cloud Stream 的优势。我们需要的是与 gateway 类似的方法,但采用 DSL 风格。Oleg 博客文章中的 uppercase 示例如果使用 Spring Integration 将会如下所示:

@SpringBootApplication
public class SampleApplication  {

    @Bean
    public IntegrationFlow uppercaseFlow() {
        return IntegrationFlows.from(Function.class,
                             gateway -> gateway.beanName("uppercase"))
                   .<String, String>transform(String::toUpperCase)
                   .get();
    }
}

虽然使用 Spring Integration 实现大小写转换用例可能显得有些简单,但想象一下,我们需要执行一些复杂的逻辑,比如 split(拆分)、带有并行调用外部服务的 scatter-gather(分散-聚集),然后 aggregate(聚合)、进行审计,最后才从我们的函数返回结果到输出目标。所有这些以及更多功能都可以使用 Spring Integration、其 EIP 支持、Java DSL 抽象以及当然还有前面提到的函数包装器来实现。

java.util.function.Consumerjava.util.function.Supplier 接口也可以以类似的方式使用,根据它们的契约在 gateway 代理周围应用适当的逻辑。

您可以在 Spring Integration 的参考手册中找到有关函数支持的更多信息。

响应式流呢?

我们之前展示的所有内容都涉及命令式函数,它们是按事件触发的。而响应式函数则是通过将整个事件流作为一个 Flux 传递给函数来一次性触发的。Spring Integration 中的 Reactive Streams 支持帮助您编写响应式 Spring Integration flow,这些 flow 可以作为 Spring Cloud Stream 中的函数暴露出来。

以下示例展示了如何围绕响应式 Spring Integration 调用构建一个响应式函数包装器:

public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
                           ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows.from(FluxFunction.class,
                        gateway -> gateway.beanName("uppercase"))
            .handle(RSockets.outboundGateway("/uppercase")
                    .command(RSocketOutboundGateway.Command.requestStreamOrChannel)
                    .expectedResponseType(String.class)
                    .clientRSocketConnector(clientRSocketConnector))
            .get();
}

虽然通过 RSocket 实现 uppercase 仍然显得简单,但此示例的目的是让您了解如何使用 Spring Integration 处理更复杂的用例。

在这里,我们获得一个传递给函数的 Flux,并将其传播到 RSocket requester,用于 request channel 交互模型。结果 Flux 通过 Spring Integration 内部的 replyChannel header 传递回函数返回。

另一个响应式示例可能类似于将数据从推送模型转移到拉取模型。换句话说,将事件流表示为 Supplier

@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
    return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
                    Publisher<Message<byte[]>> httpRequestPublisher) {
    return () -> Flux.from(httpRequestPublisher);
}

这样,传入的 HTTP 请求会进入一个源 Flux,供输出 binder 目标下游拉取,同时遵守背压(back-pressure)和其他 Reactive Streams 要求。

有关 Spring Integration 中 Reactive Streams 支持的更多信息,请参阅参考手册

总结

Spring Integration 仍然是 Spring Cloud Stream 微服务开发的重要组成部分。它的函数式支持使得属于企业集成模式类别的复杂用例能够作为 Java 函数暴露出来,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用这个基础,Spring Cloud Stream App Starters 最终将被函数实现所取代。

请随时提供任何反馈!

附注:对于那些对 Kotlin 感到期待的人,我想分享一个最近启动的Spring Integration Kotlin DSL 项目。

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

快人一步

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 通过一份简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区中的所有即将到来的活动。

查看全部