Spring Cloud Stream和Apache Kafka的事务回滚策略

工程 | Soby Chacko | 2023年10月11日 | ...

本系列博客的其他部分

第一部分:Spring Cloud Stream Kafka 应用程序中的事务简介

第二部分:Spring Cloud Stream Kafka 应用程序中的生产者启动事务

第三部分:Spring Cloud Stream 中与外部事务管理器同步

在本博客系列的最后三个部分,我们分析了 Spring Cloud Stream Kafka 应用程序中的事务是如何工作的。我们探讨了事务在不同场景下的作用,包括生产者和消费者应用程序,以及应用程序如何正确使用它们。现在这些基础知识我们已经掌握,让我们来探讨事务的另一个方面:发生错误时回滚事务。当发生错误且事务管理器无法提交事务时,事务管理器会回滚事务,并且不会将任何内容持久化供下游消费者查看。如果应用程序能够控制这种回滚机制的工作方式,那就太有用了。Spring Cloud Stream 通过对 Apache Kafka 的 Spring 提供底层支持,简化了这种回滚定制。我们需要注意关于生产者和消费者(consume-process-produce)事务性应用程序的一些事项。我们将在下面逐一介绍。

生产者发起的事务

这是我们在上一篇文章中已经看过的代码片段。

@Transactional        
public void send(StreamBridge streamBridge)      
{
    for (int i = 0; i < 5; i++) {
      streamBridge.send("mySupplier-out-0", "my data: " + i);           
    }
}

如果事务方法抛出异常,我们该怎么办?答案是,从 Spring Cloud Stream 的角度来看,我们无需做任何事情。事务拦截器会发起回滚,最终 Kafka 中的事务协调器会中止该事务。最终,异常会传播给调用者,然后调用者可以决定是否重新触发事务方法(如果错误是暂时的)。框架不会重试,因为这是生产者发起的事务。这种情况很简单,因为在事务回滚期间,我们不需要在应用程序或框架层面做任何事情。如果发生错误,它保证会被回滚。然而,请记住,即使事务被回滚了,Kafka 日志中可能仍有未提交的记录。隔离级别为 read_uncommitted(默认值)的消费者仍然会收到这些记录。因此,消费者应用程序必须确保它们使用 read_committed 的隔离级别,这样它们就不会收到上游事务回滚的任何记录。

生产者发起的事务与外部事务同步

我们在本博客系列的最后一部分看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用程序也无需做任何事情来处理错误。事务会从数据库和 Kafka 发布中回滚。

消费者发起的事务回滚

如果生产者发起的事务回滚如此简单,你可能会想,有什么大不了的,为什么我们必须专门用一篇文章来讨论这个话题?什么时候有必要让应用程序提供特定的回滚策略?当你有一个正在进行的消费者发起的事务时,这是有意义的,因为我们需要特别注意如何处理已消费记录的状态及其偏移量。让我们重新审视我们在上一篇博客中使用的消费者发起的事务方法代码。

public Consumer<PersonEvent> process(TxCode txCode) {
   return txCode::run;
}

@Component
class TxCode {

   @Transactional
   void run(PersonEvent pe) {
       Person person = new Person();
       person.setName(pe.getName());

       Person savedPerson = repository.save(person);

       PersonEvent event = new PersonEvent();
       event.setName(savedPerson.getName());
       event.setType("PersonSaved");
       streamBridge.send("process-out-0", event);
   }
}

你可能还记得,这是一个端到端的事务性 consume-process-produce 模式。如果事务方法抛出异常怎么办?在这里,我们需要理解框架在回滚事务时如何处理已消费的记录。Spring for Apache Kafka 的底层消息监听器容器允许设置一个 回滚处理器

消息监听器容器使用失败记录作为列表的开头,调用 AfterRollbackProcessor API 处理来自上次消费者轮询的剩余记录。实现会使用 topic/partition 信息来确保在下次轮询时重新获取失败的记录。当应用程序在 Spring Cloud Stream 中启用事务时,我们使用一个名为 DefaultAfterRollbackProcessor 的默认实现,它实现了 AfterRollbackProcessor API。因此,当事务回滚时,这个实现会默认启动。让我们来看看当这个 AfterRollbackProcessor 在起作用时会发生什么。

Spring Cloud Stream 允许你通过消费者绑定设置方法调用重试的最大次数。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts。max attempts 值包括初始尝试。此值的默认值为。如果你想禁用重试,可以将此值设置为。在这种情况下,框架只尝试一次记录。此值包含记录的第一次尝试。因此,在默认的三次尝试的情况下,绑定器会在初始尝试后重试两次。

当用户方法抛出异常时,容器最初启动的事务将被回滚。由于我们在事务上下文中,容器随后会在新事务中使用 transaction template 调用 AfterRollbackProcessor 的 process 方法,启动一个新的 Kafka 事务。在运行 AfterRollbackProcessor 的 process 方法时,它会根据 max attempts 配置检查是否还有剩余的重试次数。如果发现还有重试次数,它会提交当前事务,这是一个 no-op 操作,因为在检查过程中没有发生任何事情。消费者会进行一次 seek 操作,定位到失败的记录,以便下次轮询返回该失败记录。然后消费者会轮询更多记录,这会导致失败的记录被重新传递。整个流程会重新开始并继续。如果再次失败,它会重复此过程,直到耗尽所有可用的重试次数。一旦所有重试次数都耗尽,AfterRollbackProcessor 会调用注册的 recoverer。Spring Cloud Stream 会注册一个 recoverer,将错误中的记录发送到错误通道。之后,输入(已恢复)记录的偏移量会被发送到新事务中。之后,当前事务被提交,该提交原子地将偏移量发送到事务并提交记录的偏移量。此时流程完成。已恢复的记录不包含在消费者 seek 中,下次轮询会返回新记录。

如果恢复因任何原因失败,容器的行为将如同未用尽重试次数一样,并陷入无限重试。如上所述,当恢复成功时,失败的记录不包含在 seek 中,因此下次轮询不会返回该记录。

假设应用程序将最大尝试次数设置为两次,并且记录在两次尝试中都失败了,以下是使用事务时的事件顺序

  1. 消费者轮询记录,Spring Kafka 中的监听器在 TransactionTemplate 的 execute 方法中被调用,这会触发 KafkaTransactionManager 启动一个新的事务。
  2. 最终,监听器调用了带有 @Transactional 注解的用户方法。
  3. 事务拦截器会拦截事务方法,并使用其事务管理器启动一个新的 JPA 事务。
  4. 当到达数据库操作时,由于我们处于方法执行中间,因此不会发生提交或回滚。
  5. StreamBridge 调用 send 方法,该方法发布到 Kafka topic。这里不会启动新的 Kafka 事务,因为已经有一个 Kafka 事务在进行中。KafkaTemplate 使用相同的事务资源,即生产者,来发布。
  6. 方法在其任何操作中抛出异常,事务拦截器会捕获该异常并对 JPA 事务执行回滚。
  7. 异常会传播回 Spring Kafka 中的消息监听器容器,其中监听器通过 TransactionTemplate 的 execute 方法调用了用户方法。然后它会回滚 Kafka 事务。
  8. 此时,容器会在新事务中调用 AfterRollbackProcessor,因为我们在事务上下文中。它会通过 TransactionTemplate 启动另一个 execute 操作,由 KafkaTransactionManager 创建一个新的 Kafka 事务。
  9. TransactionTemplate 的 execute 方法调用 AfterRollbackProcessor API 的 process 方法,并立即返回,因为还有一个重试次数(因为我们最多有两次尝试)。
  10. 然后容器提交新的 Kafka 事务,不做任何事情就关闭事务 - 基本上是一个 no-op 操作。
  11. 下面的消费者轮询会重新传递失败的记录,容器通过再次在新事务中调用监听器来重试(步骤 1)。
  12. 步骤 2 - 8 重复。
  13. TransactionTemplate 的 execute 方法调用 AfterRollbackProcessor 的 process 方法,并发现没有剩余的重试次数了。
  14. process 方法调用注册的 recoverer。由于我们将其作为 Spring Cloud Stream 应用程序运行,默认的 recoverer 会将其发送到错误通道。
  15. 记录恢复后,通过事务中的生产者,将已恢复记录(最初由消费者消费)的偏移量发送到事务中。
  16. AfterRollbackProcessor 中的 process 方法返回后,容器会调用事务的 commit 操作,该操作将偏移量原子地发送到事务中,并执行消费者偏移量的提交。

为什么在上面的步骤 8 和每次后续调用 AfterRollbackProcessor 失败后,我们需要一个新的事务?为什么不能在提交原始 Kafka 事务之前调用 AfterRollbackProcessor?虽然在每次失败的尝试后创建一个新的 Kafka 事务来执行后回滚任务可能听起来是不必要的开销,但这是必要的。当原始事务发生回滚时,它不会将偏移量发送到事务。如果有重试,容器会在新事务中再次调用监听器,然后循环继续,直到重试次数用尽并恢复记录。潜在地,容器创建并回滚的事务数量可能与 max attempts 的数量相同,而不会将偏移量发送到事务。每次原始事务回滚时,容器都会为 AfterRollbackProcessor 调用启动一个相应的(no-op 提交的)新事务,直到最后一次恢复后的提交。在恢复记录后,最后一次调用会将偏移量发送到事务中,以原子地提交偏移量并在 Kafka 端执行必要的事务清理。因此,正如我们所见,要将偏移量发送到事务,我们需要在新事务中调用 AfterRollbackProcessor

自定义 AfterRollbackProcessor

如果应用程序想要自定义后回滚任务,而不是使用 Spring Cloud Stream 使用的默认 DefaultAfterRollbackProcessor,那么它可以使用 ListenerContainerCustomizer 来提供自定义的 AfterRollbackProcessor。下面的列表展示了如何做到这一点。

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
   return (container, destination, group) -> container.setAfterRollbackProcessor(
           new DefaultAfterRollbackProcessor<byte[], byte[]>(
                   (record, exception) -> System.out.println("Discarding failed record: " + record),
                   new FixedBackOff(0L, 1)));
}

在提供上述自定义时,recoverer 会记录错误并继续。DefaultAfterRollbackProcessor 的构造函数还带有一个具有无重试的退避。因此,在这个例子中,一旦方法中发生异常,记录就会被记录下来。

事务性 DLQ 在记录恢复期间发布

Spring Cloud Stream 允许你在耗尽所有重试次数后,将失败的记录作为恢复过程的一部分发送到一个唯一的DLQ(死信队列)topic。我们提到 Spring Cloud Stream Kafka 绑定器使用的 DefaultAfterRollbackProcessor 会将记录发送到错误通道。当应用程序启用DLQ时,绑定器会将失败的记录发送到一个特殊的DLT topic。这如何发生的细节超出了我们事务讨论的范围。然而,问题在于DLT发布是否是事务性的。在设置DLQ基础设施时,如果应用程序使用事务(即,它提供了 transaction-id-prefix),绑定器将使用与 KafkaTransactionManager 中相同的原始事务生产者工厂。因此,框架保证事务性地发布到DLT

通过本文的讨论,我们涵盖了在 Spring Cloud Stream Kafka 应用程序中使用事务时的所有主要构建块。在本博客系列的下一部分,我们将探讨 Kafka 中事务的一个实际应用,即流行的 exactly-once-semantics,以及我们如何在 Spring Cloud Stream Kafka 应用程序中启用它们。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有