Spring Cloud Stream - 功能性和响应式

工程 | Oleg Zhurakousky | 2019年10月17日 | ...

上一篇文章中,我试图为我们在Spring Cloud Stream (SCSt) 中转向函数式编程模型提供理由。它代码更少配置更少。但最重要的是,您的代码与SCSt的内部完全解耦且独立

在这篇文章中,我将深入探讨并总结我们函数式支持的核心特性,特别是其响应式特性。

重要提示:您使用 @StreamListener/@EnableBinding 所能做的一切,都可以在没有它们的情况下完成。换句话说,函数式支持现在与基于注解的支持功能兼容。

虽然下面描述的所有特性都是 Spring Cloud Function (SCF)(SCSt 的一个依赖项)的特性,但要理解函数在 SCSt 上下文中的额外含义,必须注意某些细微差别。

Supplier、Function 和 Consumer

任何类型为 SupplierFunctionConsumer 的 bean,或任何可以映射到 SupplierFunctionConsumer 的 bean(例如 POJO 函数、Kotlin lambdas 等),都被 SCSt 视为消息处理器FunctionConsumer)或消息源Supplier)。根据所使用的函数式策略类型,输入和输出绑定会自动生成,使用以下命名约定 <函数名>-<in/out>-<索引>

考虑以下示例

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

前面的函数被视为一个消息处理器,它从 uppercase-in-0 消费并发送到 uppercase-out-0 绑定。其余的现有流属性可以像以前一样使用。例如 --spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain

点击此处了解更多详细信息和配置选项。

###命令式或响应式函数可以是命令式响应式。命令式函数在每个独立事件上触发,而响应式函数只触发一次,传递对由Project Reactor提供的整个事件流抽象(如FluxMono)的引用。

考虑以下一对示例

命令式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

响应式

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

除了为您提供一种不同的(单子)编程风格来处理事件(这很容易被视为偏好问题)之外,响应式编程在某些用例中增加了额外的价值,否则这些用例将相当复杂才能实现。虽然本文的范围不包括详细讨论这些用例或响应式模式,但状态管理用例(如窗口化、聚合和分组)仍然值得一提,以及拆分流或合并多个流的用例,我将在下一节中讨论这些用例。

关于响应式函数的绑定和命名规则,它们与上一节中解释的相同。

注意:尽管前面的示例使用 Function 作为示例,但相同的规则适用于 SupplierConsumer。用户指南的Spring Cloud Function 支持部分以及Reactor 文档提供了更多详细信息。

###函数元数 有时数据流需要分类和组织。例如,考虑一个经典的“大数据”用例,处理包含“订单”和“发票”的非组织数据,并且您希望将每个数据存储到单独的数据存储中。这就是函数元数(具有多个输入和输出的函数)支持发挥作用的地方。

让我们看一个这样的函数示例(完整的实现细节可在此处获取这里),

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用其 Tuple 库。Tuple 为我们提供了独特的优势,通过向我们传达基数类型信息。这两者在 SCSt 的上下文中都极为重要。基数让我们知道需要创建多少个输入和输出绑定并将其绑定到函数的相应输入和输出。对类型信息的认识确保了正确的类型转换。

此外,这就是绑定名称命名约定中“索引”部分发挥作用的地方,因为在此函数中,两个输出绑定名称是 organise-out-0organise-out-1

重要提示:目前,函数元数仅支持以复杂事件处理为中心的响应式函数(Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>),其中事件融合的评估和计算通常需要查看事件流而不是单个事件。

有关最新详细信息,请阅读用户指南的具有多个输入和输出参数的函数部分。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有