领先一步
VMware 提供培训和认证,助您加速进步。
了解更多本系列博客的其他部分
第一部分:Spring Cloud Stream Kafka 应用程序中的事务简介
本文是我们研究 Spring Cloud Stream 和 Apache Kafka 中事务的博客系列中的第 2 部分。在上一部分中,我们对事务进行了概述,并触及了基本概念。在本博客系列的这一部分中,我们将深入探讨一些实现细节及其实际应用。
在本文中,我们主要关注生产者端,以了解事务如何与 Spring Cloud Stream 和 Apache Kafka 一起工作。
在我们深入探讨生产者发起的事务之前,先回顾一些基础知识,了解一个简单的生产者。在 Spring Cloud Stream 中,有几种方法可以编写生产者(在消息传递领域也称为发布者)。如果你有一个需要按计划生成数据的用例,你可以编写一个 java.util.function.Supplier 方法,如下所示。
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
当将上述 Supplier 作为 Spring bean 提供时,如代码所示,Spring Cloud Stream 会将其视为发布者,并且,由于我们处于 Apache Kafka 的上下文中,它会将 POJO 记录发送到 Kafka 主题。
默认情况下,Spring Cloud Stream 每秒调用一次 Supplier,但你可以通过配置更改该计划。有关更多详细信息,请参阅 参考文档。
如果你不想轮询 Supplier,而是想控制发布频率,该怎么办?Spring Cloud Stream 通过 StreamOperations API 提供了一种便捷的方式,其开箱即用的实现称为 StreamBridge。这是一个示例。
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
在这种情况下,应用程序使用 REST 端点来触发通过 StreamBridge 发布数据。由于框架不会按计划调用该函数,因此任何外部方都可以通过调用 REST 端点来启动数据发布。
在这些基本生产者中使用事务是否合适?
现在我们已经了解了 Spring Cloud Stream 提供的两种发布记录的策略,让我们回到我们讨论的主要主题:事务性发布。假设一个场景,我们希望确保数据完整性并在使用一个或多个这些生产者时获得事务保证。在这种情况下,问题是我们是否需要首先使用事务来实现它们。在上面的两个示例中,你如何确保记录是事务性地发布的?简短的回答是,你应该避免在这些类型的发布用例中使用事务。这些示例中的记录发布是单次发送场景。使用同步生产者,我们可以实现相同的语义事务保证。默认情况下,生产者是异步的,当使其运行在同步模式下时,生产者会确保在将响应发送给客户端之前,它将记录写入 leader 和所有副本。可以通过将 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync 属性设置为 true 来启用同步发布。
总而言之,在设计仅生产者的应用程序时,请谨慎使用事务。如果不一次发送一条记录,而是使用 Supplier 或 StreamBridge,我们不建议使用事务,因为将生产者转换为同步模式运行可以达到相同的结果,而无需事务开销。然后,这个讨论引出了一个有趣的问题。对于仅生产者的应用程序,何时有必要使用事务并获得好处?正如在本系列博客的上一部分中所讨论的,这完全取决于应用程序的用例。在生产者的上下文中,这意味着只有在我们进行多个相关的发布,或者除了发布之外,还需要与外部事务管理器同步时,才需要使用事务。本文的下一部分将介绍前一种情况,而本博客系列的下一篇文章将介绍后一种情况。
为 Spring Cloud Stream 的 Kafka 绑定器启用事务的主要驱动程序是一个属性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix。当此属性具有有效的 prefix 字符串时,Spring Cloud Stream 中的 Kafka 绑定器会确保底层 KafkaTemplate 使用事务发布数据。顺便说一句,此属性还会提示 Spring Cloud Stream 在使用处理器模式(consume-process-produce 或 read-process-write 模式)时使消费者具有事务感知能力。
尽管这可能有些违反直觉,但让我们回到前面描述的单个 Supplier 或 StreamBridge 示例,并引入事务来理解事务组件的主要用途。如前所述,在这些情况下我们不需要使用事务,因为这会增加更多开销。但是,这样做有助于我们理解。
再次看代码
@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
}
现在我们提供所需的属性。
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-
由于我们在应用程序的配置中提供此属性,因此每次(通过框架)调用此示例中的 Supplier 时,或者当有人调用 StreamBridge#send 方法后面的 REST 端点时,底层到 Kafka 主题的发布都将是完全事务性的。
当调用 Supplier 时,Kafka 绑定器使用 KafkaTemplate 发布数据。当绑定器检测到应用程序提供了 transaction-id-prefix 属性时,每个 KafkaTemplate#send 调用都通过 KafkaTemplate#executeInTransaction 方法进行。因此,请放心,框架会以事务方式执行底层到 Kafka 主题的发布。从应用程序的角度来看,应用程序开发人员需要为事务提供的唯一内容是 transaction-id-prefix 属性。
在开发或调试事务性应用程序时,将日志级别设置为 TRACE 通常是值得的,这样相关的底层事务类就可以为我们提供有关正在发生的事情的详细信息。
例如,如果将以下软件包的日志级别设置为 TRACE,你将在日志中看到大量活动。
logging:
level:
org.springframework.transaction: TRACE
org.springframework.kafka.transaction: TRACE
org.springframework.kafka.producer: TRACE
org.springframework.kafka.core: TRACE
每次框架调用 Supplier 方法时,我们可以在日志中观察到以下内容
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = …
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()
从跟踪日志可以看出,每次以事务方式发布记录时,它会形成一个序列:beginTransaction、Sending、Sent 和 commitTransaction。如果你运行应用程序,你会发现这些序列每秒都会出现一次,因为这是 Spring Cloud Stream 调用 Supplier 方法的默认计划。
相同的事务流程也适用于 StreamBridge#send 的情况。当 Spring Cloud Stream 调用 send 方法时,输出绑定使用的底层 KafkaTemplate 会确保记录在事务中发布,因为我们提供了 transaction-id-prefix。
有了这些铺垫之后,我们来讨论一些使用事务更有意义的情况。如前所述,将多条记录作为单个原子单元发布是一个有效场景,在这种情况下使用事务变得必要。
我们来看下面的代码示例
public void publish(StreamBridge streamBridge {
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
如你所见,这是一个为了演示而刻意设计的示例。我们发布了多条记录,而不是只发布一条。发布到多个主题同样是这里一个有效的方法。我们可能会认为,通过设置 transaction-id-prefix 属性,我们可以快速将多条记录的发布包装在单个事务中。然而,仅凭这些还不足以帮助我们。我们仍然需要提供 prefix 属性。但是,仅仅这样,每次发送仍然发生在各自独立的事务中。为了确保所有五条记录的整个端到端发布都原子地发生,我们需要在方法上应用核心 Spring 框架的 @Transactional 注解。此外,我们必须提供一个事务管理器 bean - KafkaTransactionManager - 该 bean 使用 Spring Cloud Stream Kafka 绑定器创建的同一个生产者工厂。以下是我们的代码现在的样子以及应用程序的配置。
@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {
@Autowired
StreamBridge streamBridge;
@Autowired Sender sender;
@Autowired
DefaultBinderFactory binderFactory;
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamProducer.class, args);
}
@PostMapping("/send-data")
public void publishData() throws InterruptedException {
sender.send(streamBridge);
}
@Component
static class Sender {
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
}
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
return kafkaTransactionManager;
}
}
以及相应的配置
spring:
cloud:
stream:
bindings:
mySupplier-out-0:
destination: my-topic
kafka:
binder:
Transaction:
transaction-id-prefix: mySupplier-
producer:
configuration:
retries: 1
acks: all
请注意,前面代码中的事务性方法(用 @Transactional 注释的方法)必须与调用该方法的类不同。如果调用发生在同一类的方法之间或未被 Spring 管理的 bean 之间的不同类之间,则没有代理,事务拦截器也不会生效。JVM 在运行时不知道代理和拦截器机制。在方法上添加 @Transactional 注解时,Spring 会在后台为该方法创建一个事务性代理。当 Spring Cloud Stream 调用事务性方法时,代理会拦截该调用,然后通过代理对象实际调用。
我们提供的自定义 KafkaTransactionManager bean 有两个目的。首先,它使 Spring Boot 应用 @EnableTransactionManagerment。它还提供与绑定器内部使用的相同的生产者工厂,以便事务注解在使用事务时使用正确的资源。
当 Spring Boot 检测到可用的事务管理器 bean 时,它会自动为我们应用 @EnableTransactionManagement 注解,该注解负责检测 @Transactional 注解,然后通过 Spring AOP 代理和通知机制添加拦截器。换句话说,Spring AOP 为 @Transactional 方法创建一个代理,并包含 AOP 通知。如果没有应用 @EnableTransactionManagement 注解,Spring 将不会触发任何这些代理和拦截机制。由于 EnableTransactionManagement 注解因各种原因至关重要,因此我们必须提供一个事务管理器 bean。否则,方法上的 Transactional 注解将不起作用。
请注意,我们从绑定器中获取事务性生产者工厂,并在 KafkaTransactionManager 的构造函数中使用它。当此 bean 存在于应用程序中时,所有记录的整个发布现在都发生在一个事务的范围内。我们在跟踪日志中只看到一个 beginTransaction…commitTransaction 序列,这意味着只有一个正确的事务执行了所有发布操作。
在后台,这是事件序列
Transactional 注释的方法,事务拦截器就会通过 AOP 代理机制生效,并使用自定义 KafkaTransactionManager 启动新事务。StreamBridge#send 方法时,底层 KafkaTemplate 将使用自定义 KafkaTransactionManager 创建的同一个事务性资源。由于事务已在进行中,它不会启动另一个事务,而是通过同一个事务性生产者进行发布。send 方法时,它不会启动新事务。相反,它通过原始事务中使用的同一生产者资源进行发布。KafkaResourceHolder 的 commit 或 rollback 方法,这些方法调用 Kafka 生产者来 commit 或 rollback 事务。由于我们的示例中只有一个自定义 KafkaTransactionManager bean,我们可以简单地使用 Transactional 注解。另一方面,如果我们有多个自定义 KafkaTransactionManager bean,我们就必须使用正确的 bean 名称来限定 @Transactional 注解。
如果我们移除自定义 KafkaTransactionManager 并运行此应用程序,你会发现它创建了五个单独的事务,而不是一个事务。如果启用 TRACE 日志记录,你将在日志中看到五个 beginTransaction…commitTransaction 序列。
你可以通过编写一个事务性消费者 Spring Cloud Stream 应用程序并将其隔离级别设置为 read_committed 来验证此行为。你可以使用 spring.cloud.stream.kafka.binder.configuration.isolation.level 属性并将其值设置为 read_committed 来做到这一点。为方便测试,请添加 Thread.sleep 或其他等待机制来模拟 for 循环中每次 StreamBridge#send 后的行为。你会发现,每次 send 方法调用返回后,无论是否等待,消费者都会收到数据,从而证明整个操作不是由一个事务完成的,而是每次 send 都在自己的事务中进行。
我们看到每个 send 都有单独的事务,因为 Transactional 注解不起作用。Transactional 注解仅在存在事务管理器 bean 且其生产者工厂与绑定器使用的生产者工厂相同时才有效。
Spring Boot 会在配置中检测到 transaction-id-prefix 属性(通过 spring.kafka.producer.transaction-id-prefix)时自动配置一个 KafkaTransactionManager。然而,由于我们在 Spring Cloud Stream 的上下文中,我们必须使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix,因为这是我们向框架发出信号以创建绑定器及其关联的事务性生产者工厂的内部事务管理器的方式。如果我们提供正确的 spring.kafka 前缀,以便 Spring Boot 为我们自动配置一个 KakaTransactionManager,会怎么样?虽然这很诱人,但它不起作用,因为自动配置的事务管理器使用与绑定器使用的生产者工厂不同的生产者工厂。因此,我们必须提供一个自定义 KafkaTransactionManager,它使用绑定器使用的同一个生产者工厂。这正是我们上面所做的。
在本系列博客的下一部分,我们将学习如何同步到生产者和消费者发起的事务的外部事务管理器。