Spring Cloud Stream - 以及 Spring Integration。

工程 | Artem Bilan | 2019年10月25日 | ...

如果你听过 Oleg Zhurakousky 在 Spring One Platform 2019 上关于 Spring Cloud Stream & Functions 的演讲,或者读过他最近关于 简化 Spring Cloud Stream函数式 Spring Cloud Stream 的博文,你可能会说:“等等!Spring Integration 支持怎么了?我的 @ServiceActivatorIntegrationFlow 现在该怎么办?我以前把 Sink.input() 作为通道来消费绑定器目的地,并带有 Spring Integration 逻辑!” 正如 Oleg 在他的博文中所提到的,这仍然可以通过现有的 @EnableBinding 等方式实现,但我们正在摆脱这种模式,那么我们如何在函数式 Spring Cloud Stream 的世界中仍然受益于 Spring Integration 的所有功能呢?

在这篇博文中,我将结合 Spring Integration 及其在现代基于函数的流中的重要性,对 Spring Cloud Stream 的函数式特性进行阐述!

Spring Integration 作为一个函数?!

是的,我们确实可以创建一个简单的 Function 桥接,它会调用 MessageChannel.send(),但我们也可以使用 Spring Integration 中的 Messaging Gateway 抽象来实现,如下所示

@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
                  extends Function<Message<InputData>, Message<OutputData>> { }

鉴于生成的 bean 是 java.util.function.Function 的扩展,它是一个完全有效的 Spring Cloud Function 和 Spring Cloud Stream 绑定候选对象。Spring Cloud Stream 会使用其泛型输入/输出参数类型在之前和之后执行适当的有效负载转换。此外,头信息会从绑定器传递到下游集成流,然后再返回。这很好,但我们仍然需要了解通道并提供一些 SI 特定注解来将此类网关与我们的流连接起来(样板代码)。

使用 Spring Integration 的 Java DSL,我们可以进一步消除更多的样板代码,同时受益于使用函数式 Spring Cloud Stream。我们需要的是相同的 gateway 方法,但采用 DSL 风格。Oleg 博文中的 uppercase 示例使用 Spring Integration 将如下所示

@SpringBootApplication
public class SampleApplication  {

    @Bean
    public IntegrationFlow uppercaseFlow() {
        return IntegrationFlows.from(Function.class,
                             gateway -> gateway.beanName("uppercase"))
                   .<String, String>transform(String::toUpperCase)
                   .get();
    }
}

用 Spring Integration 实现大小写转换用例虽然很傻,但想象一下我们需要进行一些复杂逻辑,例如 splitscatter-gather(并行调用外部服务),然后 aggregate、进行一些审计,最后才将函数结果返回到输出目的地。所有这些以及更多都可以通过 Spring Integration 及其 EIP 支持、Java DSL 抽象以及当然是前面提到的函数包装器来实现。

java.util.function.Consumerjava.util.function.Supplier 接口可以以类似的方式使用,在其周围的网关代理中根据其契约提供适当的逻辑。

您可以在 Spring Integration 参考手册中查看有关函数支持的更多信息。

响应式流如何?

我们之前展示的所有内容都与命令式函数有关,它们是按事件触发的。响应式函数仅通过将整个事件流作为 Flux 传递给函数来触发一次。Spring Integration 中的响应式流支持可帮助您编写响应式 Spring Integration 流,这些流可以作为 Spring Cloud Stream 中的函数公开。

以下示例展示了如何围绕响应式 Spring Integration 调用构建响应式函数包装器

public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
                           ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows.from(FluxFunction.class,
                        gateway -> gateway.beanName("uppercase"))
            .handle(RSockets.outboundGateway("/uppercase")
                    .command(RSocketOutboundGateway.Command.requestStreamOrChannel)
                    .expectedResponseType(String.class)
                    .clientRSocketConnector(clientRSocketConnector))
            .get();
}

虽然在 RSocket 上实现 uppercase 仍然很愚蠢,但此示例的目标是让您了解如何使用 Spring Integration 解决更复杂的用例。

在这里,我们将一个 Flux 传递给一个函数,并将其传播到 RSocket 请求者以进行 request channel 交互模型。结果 Flux 通过 Spring Integration 内部的 replyChannel 头传回函数返回。

另一个响应式示例可能类似于将数据从模型传输到模型。换句话说,将事件流表示为 Supplier

@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
    return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
                    Publisher<Message<byte[]>> httpRequestPublisher) {
    return () -> Flux.from(httpRequestPublisher);
}

这样,传入的 HTTP 请求会进入一个源 Flux,供输出绑定器目的地向下游拉取,同时遵守背压和其他响应式流要求。

有关 Spring Integration 中响应式流支持的更多信息,请参阅参考手册

总结

Spring Integration 仍然是 Spring Cloud Stream 微服务开发的重要组成部分。它的函数式支持允许将属于企业集成模式类别的复杂用例公开为 Java 函数,从而在 Spring Cloud Stream 中提供一致的执行模型。事实上,通过使用这个基础,Spring Cloud Stream App Starters 最终将被函数实现所取代。

请随时提供任何反馈!

附言:对于那些不耐烦的 Kotlin 用户,我想分享一个最近启动的 Spring Integration Kotlin DSL 项目。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有