Spring Cloud Stream 应用程序的 Java 函数介绍 - 第 0 部分

版本发布 | David Turanski | 2020年7月13日 | ...

我们很高兴地宣布发布 Spring Cloud Stream applications 2020.0.0-M2。该版本是对遗留的 Spring Cloud Stream App Starters 的全面改造。从此版本开始,我们不再使用基于主题的版本列车名称(按字母顺序排列的著名科学家),而是转向基于日历的版本控制。当前的 GA 版本称为 Einstein,我们很高兴推出 2020.0.0-M2。我们也在弃用 app starters。在重组、重新打包并(在某些情况下)重写底层代码后,我们现在有了新的 Git 仓库: spring-cloud/stream-applications: Functions and Spring Cloud Stream Applications for data driven microservices

我们是怎么走到这一步的?

自 2016 年 Spring Cloud Data Flow 诞生以来,Spring 团队一直维护基于 Spring Cloud Stream 的预打包应用程序。这些是生产就绪的应用程序,建立在 Spring 和 Spring Integration 的成熟能力之上,可提供与常用开源数据存储、消息代理、在线服务和通信协议的开箱即用集成。实际上,我们提供用于企业开发人员构建关键业务系统的底层组件已经有十多年了。下面的时间表总结了从 Spring Integration 组件到预打包应用程序的演变过程。

Stream Applications Timeline

这些应用程序让您在使用 Data Flow 编排数据流管道或直接将它们用作数据微服务时可以快速启动。一如既往,秉承 Spring 的理念,我们为您承担繁重的工作,让您专注于业务逻辑。

其核心是,Spring Cloud Stream 应用程序是一个 Spring Boot 应用程序 (uber jar),它包含 Spring Cloud Stream binder 依赖。binder 暴露了一个服务提供者接口,用于抽象那些利用底层消息中间件(例如 Apache Kafka、RabbitMQ、Amazon Kinesis、Google Pub Sub 和 Solace)进行分布式通信的实现,从而对应用程序隐藏中间件的具体细节。因此,应用程序不知道它用于通信的底层中间件是什么。

这种架构使我们能够实现核心功能而无需 binder,形式为 Spring Cloud Stream App Starters。我们使用了一个定制的 Maven 插件来生成 Maven pom 文件(Apache Kafka binder 和 RabbitMQ binder 各一个),以及一个导入 app starter 配置的通用 Spring Boot main 类。该 pom 文件还包括 监控 和安全支持。预打包的 stream 应用程序可与 Apache Kafka 或 RabbitMQ 一起使用,并作为 Spring Boot 可执行 jar 和 Docker 镜像发布到公共仓库。

在许多方面,预打包的 stream 应用程序可以与 Kafka Connect 相媲美。虽然这不是直接的“苹果对苹果”比较,但 stream 应用程序可以用来替代 Kafka Connect 应用程序。Kafka Connect 应用程序需要 Apache Kafka 来生产和消费数据,而 Spring Cloud Stream 应用程序则可以使用各种中间件技术,包括前面提到的 Kafka。请注意,我们所有的预打包应用程序都是免费和开源的,而许多 Kafka connector 则需要商业许可。

有什么变化?

Java 函数

Java 和 Spring 生态系统的不断进步促使我们重新思考我们的方法。最显著的变化是我们实现了一种分层架构,其中以前由 app starters 提供的核心功能现在作为 Java 函数提供,实现了 java.util.function 包中的标准接口。

此版本中的函数式组件可以作为标准的 Spring bean 暴露,然后通过直接将其嵌入到应用程序中来满足您的数据集成需求。通过在自定义应用程序中注入这些函数,您可以立即受益于底层库提供的功能。例如,其中许多函数都使用了 Spring Integration 适配器。您可以直接调用该函数,使用 Spring Cloud Function 通过 REST 端点调用它,或在无服务器环境中使用它。与 app starters 不同,函数式组件不依赖于 Spring Cloud Stream。然而,它们现在是 stream 应用程序的核心组件。下图展示了组件和应用程序之间的关系

Stream Applications Layered Architecture

Stream 应用程序

通过使用 Spring Cloud Stream,我们可以利用 java.util.function 类型(Supplier、Function、Consumer)与 Spring Cloud Stream 概念(分别为 source、processor 和 sink)的逻辑等价性。像以前一样,我们使用新的改进的 Maven 插件 来生成 Spring Boot main 类、application.properties 文件以及具有内置 监控 和安全支持的特定于 binder 的 Maven pom。除少数例外,我们无需额外代码即可构建 Spring Cloud Stream 应用程序。

新方法具有以下几个优势

  • Spring Cloud Stream v3.x 引入了一个基于 Spring Cloud Function 的强大函数式编程模型。这种方法优于遗留的基于注解的模型(@EnableBinding, @StreamListener)。Spring Cloud Stream 可以直接绑定到 Function @Bean 的输入和输出。使用此模型,既不需要遗留的 Spring Cloud Stream 注解,也不需要 Source、Processor 和 Sink 接口。

下图通过使用打包为 Spring Cloud Stream 应用程序的简单函数来说明此概念。binder 实现和外部配置属性使应用程序能够通过消息代理进行通信,但应用程序代码不关心这些细节。Spring Cloud Stream 会在 time 主题收到消息时调用 ProcessorApplication 中的 helloTime 函数,并将其输出定向到 hello 主题。同样,SinkApplication 中的 printTime Consumer 会在 hello 主题收到消息时触发。但是是什么触发 SourceApplication 呢?您可能已经猜到了,Spring Cloud Stream 会自动配置一个轮询器 (poller),默认情况下每秒调用一次 currentTime Supplier。当然,这是可配置的。

Spring Cloud Stream Example

  • 函数式组件可以打包并部署到 Spring Cloud Stream 之外的广泛用途中,尤其是在 FaaS 环境中。

  • 函数式组件在适当的地方使用 Project Reactor 构建,以实现非阻塞的响应式流。

  • stream 应用程序(或任何使用函数式组件构建的 Spring Boot 应用程序)可以利用 Spring Cloud Function 的声明式函数组合功能。这意味着预打包的 stream 应用程序可以配置执行常见的转换和过滤操作,无需进行任何定制。

  • 新的 stream-applications Git 仓库是一个 monorepo。与 stream-cloud-app-starters 不同,后者每个应用程序都有自己的仓库,新的 stream-applications 仓库将所有内容(函数、应用程序和通用组件)包含在一个仓库中。这简化了依赖管理并允许原子提交。希望这一变化以及其他正在进行的努力能够让开发人员更容易使用,并鼓励社区贡献。

此版本包含什么?

以下是此版本中提供的各种函数和应用程序的部分列表

  • 供应商和来源 (Suppliers and sources): File, FTP, SFTP, AWS S3, HTTP, Geode, TCP, TIme, Twitter, Websocket, JDBC, JMS, RabbitMQ, MQTT。

  • 消费者和接收器 (Consumers and sinks): Analytics, Cassandra, File, FTP, Geode, JDBC, Log, Mongodb, MQTT, Rabbit, Redis, AWS S3, SFTP, TCP, Twitter, Wavefront, Websocket。

  • 函数和处理器 (Functions and processors): Filter, Header Enricher, HTTP Request, Tensorflow (图像识别、对象检测和语义分割), SpEL, Splitter, Task Launch Request, Task Launcher, Twitter。

查看完整的列表,请参考 Stream Applications README

这对现有用户意味着什么?

在许多情况下,新应用程序提供的功能与先前版本相当。在某些情况下(例如 Twitter),我们显著增强了功能。我们还合并并重命名了一些应用程序。简而言之,存在一些破坏性更改。值得注意的是,许多配置属性名称已根据需要更改,以反映与函数式组件的关联(例如,s3.supplier.remoteDir)。此外,这些应用程序可能无法与使用旧版本 Spring Cloud Stream 构建的 stream 应用程序一起使用。例如,使用旧版本 Spring Cloud Stream 构建的 source 不能保证与此版本中的 sink 一起工作。如果您正在使用先前版本中的预打包应用程序,则无需立即升级,除非您被迫利用一些新功能。Einstein 版本列车将进入维护模式,因此今后只包含错误修复。所有新开发都将应用于未来的版本。

如何贡献新的函数或应用程序?

如果您在现有的函数和应用程序目录中找不到所需的内容,请考虑 贡献。这样,整个开源社区都会受益。在后续文章中,我们将通过一个开发函数和 stream 应用程序的真实世界示例进行讲解。

我们鼓励社区参与此项目。我们有几个标记为 适合贡献 的开放议题。除了代码贡献,我们也非常感谢文档改进、创建议题以及给仓库加星标。

敬请关注…​

这篇博客是每周系列文章的第一篇,该系列将更详细地介绍此处引入的主题。在接下来的几周内,请期待更多的深度探讨和专题文章。我们将带您了解此仓库中包含的组件及其相关流程的整体情况。

Spring Cloud Stream 应用程序的 Java 函数介绍 - 第 1 部分

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助您飞速前进。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区即将举办的所有活动。

查看全部