领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在上一篇博文中,我尝试为我们在Spring Cloud Stream (SCSt) 中转向函数式编程模型提供理由。它代码更少,配置更少。但最重要的是,您的代码与 SCSt 的内部完全解耦和独立。
在这篇博文中,我将深入探讨并总结我们函数式支持的核心功能,特别是围绕其响应式功能。
重要提示:您可以使用
@StreamListener/@EnableBinding
完成的所有操作,也可以在没有它们的情况下完成。换句话说,函数式支持现在与基于注释的支持在功能上兼容。
虽然下面描述的所有功能都是Spring Cloud Function (SCF)(SCSt 的一个依赖项)的功能,但必须注意某些细微差别才能理解函数在 SCSt 上下文中附加的含义。
任何类型为Supplier
、Function
或Consumer
的 Bean,或任何可以映射到Supplier
、Function
或Consumer
的 Bean(例如 POJO 函数、Kotlin lambda 等),都被 SCSt 视为消息处理程序(Function
或Consumer
)或消息源(Supplier
)。根据所使用的函数式策略类型,使用以下命名约定<function-name>-<in/out>-<index>
自动生成输入和输出绑定。
考虑以下示例
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
前面的函数被视为一个消息处理程序,它从uppercase-in-0
消费并发送到uppercase-out-0
绑定。其余现有的流属性可以像以前一样使用。例如--spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain
。
点击此处以获取更多详细信息和配置选项。
###命令式或响应式函数可以是命令式或响应式。命令式函数在每个单独事件上触发,而响应式函数触发一次,将对整个事件流抽象(例如Flux
和Mono
)的引用传递给由Project Reactor提供。
考虑以下一对示例
命令式
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
}
响应式
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux.map(value -> value.toUpperCase());
}
}
除了为您提供一种不同的(单子)编程风格来处理事件(这很容易被视为偏好问题)之外,响应式编程还为某些用例增加了额外的价值,否则这些用例的实现将非常复杂。虽然讨论这些用例或响应式模式的细节超出了本文的范围,但状态管理用例(例如窗口化、聚合和分组)以及拆分流或合并多个流的用例仍然值得一提,我将在下一节中讨论这些用例。
关于响应式函数的绑定和命名规则,与上一节中解释的相同。
注意:虽然前面的示例使用
Function
作为示例,但相同的规则也适用于Supplier
和Consumer
。用户指南中的Spring Cloud Function 支持部分以及Reactor 文档提供了更多详细信息。
###函数元数有时需要对数据流进行分类和组织。例如,考虑一个处理非组织数据的经典大数据用例,例如包含“订单”和“发票”,并且希望每个都进入一个单独的数据存储。这就是函数元数(具有多个输入和输出的函数)支持发挥作用的地方。
让我们看一个此类函数的示例(完整的实现细节可在此处找到),
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用其 Tuple 库。元组通过向我们传达基数和类型信息来提供独特的优势。两者在 SCSt 的上下文中都非常重要。基数让我们知道需要创建多少个输入和输出绑定以及绑定到函数的相应输入和输出。了解类型信息可确保正确的类型转换。
此外,这就是绑定名称命名约定中“索引”部分发挥作用的地方,因为在此函数中,两个输出绑定名称为organise-out-0
和organise-out-1
。
重要提示:目前,函数元数仅支持以复杂事件处理为中心的响应式函数(
Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>
),其中对事件汇合的评估和计算通常需要查看事件流而不是单个事件。
有关最新详细信息,请阅读用户指南中的具有多个输入和输出参数的函数部分。