使用 Spring Cloud Stream 和 Apache Kafka Streams 进行流处理。第一部分 - 编程模型

工程 | Soby Chacko | 2019 年 12 月 02 日 | ...

这是系列博客文章的第一篇,我们将探讨如何使用 Spring Cloud Stream 和 Kafka Streams 编写流处理应用程序。

Spring Cloud Stream Horsham Release (3.0.0) 引入了应用程序使用适用于 Kafka 和 Kafka Streams 的绑定器 (binder) 来利用 Apache Kafka 的几种方式的改变。此版本带来的主要增强功能之一是使用完全函数式编程范式编写应用程序的一流支持。这篇博客文章介绍了如何使用这种函数式编程模型来开发使用 Spring Cloud Stream 和 Kafka Streams 的流处理应用程序。在本系列的后续博客文章中,我们将深入探讨更多细节。

Spring Cloud Stream 下有多少种 Kafka 绑定器 (binder)?

这通常是一个令人困惑的问题:如果我想编写基于 Apache Kafka 的应用程序,我应该使用哪种绑定器 (binder)。Spring Cloud Stream 为 Kafka 提供了两种独立的绑定器 - spring-cloud-stream-binder-kafkaspring-cloud-stream-binder-kafka-streams。正如它们的名称所示,如果您想编写使用普通 Kafka 生产者和消费者的标准事件驱动应用程序,您会使用第一种。另一方面,如果您想使用 Kafka Streams 库开发流处理应用程序,请使用第二种绑定器。再次强调,在本篇博客文章中,我们将重点介绍用于 Kafka Streams 的第二种绑定器。

关于这个博客系列的一个一般性说明。本系列主要关注 Spring Cloud Stream 和 Kafka Streams 之间的接触点,而不深入探讨 Kafka Streams 本身的细节。为了编写使用 Kafka Streams 的非平凡流处理应用程序,强烈建议深入理解 Kafka Streams 库。本系列仅停留在实际 Kafka Streams 库的外围,主要关注如何从 Spring Cloud Stream 的角度与其交互。

启动一个 Spring Cloud Stream Kafka Streams 应用程序

其核心在于,所有 Spring Cloud Stream 应用程序都是 Spring Boot 应用程序。要启动一个新项目,请访问 Spring Initializr,然后创建一个新项目。选择 “Cloud Stream”“Spring for Apache Kafka Streams” 作为依赖项。这将生成一个包含您开始开发应用程序所需所有组件的项目。这是来自 Initializr 的截图,其中选择了基本依赖项。

spring-initializr-kafka-streams

给我一个简单示例,展示如何使用 Spring Cloud Stream 快速编写 Kafka Streams 应用程序

以下是一个非常基础但功能完备的 Kafka Streams 应用程序,它使用 Spring Cloud Stream 的函数式编程支持编写。

@SpringBootApplication
public class SimpleConsumerApplication {

   @Bean
   public java.util.function.Consumer<KStream<String, String>> process() {

       return input ->
               input.foreach((key, value) -> {
                   System.out.println("Key: " + key + " Value: " + value);
               });
   }
}

如您所见,这是一个非常简单的应用程序,仅打印到标准输出,但它仍然是一个完整的 Kafka Streams 应用程序。在外层,我们使用 @SpringBootApplication 注解表明这是一个 Boot 应用程序。然后,我们提供一个 java.util.function.Consumer bean,通过 lambda 表达式封装应用程序的逻辑。该消费者以一个 KStream 作为输入,其键和值都表示为 String 类型。

就是这样。您可以针对 Kafka broker 运行此应用程序,并看到其运行效果。在幕后,Spring Cloud Stream 的 Kafka Streams 绑定器 (binder) 会将其转换为一个完整的 Kafka Streams 应用程序,包含 StreamsBuilder、Kafka Streams 拓扑等。Spring Cloud Stream 的主要原则之一是向用户隐藏复杂性和样板代码,以便应用程序开发人员可以专注于手头的业务问题。绑定器 (binder) 会负责创建 Kafka Streams 拓扑、连接到 Kafka 集群、绑定到主题以及从该 Kafka 主题消费数据(在本例中作为 KStream 绑定)。通常,如果他们不使用像 Spring Cloud Stream 这样的框架,完成所有这些事情是应用程序开发人员的责任。

等一下,你确定这能工作吗?

如果您了解 Kafka Streams 的内部原理,您可能会想知道上面展示的内容是否能工作。我们没有提供 Kafka Streams 所需的许多基本信息(例如集群信息、应用程序 ID、要消费的主题、要使用的 Serdes 等)。简短的回答是,即使不提供任何配置属性,它也能工作。这是因为绑定器 (binder) 将使用许多合理的默认值,并就消费哪些主题等做出推断。尽管如此,对于生产环境使用,如果绑定器使用的默认值不合适,我们建议提供所有适用的属性。

让我们看看 Kafka Streams 所需的一些基本信息,以及绑定器如何为它们提供默认值。

集群信息

默认情况下,绑定器会尝试连接到运行在 localhost:9092 上的集群。如果不是这种情况,您可以使用 Spring Cloud Stream 提供的配置属性覆盖它。请参阅 Spring Cloud Stream 参考指南

应用程序 ID

在 Kafka Streams 应用程序中,application.id 是一个必需字段。没有它,您无法启动 Kafka Streams 应用程序。默认情况下,绑定器将生成一个应用程序 ID 并将其分配给处理器。它使用函数 bean 名称作为前缀。例如,如果您有一个如上的消费者,绑定器将生成 application ID 为 process-applicationId。您可以使用此处概述的策略来覆盖它。请参阅 Spring Cloud Stream 参考指南

要消费的主题

对于上面的处理器,您可以按如下方式提供要消费的主题:

spring.cloud.stream.bindings.process-in-0.destination: my-input-topic

在这种情况下,我们表示函数 bean (process) 及其第一个输入 (in-0) 将绑定到名为 my-input-topic 的 Kafka 主题。如果您不提供这样的显式目标,绑定器会假定您正在使用一个与绑定名称(在本例中为 process-in-0)相同的主题。

序列化和反序列化 (Serdes)

Kafka Streams 使用一个称为 Serde 的特殊类来处理数据编组 (marshaling)。它本质上是入站反序列化器和出站序列化器的包装。通常,您必须告诉 Kafka Streams 每个消费者使用哪个 Serde。然而,绑定器 (binder) 通过使用作为 Kafka Streams 一部分提供的参数化类型来推断此信息。例如,对于 KStream<String, String> 的情况,绑定器假定需要使用 String 反序列化器。与往常一样,您可以通过多种方式覆盖这些设置。在本系列的后续博客文章中,我们有一整篇专门讨论这个主题。

我可以在单个 Boot 应用程序中拥有多个处理器吗?

是的,可以。Spring Cloud Stream 用于 Kafka Streams 的绑定器 (binder) 可以轻松地在单个应用程序中提供多个表示为 java.util.function.Functionjava.util.function.Consumer bean 的处理器。绑定器会为每个此类处理器隔离其自己的应用程序 ID 和 StreamsBuilder。它确保它们之间不会互相干扰。从 Kafka Streams 的角度来看,它们是具有各自专用拓扑的多个处理器。虽然在测试和快速尝试某些东西时这是一个合理的用例,但在单个应用程序中拥有多个处理器可能会使其成为一个更难维护的单体。

总结

在这篇博客文章中,我们快速介绍了如何使用 Spring Cloud Stream 的函数式编程支持来编写使用 Kafka Streams 的流处理应用程序。我们看到绑定器 (binder) 负责处理许多基础设施和配置细节,让您能够专注于手头的业务逻辑。在下一篇博客文章中,我们将进一步探讨这种编程模型,看看如何使用 Spring Cloud Stream 和 Kafka Streams 开发更复杂的流处理应用程序。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

获得优势

VMware 提供培训和认证,助力您快速进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部