Spring Cloud Stream Kafka 应用中的事务简介

工程 | Soby Chacko | 2023 年 9 月 27 日 | ...

我们正在开始一个新的博客系列,该系列专注于处理 Spring Cloud Stream Kafka 应用中的事务。本博客系列涵盖了使用 Spring Cloud Stream 和 Apache Kafka 编写事务性应用程序的许多底层细节。希望通过本博客系列,我们能为您提供足够的信息,以便为各种业务用例编写事务性 Spring Cloud Stream Kafka 应用程序。

基本构建块

Spring Cloud Stream Kafka 应用程序中的事务基础支持主要来自 Apache Kafka 本身和 Spring for Apache Kafka 库。然而,本系列博客主要关注的是将此支持专门用于 Spring Cloud Stream。如果您熟悉 Apache Kafka 中的事务工作原理以及 Spring for Apache Kafka 如何使其在 Spring 中易于使用,那么本系列会让您感到宾至如归。

虽然 Apache Kafka 提供了基础的事务支持,但 Spring for Apache Kafka (又称 Spring Kafka) 库在 Spring 端扩展了此支持,通过利用 Spring 框架中传统的事务支持,使其对 Spring 开发者来说更加自然。Spring Cloud Stream 中的 Kafka 绑定器在此基础上进一步构建,使得在 Spring Cloud Stream Kafka 应用程序中可以使用相同的支持。在本系列博客的第一部分,我们将简要介绍 Kafka 事务,分析一些需要依赖事务的用例,以及 Apache Kafka 和 Spring 生态系统中的事务构建块。

在 Apache Kafka 中,存在许多需要以事务方式发布、消费和处理记录的用例。当在生产者发起的应用程序或以事务方式实现“消费-处理-发布”模式的进程中生成事务性记录时,这些记录会被原子地写入 Kafka。如果出现任何问题,整个过程将被回滚,并且事务不会被提交。需要注意的一点是,与支持事务的关系型数据库不同,在关系型数据库中,如果发生事务回滚,则不会持久化任何记录;而在 Apache Kafka 中,记录仍会被发布到主题分区。这是因为 Apache Kafka fundamentally 是一个基于日志的、不可修改的、仅追加的架构,它不允许修改记录,例如在将记录添加到记录日志后将其移除。有人可能会想,既然在事务被中止时记录可能被发布到主题分区,从而可能导致消费者看到它们,那么使用事务有什么好处呢?然而,具有适当的隔离级别的消费者即使在回滚事务的记录位于主题分区中时,也不会看到已回滚的记录。因此,从端到端的角度来看,整个过程被保证是完全事务性的。

事务性用例

事务通常会增加 Kafka 应用程序的显着开销。在使用 Apache Kafka 的事务时,每条记录都必须添加特殊的事务日志,并将事务标记发送到特殊的主题状态主题等。所有这些步骤都需要时间和空间,增加了整体延迟。因此,每个应用程序都必须通过分析用例来仔细权衡对事务支持的需求。

事务主要提供了一种保护数据的方法,以提供ACID 功能。它通过提供原子性、一致性、数据隔离和持久性来确保数据完整性。

当今企业中有许多关键任务用例,在这些用例中,使用事务并依赖其带来的 ACID 语义是高度可取的。关于何时使用事务并证明其带来的开销是合理的,没有简单明了的答案。您必须查看应用程序并评估其风险。事务的典型经典示例是任何需要处理金融数据的场景。Bob 向 Alice 发送钱,这一操作会从 Bob 的账户中扣款,Alice 的账户会得到记账。如果在此过程中出现任何问题,整个过程将被回滚,仿佛什么都没有发生,因为我们不希望流程处于混乱状态。如果该过程从 Bob 的账户扣款,但 Alice 未收到记账(或反之),那将是一个问题。从 Apache Kafka 的角度来看,这里有几件事正在发生。首先,消息来到 Kafka 处理器,用于从 Bob 的账户扣款以及接收方信息。处理器处理信息,然后发送一条消息到另一个主题,表明 Bob 的账户已发生扣款。在此之后,另一条消息表明 Alice 已收到记账。此过程中的各种操作需要复杂的协调,以确保一切按预期进行。每当我们有多个相关事件时,事务可能有助于确保数据完整性并提供 ACID 语义。在此示例中,单个事件本身意义不大,但它们共同构成了整个流程,并需要事务性来确保数据完整性。

如果我们想概括这种模式,我们可以说,任何时候我们有一个“消费-处理-发布”模式,它是关键任务的,其中如果一个组件失败,整个处理器都需要表现得好像它从未发生过一样,那么使用事务是一个值得考虑的潜在解决方案。

来自其他领域的更多高级示例

  • 想象一个航空订票系统,它需要发布包含多个航段的预订信息。如果由于任何原因,系统无法发布整个预订,则需要中止该过程并重新开始。
  • 一家经纪公司向结算中心发送包含多个买入订单的请求。假设该进程无法将单个订单作为单一原子单元发布到结算中心正在消费的消息系统中。在这种情况下,经纪公司必须重新发送订单。
  • 一家发送患者测试数据的医疗计费系统到保险公司,必须将患者的各种相关测试发布到消息系统中。
  • 一个在线游戏系统需要跟踪玩家在游戏中的位置,并将它们事务性地发送到一个中央服务器,以确保所有玩家看到正确的坐标,而不是部分更新的位置。
  • 零售商的库存补货系统需要将各种相关产品状态的信息作为单一原子单元发送。
  • 一个在线电子商务订单系统,在一次原子聚合操作中发布订单详情(如订单条目、账户持有人信息、运输信息等)。

与外部数据库同步

另一个事务变得很有用的用例是,当您必须与其他事务性系统同步时。除了发布到 Kafka 之外,假设您还必须在关系型数据库中持久化记录或某些派生信息,所有这些都在一次原子操作中完成。如果一个系统未能发送数据,我们必须回滚。如果您每次只发布一条记录到 Kafka,并且没有其他操作,也没有其他相关操作,那么您就不需要使用事务,这一点我们将在本系列博客的下一部分中看到。然而,即使您只发布到 Kafka 主题一次,但将关系型数据库操作作为同一过程的一部分,使用事务就变得必要,以确保数据完整性。

发布到多个 Kafka 主题

在仅生产者应用程序中,事务的另一个用例是发布到多个 Kafka 主题。假设您有一些关键的业务数据,形式为重要通知(例如订单详情),您希望将其发布到多个 Kafka 主题,订单详情的一部分发布到订单主题,另一部分发布到运输主题。在这种情况下,我们可以使用事务来确保端到端的数据完整性。

对上述事务性用例的概括

上述用例列表并非详尽无遗,其中事务是必需的。在当今的企业中,存在许多其他用例,它们与我们所考察的用例的总体方向并无太大差异,这些用例需要消息系统的事务性处理。

以下列表总结了 Apache Kafka 中事务可能有所帮助的通用用例:

  • “消费-处理-发布”系统,需要将记录作为单一原子单元发布,并提供精确一次语义的交付保证。
  • 多个相关的发布事件,单独来看没有意义。
  • 将数据作为单一原子单元发布到多个主题。
  • 与外部事务管理器同步。

这里是所有这些不同情况的图示表示。它涵盖了我们上面考虑的场景,例如“消费-处理-发布”、“多个生产者”、“与外部事务同步”等。处理器从入站主题消费数据,执行业务逻辑,将某些信息持久化到数据库系统,然后发布到多个 Kafka 主题。

scst-kafka-txn-overview

Apache Kafka 中的事务

有大量的文献可以研究 Apache Kafka 中事务工作原理的底层细节,本文可以为您介绍这些细节。但是,从非常高的层面简要了解实现事务性的 Kafka 客户端 API 仍然是值得的。需要注意的一点是,对于普通消费者来说,Kafka 中没有所谓的事务性消费者,但有感知事务的消费者。消费者通过设置隔离级别来实现这种事务感知。默认情况下,Kafka 中的消费者可以看到所有记录,包括上游生产者未提交的记录,因为 Kafka 消费者中的默认隔离级别是 **read_uncommitted**。Kafka 消费者必须使用 **read_committed** 隔离级别才能提供端到端的事务语义。我们将在本系列博客的后续章节中了解如何在 Spring Cloud Stream 中实现这一点。

在生产者方面,应用程序可以依赖 Kafka 客户端提供的几个 API 方法。让我们来看一下重要的几个。

为了使应用程序具有事务性,Kafka 客户端需要一个事务 ID。应用程序通过一个名为 **transactional.id** 的 Kafka 生产者属性来提供此 ID,事务协调器使用此 ID 来通过注册它来启动事务。事务协调器使用此 ID 来跟踪事务的所有方面,例如初始化、进行中、提交等。

以下列表总结了关键的事务相关生产者 API 方法。

Producer#initTransactions() - 对每个生产者调用一次以启动事务支持。初始化 Kafka 事务。

Producer#beginTransaction() - 在发送记录之前开始事务。

Producer#sendOffsetsToTransaction() - 将已消费记录的偏移量发送到事务。

Producer#commitTransaction() - 提交事务。

Producer#abortTransaction() - 中止事务。

在发送记录之前,我们需要初始化并开始事务。然后,它继续进行数据处理。如果我们消费了一条记录来执行此发布,我们必须使用生产者将已消费记录的偏移量发送到事务。在此之后,事务提交或中止操作可以继续(commitTransaction 或 abortTransaction)。当我们调用 commitTransaction 方法时,Kafka 客户端就会将偏移量原子地发送到 consumer_offsets 主题。

Spring for Apache Kafka 中的事务支持

当使用 Spring for Apache Kafka 或依赖于它的 Spring Cloud Stream Kafka 绑定器等框架时,它们带来了让应用程序主要关注业务逻辑的好处,因为框架处理了我们上面看到的低级样板事务序列。使用 Spring for Apache Kafka 或另一个框架(如使用它的 Spring Cloud Stream)是有益的,因为它使我们不必担心编写低级样板序列(如上所述)来确保所有事务步骤都成功。正如您可以想象的那样,这里有很多移动的部分,如果您遗漏了一个步骤或没有按照预期执行一个步骤,它可能会导致应用程序容易出错。在 Spring 的情况下,我们提到的框架代表应用程序开发人员处理这些问题。让我们简要看看它是如何做到的。

Spring for Apache Kafka 框架通过提供一种熟悉 Spring 开发者的统一事务编程模型来隐藏所有这些底层细节。结果是,当使用 Spring for Apache Kafka 或其他框架(如 Spring Cloud Stream)时,应用程序可以简单地专注于应用程序的业务逻辑,而不是处理复杂的低级事务相关问题。

KafkaTransactionManager

Spring for Apache Kafka 是如何提供这种统一事务编程模型的?简而言之,Spring 开发者传统上使用 @Transactional 注解或编程方法,例如直接在应用程序中使用 TransactionTemplate 来创建本地事务。这些机制需要一个事务管理器实现来驱动事务性方面。Spring for Apache Kafka 提供了一个事务管理器实现。**KafkaTransactionManager** 是 Spring 框架中 **PlatformTransactionManager** 的一个实现。您可以将此事务管理器与 @Transactional 注解一起使用,或者通过使用 TransactionTemplate 在本地事务中使用。KafkaTransactionManager 使用生产者工厂来创建 Kafka 生产者,并提供用于开始、提交和回滚事务的 API。

KafkaResourceHolder

Spring for Apache Kafka 还提供了一个 **KafkaResourceHolder**,它持有 Kafka 生产者资源。Spring for Apache Kafka 中的 KafkaTemplate 会触发一个 KafkaResourceHolder 在当前线程上绑定到一个给定的生产者工厂。在消费者发起的事务中,消息监听器容器执行此绑定,并且生产者工厂与 KafkaTransactionManager 使用的工厂相同。这样,事务就可以为所有发布需求使用同一个事务性生产者。

除了上述组件之外,Spring for Apache Kafka 还提供了其他用于处理事务相关问题的实用工具。随着本系列后续章节的深入,我们会根据需要介绍其中的一些。

在本系列博客的第二部分,我们将继续探讨 Spring Cloud Stream 应用程序中事务使用的更实际的实现细节。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有