领先一步
VMware 提供培训和认证,助您加速进步。
了解更多本系列博客的其他部分
第一部分:Spring Cloud Stream Kafka 应用程序中的事务简介
第二部分:Spring Cloud Stream Kafka 应用程序中的生产者启动事务
第三部分:Spring Cloud Stream 中与外部事务管理器同步
第四部分:Spring Cloud Stream 和 Apache Kafka 的事务回滚策略
在上一系列讨论中,我们已经对 Spring Cloud Stream Kafka 应用中的事务工作原理进行了基本分析。现在,我们终于迎来了重点:精确一次语义,这是流式应用中备受关注且必不可少的功能。在本篇博客系列文章中,我们将探讨如何通过 Apache Kafka 事务在 Spring Cloud Stream 应用中实现精确一次语义。由于我们已经掌握了前几部分关于事务工作原理的知识,理解 Spring Cloud Stream Kafka 应用如何实现精确一次语义将相对容易。
需要注意的是,为了实现精确一次语义,我们无需编写任何新的代码,除了我们在本系列博客前几篇文章中已经看到的代码之外。这篇博客将阐明在 Spring Cloud Stream Kafka 应用中充分支持精确一次语义所必需的一些期望。
在分布式计算中实现精确一次语义是一项艰巨的任务。我们在此不深入探讨所有技术细节,以解释为何这是一项如此困难的任务。有兴趣的读者如果想了解精确一次语义的所有底层原理及其在分布式系统中为何如此难以实现,可以参考相关文献。Confluent 的这篇博客是一个很好的起点,可以帮助理解这些技术挑战以及 Apache Kafka 为实现这些目标所采用的解决方案。
虽然我们不会深入细节,但了解 Apache Kafka 提供的不同投递保证是值得的。主要有三种此类投递保证:
在至少一次的投递语义中,应用程序可能会接收数据一次或多次,但保证至少接收一次。在至多一次语义的投递保证中,应用程序可能接收数据零次或一次,这意味着存在数据丢失的可能。另一方面,精确一次语义正如其名称所示,仅保证一次投递。根据应用程序的用例,您可能可以接受使用其中任何一种保证。默认情况下,Apache Kafka 提供至少一次投递保证,这意味着一条记录可能会被多次投递。如果您的应用程序可以处理重复记录或无记录的后果,那么使用非精确一次保证可能没问题。相反,如果您处理的是金融系统或医疗数据等关键任务数据,则必须保证精确一次的投递和处理,以避免灾难性后果。由于 Apache Kafka 等系统的分布式特性,由于许多移动部分的存在,实现精确一次语义通常非常困难。
我们在本系列博客的前几篇文章中看到了许多不同的场景。Apache Kafka 中的精确一次语义适用于读-处理-写(或消费-转换-生产)应用。有时会让人困惑的是,我们究竟在“一次”做什么?是初始消费、数据处理还是最后的生产部分?Apache Kafka 保证的是整个读->处理-写序列的精确一次语义。在这个序列中,读取和处理部分始终是至少一次的——例如,如果处理或写入部分的任何原因失败。当您依赖精确一次投递时,事务至关重要,这样数据的最终发布才能成功或回滚。一个潜在的副作用是,初始消费和处理可能会发生多次。例如,如果事务被回滚,消费者偏移量不会被更新,下一次轮询(无论是 Spring Cloud Stream 内部的重试还是应用程序重启)都会重新投递并再次处理同一条记录。因此,消费和处理(转换)部分的保证是至少一次,这是一个关键的理解点。任何以 read_committed 隔离级别运行的下游消费者将只会精确地接收一次上游处理器发送的消息。因此,必须理解,在精确一次投递的世界里,处理器和下游消费者都必须协调才能受益于精确一次语义。使用 read_uncommitted 隔离级别的已发布主题的任何消费者可能会看到重复的数据。
另一个需要牢记的点是,由于记录的消费及其处理可能发生多次,应用程序代码需要遵循幂等模式,这主要是在您的代码与数据库等外部系统交互时需要考虑的。在这种情况下,应用程序需要确保用户代码没有副作用。
让我们回顾一下我们之前看到的简单消费-处理-生产循环的代码。
@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
正如我们之前所见,为了使此应用程序具有事务性,我们必须为 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix 配置属性提供一个合适的值。在 Spring Cloud Stream 中,提供此属性是使上述代码段完全具备精确一次投递能力所需的所有操作。整个端到端过程在事务边界内运行(尽管在上面的示例中有两个事务)。我们有一个外部 Kafka 事务,它在容器调用监听器时启动,另一个由事务拦截器启动的 JPA 事务。当发生 StreamBridge 发送时,会使用与初始 Kafka 事务相同的事务资源,但直到控制权返回容器后才提交。当方法退出时,JPA 事务就会提交。假设在这里出现问题,并且数据库操作抛出异常。在这种情况下,JPA 不会提交,它会回滚,异常会传播回监听器容器,此时 Kafka 事务也会被回滚。另一方面,如果 JPA 操作成功,但 Kafka 发布失败并抛出异常,JPA 不会提交而是回滚,异常会传播回监听器。
在上述代码中,如果我们不与外部事务管理器同步而仅仅发布到 Kafka,那么就不需要使用 @Transactional 注释,我们甚至可以将代码内联到 txCode 方法中作为消费者 lambda 的一部分。
@Bean
public Consumer<PersonEvent> process() {
return pe -> {
Person person = new Person();
person.setName(pe.getName());
PersonEvent event = new PersonEvent();
event.setName(person.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
在这种情况下,我们只有一个在容器调用监听器时启动的 Kafka 事务。当代码通过 StreamBridge 发送方法发布记录时,KafkaTemplate 会使用与初始事务相同的事务生产者工厂。
这两种场景下的故事都是,我们是完全事务性的,并且最终的发布仅在事务中进行一次。具有 read_committed 隔离级别的下游消费者应该精确地消费它们一次。
在本系列中,直到目前为止,我们还没有讨论 Kafka Streams。这有些讽刺,因为最初,Kafka Streams 应用是 Apache Kafka 添加事务支持和精确一次语义的原因,但我们还没有讨论它。原因是,在 Kafka Streams 应用中实现精确一次语义非常简单,几乎是默认的。他们称之为,它是一个单一的配置开关。要了解更多关于 Kafka Streams 中精确一次语义的信息,请参阅 Confluent 的这篇博客。
与常规的基于 Kafka 客户端的应用一样,对于 Kafka Streams,当您在消费-处理-生产模式中生产最终输出时,精确一次保证就会起作用,这意味着已发布的任何下游消费者都将精确地消费一次,只要它们使用 read_committed 隔离级别。
Kafka Streams 的 processing.guarantee 配置属性在 Kafka Streams 应用中启用精确一次语义。您可以通过设置 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee 属性来在 Spring Cloud Stream 中进行设置。您需要将其值设置为 exactly_once。默认情况下,Kafka Streams 使用 at_least_once 的值。
状态ful Kafka Streams 应用通常会发生三种主要活动:
模式是接收并处理记录。在此过程中,任何状态信息都会具体化到状态存储中,本质上是更新特定的变更日志主题。最后,出站记录会被发布到另一个 Kafka 主题。如果您注意到这个模式,它看起来与我们已经看到的许多场景相似,除了状态存储部分。当将 processing.guarantee 设置为 exactly_once 时,Kafka Streams 保证,如果在这些活动期间发生异常或应用程序崩溃,整个单元将原子地回滚,就好像什么都没发生一样。应用程序重新启动后,处理器会再次消费记录,处理它,并最终发布数据。由于这在后台以事务方式进行发布,因此任何使用 read_committed 隔离级别的下游消费者都不会在成功发布之前消费该记录,从而处理了实现事务性所需的所有操作(例如,提交已消费记录的偏移量等),从而保证了精确一次投递。
Kafka Streams 的精确一次投递保证是指从 Kafka 相关活动的角度来看,端到端的消费、处理和记录的发布。它在存在外部系统时并不提供此保证。例如,如果您的代码与数据库插入或更新操作等外部系统交互,那么由应用程序决定如何参与事务。Spring 的事务支持在这种情况下再次派上用场。我们不想在此重复代码,但正如我们在此系列中多次看到的,您可以将与数据库交互的代码封装在单独的方法中,用 @Transactional 注释进行注解,并提供一个合适的事务管理器,例如我们之前看到的 JPA 管理器。当此类方法抛出异常时,JPA 事务会回滚,异常会传播到 Kafka Streams 处理器代码,后者最终会将其传播回 Kafka Streams 框架本身,然后该框架会回滚原始 Kafka 事务。再次强调,理解这一点很重要,从流拓扑中的处理器调用的这些操作必须被编写成能够处理幂等性,因为“精确一次”仅适用于整个过程,而不是序列中单独的读取和处理。
正如我们在本文开头提到的,精确一次投递语义在分布式计算中是一个复杂的主题。然而,借助 Kafka 原生提供的实现精确一次语义的解决方案,以及 Spring 在 Spring for Apache Kafka 和 Spring Cloud Stream 框架中的支持,在 Spring Cloud Stream Kafka 应用中实现精确一次投递语义相对容易。