领先一步
VMware 提供培训和认证,助您加速进步。
了解更多本系列博客的其他部分
第一部分:Spring Cloud Stream Kafka 应用程序中的事务简介
第二部分:Spring Cloud Stream Kafka 应用程序中的生产者启动事务
在 本系列博客的前一部分 中,我们了解了事务管理的基础知识,主要是在使用生产者发起的 Spring Cloud Stream Kafka 应用程序时。在该讨论中,我们还简要地看到了 Spring Cloud Stream Kafka 消费者应用程序如何以适当的隔离级别消费以事务方式生成的记录。当您与外部事务管理器(例如关系数据库的事务管理器)同步时,我们提到您必须使用事务来确保数据完整性。在这一部分中,我们将了解在使用外部事务管理器时,如何在 Spring Cloud Stream 中实现事务保证。
在我们开始探索之前,请记住,在实践中实现分布式事务非常困难。您必须依赖两阶段提交 (2PC) 策略和一个适当的分布式事务管理器,例如兼容 JTA 的事务管理器,才能正确完成此操作。尽管如此,大多数企业用例可能不需要这种复杂程度,而且我们在此博客中描述的大多数我们考虑并在实践中看到人们使用的用例,最好坚持使用非分布式事务方法。这篇由 Spring 工程团队的 Dr. Dave Syer 于 2009 年发表的 文章,即使在 14 年后仍然与理解分布式事务的挑战以及 Spring 中推荐的替代方法相关。
让我们回到我们的讨论:在使用生产者发起的应用程序和消费-处理-生产(读取-处理-写入)应用程序中的外部事务管理器时,在 Spring Cloud Stream Kafka 应用程序中实现事务性。
现在,我们可以通过草拟一些代码来为我们将在讨论中使用的示例领域设定场景。我们使用了一些领域对象来驱动演示,并为它们创建了伪代码。
假设消息传递系统处理“事件”域类型——我们使用 PersonEvent
class PersonEvent {
String name;
String type;
//Rest omitted for brevity
}
我们还需要一个 Person 对象的域实体
@Entity
@Table(name = "person")
public class Person {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String name;
// Rest omitted for brevity
}
最后,我们需要一个 CrudRepository 来处理 Person 域对象
public interface PersonRepository extends CrudRepository<Person, String> {}
在生产者发起的场景中,假设当一个方法被调用时(例如,通过 REST),会创建一个 Person 域对象,将其持久化到数据库,然后作为 PersonEvent 通过 StreamBridge 发送到出站 Kafka 主题。
在 消费-处理-生产 场景中,假设输入主题接收一个 PersonEvent,处理器从中生成一个 Person 域对象来持久化到数据库。最后,它将另一个 PersonEvent 发布到出站 Kafka 主题。
我们这里也使用 JPA 进行讨论。Spring Cloud Stream 应用程序是 Boot 应用程序,您可以在应用程序中包含 spring-boot-starter-jpa 依赖项,并包含适当的 spring.jpa.* 属性来驱动必要的自动配置。假设 Spring Boot 将为我们自动配置一个 JPATransactionManager。
让我们将用例分解为各种场景。
在生产者发起的场景中,我们必须以事务方式执行两项操作:数据库操作,然后是 Kafka 发布操作。这是基本思路。请记住,此代码仅显示了涉及内容的核心。在实际环境中,代码几乎肯定会比这复杂得多。
@Autowired
Sender sender;
@PostMapping("/send-data")
public void sendData() {
sender.send(streamBridge, repository);
}
@Component
static class Sender {
@Transactional
public void send(StreamBridge streamBridge, PersonRepository repository) {
Person person = new Person();
person.setName("Some Person");
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
上述生产者发起的代码是完全事务性的。在本博客的前一部分中,我们看到,如果您只有 Kafka 事务,那么添加 Transactional 注释是不够的。如前所述,Transactional 注释没有事务管理器,我们需要一个自定义事务管理器来使用相同的底层事务资源来实现事务性。然而,在这里情况有所不同。我们有 Spring Boot 自动配置的 JpaTransactionManager,事务拦截器使用它来启动事务。由于我们配置了 transaction-id-prefix,StreamBridge 发送操作可以以事务方式完成。但是,KafkaTemplate 通过 TransactionSynchronizationManager 将 Kafka 事务与已存在的 JPA 事务同步。方法退出时,主事务首先提交,然后是同步事务,在本例中是 Kafka 事务。
StreamBridge 发送操作触发新的 Kafka 事务,通过事务同步管理器与 JPA 事务同步。关于在 Spring 中同步事务的一般说明: 听起来它在后台进行复杂的事务同步。然而,正如我们在本文开头所暗示的,这里没有进行分布式事务同步,更不用说在各种事务之间进行任何智能同步的方法了。事务本身对同步一无所知。Spring TransactionSynchronizatonManager 仅协调多个事务的提交和回滚。在此上下文中同步事务在功能上类似于嵌套两个或多个 @Transactional 方法或 TransactionTempate 对象。配置项更少,因为 Spring 会为您处理嵌套。
假设由于流程中的一些新要求,我们需要颠倒提交顺序,让 Kafka 事务先于 JPA 提交。我们该怎么做?一个可能直观出现的解决方案是明确地将 Kafka 事务管理器提供给 @Transactional 注释,让 JPA 事务与作为主事务的 Kafka 事务同步。代码如下所示:
@Transactional(“customKafkaTransactionManager)
public void send(StreamBridge streamBridge, PersonRepository repository) {
Person person = new Person();
person.setName("Some Person");
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
我们需要提供一个自定义 Kafka 事务管理器
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka", MessageChannel.class);
ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
return kafkaTransactionManager;
}
由于 Spring Boot 在检测到事务管理器已存在时不会配置事务管理器,因此我们必须自己配置 JPA 事务管理器
@Bean
public PlatformTransactionManager transactionManager(
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
return transactionManager;
}
我们的直觉在这里奏效了吗?我们成功地改变了事务应用的顺序了吗?不幸的是,没有。这样做无效,因为 JPA 事务管理器不允许其事务与其他事务同步,例如在本例中(自定义 Kafka 事务管理器)的主事务。在我们的例子中,尽管我们创建了一个自定义 Kafka 事务管理器作为主管理器,但 JPA 事务在执行存储库保存方法时会自行启动和提交,而不会与主事务同步。
JpaTransactionManager 创建一个 JPA 事务,而不同步与主事务。那么,我们如何颠倒事务呢?有两种方法可以做到这一点。
首先,我们可以尝试链接事务管理器。ChainedTransactionManager 是 Spring Data 项目中的一个事务管理器实现。您可以将事务管理器的列表指定给 ChainedTransactionManager,它会按照其列表中的事务管理器顺序启动事务。在退出时(即方法退出时),事务将按照事务管理器列表的相反顺序提交。
虽然这听起来是一个合理的策略,但要记住的一个大缺点是 ChainedTransactionManager 目前已被弃用,并且不是推荐的选项。弃用的原因在 Javadoc 中。要点是,人们通常期望 ChainedTransactionManager 是一个神奇的万能药,可以解决所有事务问题,包括两阶段提交的分布式事务和其他问题,而事实恰恰相反。ChainedTransactionManager 仅确保事务以特定顺序启动和提交。它不保证任何事务同步,更不用说任何分布式事务协调。如果您对 ChainedTransactionManager 的限制感到满意,并且需要特定顺序,就像我们的用例一样。在这种情况下,只要您记住您正在使用框架中已弃用的类,就可以合理地使用此事务管理器。
让我们尝试在我们的场景中使用 ChainedTransactionManager,看看效果如何。Spring for Apache Kafka 提供了一个名为 ChainedKafkaTransactionManager 的子类,由于父类已被弃用,因此它也被弃用了。
我们使用前面在链接事务中看到的相同的自定义 KafkaTransactionManager bean。
我们还需要创建 JpaTransactionManager bean,如前所述,因为 Spring Boot 不会自动配置它,因为它已经检测到了自定义 KafkaTransactionManager bean。
添加这两个 bean 后,让我们创建 ChainedKafkaTransactionManager bean
@Bean
public ChainedKafkaTransactionManager chainedKafkaTransactionManager(KafkaTransactionManager kafkaTransactionManager, PlatformTransactionManager transactionManager) {
return new ChainedKafkaTransactionManager(jpaTransactionManager, kafkaTransactionManager);
}
有了这些设置,让我们修改我们的 Transactional 注释
@Transactional("chainedKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository) {
..
}
上面的配置实现了我们想要的结果。当您运行此应用程序时,我们将颠倒事务,如预期的那样——即,Kafka 先提交,然后是 JPA。
TransactionInterceptor 使用自定义 ChainedKafkaTransactionManager 来启动事务。它使用 JpaTransactionManager 启动 Jpa 事务,并为 KafkaTransactionManager 执行相同的操作。StreamBridge 执行 Kafka 发布。我们在这里看到了与上面 JPA 相同的情况。由于已经存在 Kafka 事务,因此它不会启动新的 Kafka 事务。StreamBridge 发送操作是通过最初的 Kafka 事务所使用的相同的事务性生产者工厂完成的。此处不发生提交或回滚。如果您对链接事务管理器的限制感到满意,则此方法有效。请记住,这里没有事务同步。事务管理器在事务开始时按给定顺序应用,在提交或回滚时按相反顺序应用。如果您选择此路线,由于您使用的是框架中已弃用的类,最好将它们复制并在您的项目中保留,而不是依赖于框架。由于它们已被弃用,不保证会有新功能和错误修复。未来的版本可能会完全删除它们。也有可能这些类永远不会被删除,并且弃用状态是为了阻止使用它们(因为人们认为它们的功能比实际功能更强大)。
如果您不想依赖框架中已弃用的类,或者不想复制它们并在您的端维护它们,那么您还有另一个选择。您可以创建两个事务方法并嵌套调用。这是该想法的蓝图:
@Component
static class Sender {
@Transactional("jpaTransactionManager")
public void send(StreamBridge streamBridge, PersonRepository repository, KafkaSender kafkaSender) {
Person person = new Person();
person.setName("Some Person");
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
kafkaSender.send(streamBridge, event);
}
}
@Component
static class KafkaSender {
@Transactional("customKafkaTransactionManager")
public void send(StreamBridge streamBridge, PersonEvent event) {
streamBridge.send("process-out-0", event);
}
}
请确保嵌套调用在不同的类中,原因与我们在本博客系列的 第二部分 中讨论过的 AOP 代理工作方式有关。
在这种情况下,两个方法都是事务性的,并且它们是嵌套的。当事务拦截器拦截第一个方法调用时,它会启动 JPA 事务。在执行过程中,嵌套调用(其方法也带有 @Transactional 注释)会进来。由于此 bean 方法带有 @Transactional 注释,Spring AOP 会将 bean 包装在 AOP 通知中。由于我们从不同类中的另一个 bean 调用此通知的 bean,因此代理机制会正确调用通知的 bean。另一个事务拦截器通过使用不同的事务管理器(即 KafkaTransactionManager)启动新事务。当发生 Kafka 发布时,事务不会立即提交或回滚,因为它是作为方法一部分启动的事务,并且在方法退出时发生提交或回滚。此时,控制权返回到第一个方法并继续。一旦退出原始方法,JPA 事务就会通过拦截器提交。如果发布到 Kafka 的方法抛出异常,它将回滚该事务。在这种情况下,回滚后,异常会传播回第一个事务方法(JPA 方法),后者由于异常也会回滚其事务。
使用此技术时的一个重要注意事项 嵌套方法的调用应该是第一个方法做的最后一件事,因为如果第一个方法在 Kafka 调用成功后未能执行某些代码,Kafka 事务已经提交。第一个方法中的失败不会自动回滚 Kafka 事务。
通过我们在本系列中到目前为止获得的对事务的核心理解,让我们看一下事件驱动和流应用程序中的一个关键模式,称为 消费-处理-生产 模式。在 Spring Cloud Stream 中,此类模式的实现如下所示:
@Bean
public Function<PersonEvent, PersonEvent> process(TxCode txCode) {
return pe -> txCode.run(pe);
}
@Component
class TxCode {
@Transactional
PersonEvent run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
return event;
}
}
我们有一个 Spring Cloud Stream 函数,它从输入主题消费 PersonEvent,然后在函数 lambda 表达式的主体内调用一个函数进行处理。此函数返回另一个 PersonEvent,我们将其发布到出站 Kafka 主题。如果我们不在事务上下文中,我们可以在函数 lambda 表达式中内联上面的 run 方法。但是,为了实现事务语义,@Transactional 注释必须位于不同类中的方法上。
为了使绑定器具有事务性,请确保提供具有有效值的 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。
上面的代码是完全事务性的吗?然而,现实是它在端到端方面仅是部分事务性的。让我们看一下事件顺序。
由于我们提供了 transaction-id-prefix,因此绑定器是事务性的。当消费者在消息侦听器容器中轮询记录时,它会在 TrasactionTemplate#execute 方法内调用内部侦听器方法。因此,执行侦听器方法(调用用户方法)的整个端到端过程都在由 KafkaTransactionManager 启动的事务中运行。当事务启动时,TransactionSynchronizationManager 将资源(生产者)绑定到事务。当调用用户方法(用 @Transactional 注释的方法)时,事务拦截器会拦截该调用,让包装的 AOP 通知处理实际调用。由于我们有一个 JpaTransactionManager,事务拦截器会使用该管理器并启动新事务。由每个事务管理器实现决定它是否希望与现有事务同步。如我们上面已经讨论过的,在 JpaTransactionManager(以及许多其他类似的数据库事务管理器实现)的情况下,它不允许与现有事务同步。因此,JPA 事务独立运行,如上文各节所示。当 run 方法退出时,事务拦截器会使用 JPA 事务管理器执行提交或回滚操作。至此,JPA 事务管理器完成了其工作。此时,方法调用的响应返回给调用者,即 Spring Cloud Stream 基础结构。此机制在 Spring Cloud Stream 中获取此响应并将其发送到 Kafka 中的出站主题。它使用与初始事务开始时绑定的相同的事务性生产者。发送记录后,控制权返回到消息侦听器容器,然后提交或回滚事务。
TransactionTemplate 的 execute 方法调用侦听器。KafkaTransactionManager 启动新事务。3. Kafka 资源被绑定(生产者)。4. 当到达用户代码时,事务拦截器最终会拦截该调用并使用 JpaTransactionManager 启动新事务。5. AOP 代理然后调用实际方法。当方法退出时,JpaTransactionManager 提交或回滚。6. 方法的输出返回到 Spring Cloud Stream 中的调用者。7. 然后使用步骤 4 中绑定的同一事务资源将响应发送到 Kafka 出站。8. 控制权返回到消息侦听器容器,KafkaTransactionManager 提交或回滚。
那么,这里有什么问题?它看起来像事务性的,但实际上只是一部分事务性的。最根本的问题是,整个端到端过程不在单个原子事务的边界内,这是一个重大问题。这里有两个事务——Kafka 和 JPA——并且 JPA 和 Kafka 事务之间没有同步。如果数据库事务已提交而 Kafka 发送失败,则无法回滚 JPA 事务。
我们可能会认为 ChainedTransactionManager 可以提供帮助。虽然这种直觉有一些优点,但它与上述代码不兼容。由于在调用侦听器方法时在容器中创建的 Kafka 事务,ChainedTransactionManager 不会从提供给它的任何 Kafka 事务管理器创建新的 Kafka 事务。当退出用户方法时,我们仍然有一个 JPA 事务需要提交或回滚。Kafka 事务必须等到调用返回到容器才能提交或回滚。
问题在于我们使用了 Spring Cloud Stream 中的一个函数,该函数允许框架发布到 Kafka。在我们的例子中,任何用户指定的事务,如 JPA 事务,都发生在 Spring Cloud Stream 进行 Kafka 发布之前。我们需要确保用户代码是发布到 Kafka 的代码,以便我们可以将整个事务代码视为一个单元。为此,我们应该切换到 Consumer 而不是 Function,然后使用 StreamBridge API 发布到 Kafka。请看这个修改后的代码:
@Bean
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
然后我们使用相同的 TxCode,如上
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
请注意,run 方法不返回任何内容,但我们通过 StreamBridge API 显式发送到出站 Kafka 主题。
TransactionTemplate 的 execute 方法调用侦听器。KafkaTransactionManager 启动新事务。JpaTransactionManager 启动新事务。StreamBridge 作为方法执行的一部分进行。底层 KafkaTemplate 使用在步骤 4 中绑定的相同的事务性生产者工厂。JpaTransactionManager 提交或回滚。TransactionTemplate#execute 方法。请特别注意上面第 7 步。当 KafkaTemplate 检测到已有 Kafka 事务正在进行(在步骤 3 中开始)时,它不会与 JPA 事务同步,尽管 KafkaTemplate 能够做到这一点。现有的 Kafka 事务将优先,并加入该事务。
尽管我们仍然有两个独立的事务,但从端到端事务的角度来看,事情是原子的。如果通过 StreamBridge 的 Kafka 发布操作失败,JPA 和 Kafka 事务都不会执行提交操作。两者都会回滚。同样,如果数据库操作失败,两个事务仍然会回滚。但是,总有可能一个事务提交而另一个回滚,因此应用程序代码必须处理记录的去重以实现容错。
在讨论 消费-处理-生产 模式时,另一个关键组件是生产者需要将消耗的记录的偏移量(除了消费者提交偏移量)发送到事务中。正如我们在本博客系列的 第一部分 中所看到的,有一个 Kafka Producer API 方法称为 sendOffsetToTransaction,其中生产者通过 OffsetMetadata 和 ConsumerGroupMetadata 为每个分区发送一个偏移量(当前消息的偏移量 + 1)。使用 Spring Cloud Stream 或 Spring for Apache Kafka 时,应用程序不需要调用此低级操作。Spring for Apache Kafka 中的 Kafka 消息侦听器容器会代表应用程序自动处理它。尽管框架在事务提交前调用了生产者上的 sendOffsetToTransaction,但在事务协调器提交事务时,发送偏移量到事务和实际的消费者偏移量提交是原子发生的。
通过这次讨论,我们探讨了编写必须与外部事务系统(如数据库)交互的事务性 Spring Cloud Stream 应用程序的各种选项,同时消费和生产到 Apache Kafka。
在系列的下一部分中,我们将研究事务回滚(编写事务系统时的另一个关键方面)以及如何在编写 Spring Cloud Stream Kafka 应用程序时访问各种 Spring 组件。