Spring for Apache Kafka 中引入共享消费者支持 (Kafka 队列)

工程 | Soby Chacko | 2025年10月14日 | ...

在我们的 Road to GA 系列中,本周我们将探讨 Apache Kafka 4.0.0 中的共享组及其在 Spring for Apache Kafka 4.0.0 中的集成——这一功能从根本上扩展了我们从 Kafka 主题消费消息的方式。

当我们首次使用Kafka时,其心智模型通常很简单:主题存储消息,消费者读取它们,并且处理在分区内按顺序进行。这种基于分区的模型为无数应用程序提供了良好的服务,提供了带有强保证的有序处理。然而,某些用例涉及创建数百个分区的主题,其主要目的是为了实现更高的并行性,而不是因为任何排序要求。分区数量和消费者并行性之间的关系在我们确实需要排序保证时非常有效,但当处理不需要保留序列的独立事件时,它就成为一个限制。

Apache Kafka 4.0.0引入了共享组(也称为“Kafka队列”)作为一种补充的消费模型。这一新增功能不会取代传统的消费者组,但为那些记录级别分发比分区级别分配更有意义的场景提供了替代方案。Spring for Apache Kafka 4.0.0全面支持共享组,本文将探讨它们的工作原理以及何时适合我们的应用程序需求。

请注意,共享组目前在Kafka 4.1.0中处于预览状态,预计将在Kafka 4.2.0中达到生产就绪状态。

两种模型满足不同需求

传统消费者组将整个分区分配给消费者。在任何给定时间,每个分区都只属于组中的一个消费者,这为我们提供了该分区内的有序处理。

共享组通过分发单个记录而不是整个分区来采取不同的方法。代理协调共享组中可用消费者之间的记录分发,允许任何消费者接收任何记录,无论它来自哪个分区。

关键的权衡是:传统消费者组通过分区分配提供排序保证,而共享组通过记录级别分发提供扩展灵活性。

何时使用共享组

当满足以下条件时选择共享组:

  • 处理大量独立事件,其中吞吐量比顺序更重要。示例包括图像处理管道、通知服务和作业协调系统,其中每个任务都是独立的。
  • 工作负载具有全天波动或遵循季节性模式的可变需求模式。共享组允许您动态扩展消费者,而无需为高峰容量预配数百个分区。

在以下情况下继续使用传统消费者组:

  • 顺序很重要。如果您正在处理事件流(用户会话、金融交易、状态转换),其中顺序很重要,那么具有排序保证的分区分配是正确的模型。
  • 构建维护聚合或窗口的状态处理。这些场景需要传统消费者组提供的分区亲和性。

共享组的工作原理

让我们简要地了解共享组的机制,以理解幕后发生了什么变化。

当我们在共享组中创建一个消费者时,它会连接到代理并请求记录。代理通过一个名为共享协调器(Share Coordinator)的组件来维护协调,该组件跟踪哪些记录已分配给哪些消费者。当消费者请求记录时,代理会从主题的分区中选择未分配的记录并将其发送给该消费者。这些记录现在处于“已获取”状态——已分配给该特定消费者进行处理。

已获取状态带有基于时间的锁(默认为30秒,可通过group.share.record.lock.duration.ms配置)。如果消费者在此超时时间内未确认记录,代理会自动将其返回到可用池中,供其他消费者处理。这种获取锁提供了自动故障恢复,无需在消费者崩溃或无响应时进行手动干预。

消费者处理记录并发送回确认。有三种可能的确认类型:ACCEPT(成功处理)、RELEASE(返回池中重试)和REJECT(标记为永久失败)。根据确认,代理更新记录的状态并继续。

这种协调发生在代理级别,这与传统消费者组的工作方式不同,传统消费者组中消费者直接跟踪它们的偏移量。

代理还提供内置的重试语义。每次将记录交付给消费者时,代理都会增加内部交付计数。默认情况下,在5次交付尝试(可通过group.share.delivery.attempt.limit配置)后,代理会将记录移动到存档状态。这为我们提供了“毒药消息”保护,而无需应用程序级别的重试逻辑,尽管我们仍然可以在需要更多控制时实现自己的重试策略。

共享组入门

Spring for Apache Kafka中共享组的编程模型与我们已知的模型保持一致。我们有两种主要方法来设置共享消费者:编程容器创建和注解驱动的监听器(使用@KafkaListener)。

我们首先配置一个ShareConsumerFactory而不是常规的ConsumerFactory

@Configuration
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String>
            shareKafkaListenerContainerFactory(
                ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

此配置遵循我们用于传统消费者的相同工厂模式。我们定义一个创建共享消费者的工厂和一个管理监听器生命周期的容器工厂。Spring for Apache Kafka的抽象在两种消费模型中保持一致。

编程容器创建

我们可以通过编程方式创建一个容器并设置消息监听器

@Bean
public ShareKafkaMessageListenerContainer<String, String> imageProcessingContainer(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("image-processing");
    containerProps.setGroupId("image-processors");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            imageService.process(record.value());
            // Implicit ACCEPT when method completes successfully
        }
    });

    return container;
}

这使我们能够对容器创建和配置进行细粒度控制。我们使用主题和组ID创建一个ContainerProperties实例,使用工厂和属性实例化容器,并附加我们的消息监听器。

注解驱动的监听器

对于大多数用例,使用@KafkaListener的注解驱动方法提供了更清晰的编程模型

@KafkaListener(
    topics = "image-processing",
    groupId = "image-processors",
    containerFactory = "shareKafkaListenerContainerFactory"
)
public void processImage(String imageUrl) {
    // Process the image
    imageService.process(imageUrl);
    // Implicit ACCEPT when method completes successfully
}

containerFactory属性告诉Spring使用我们的ShareKafkaListenerContainerFactory,它创建一个共享消费者而不是传统消费者。groupId现在指的是共享组而不是消费者组,但注解结构保持不变。

当此方法成功完成时,Spring for Apache Kafka会自动向代理发送ACCEPT确认。如果抛出异常,它会发送REJECT,这会将记录标记为永久失败并阻止进一步的交付尝试。这种隐式确认模式适用于简单处理场景,其中成功或失败清晰地映射到方法完成或异常。如果您需要临时故障来触发重试(使用RELEASE),您将需要使用显式确认模式以进行更细粒度的控制。

用于细粒度控制的显式确认

有时我们需要对记录的确认方式进行更多控制。我们可能希望明确拒绝我们已知无效的“毒药消息”,或者我们可能需要在处理逻辑中的特定点进行确认,而不是在方法完成时进行。

我们可以在不同级别启用显式确认。最常见的方法是在工厂级别进行配置

@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
	return new DefaultShareConsumerFactory<>(props);
}

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
		ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
	// The factory will detect the explicit acknowledgment mode from the consumer factory configuration
	return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
}

启用显式确认后,我们的监听器方法会接收一个ShareAcknowledgment参数,该参数赋予我们直接控制权

@KafkaListener(
    topics = "payment-processing",
    groupId = "payment-processors",
    containerFactory = "shareKafkaListenerContainerFactory"
)
public void processPayment(PaymentEvent event,
                          ShareAcknowledgment acknowledgment) {
    try {
        if (!isValid(event)) {
            // Permanently reject invalid events
            acknowledgment.reject();
            return;
        }

        paymentService.process(event);
        acknowledgment.acknowledge();

    } catch (TransientException e) {
        // Release for retry with another consumer
        acknowledgment.release();
    } catch (PermanentException e) {
        // Don't retry unrecoverable errors
        acknowledgment.reject();
    }
}

三种确认类型使我们能够精确控制记录结果。调用acknowledge()会告诉代理记录已成功处理并可以存档。调用release()会将记录返回到池中供其他消费者处理,这对于临时故障(如临时网络问题或资源不可用)非常有用。调用reject()会将记录标记为永久失败并阻止进一步的交付尝试。

显式确认模式的一个重要限制是:在所有先前交付的记录都已确认之前,消费者无法轮询新记录。这可以防止未处理的记录压垮消费者,但也意味着我们必须确保每条记录都收到确认(接受、释放或拒绝),以避免阻塞消费者线程。Spring for Apache Kafka通过在记录未确认30秒后(可通过shareAcknowledgmentTimeout配置)记录警告来帮助调试。

请记住,每次release()都会增加代理的内部交付计数,因此即使消费者不断调用release(),代理最终也会在达到配置限制后存档记录。

通过并发进行扩展

传统Kafka消费者按顺序处理记录——每个消费者实例从其分配的分区中轮询记录并一次处理一个。当我们需要更高的并行性时,我们通常会添加更多的消费者实例,这通常意味着更多的应用程序实例或进程。

共享组启用了一种不同的扩展方法,因为记录级别分发消除了分区分配的限制。Spring for Apache Kafka利用这一点,直接向ShareKafkaMessageListenerContainer添加了并发支持。

我们可以在单个容器中配置多个消费者线程

@Bean
public ShareKafkaListenerContainerFactory<String, String>
        shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    factory.setConcurrency(10); // 10 concurrent consumer threads
    return factory;
}

这会创建一个包含10个线程的容器,每个线程运行自己的共享消费者轮询循环。所有10个线程都从同一个共享组中拉取记录,并在同一个JVM中并发处理它们。如果我们在5个应用程序实例上运行此操作,那就是50个并发消费者处理记录流。

这种并发模型为我们提供了扩展的灵活性。我们可以通过增加并发性(每个实例更多线程)进行垂直扩展,或者通过添加更多实例进行水平扩展,或者两者兼而有之。对于需求可变的工作负载,我们可以调整并发设置或实例计数,而无需更改主题配置或担心分区重新平衡。

当前状态和兼容性

共享组在Kafka 4.0.0中作为早期访问功能引入,在Kafka 4.1.0中进入预览阶段,并预计在Kafka 4.2.0中达到生产就绪状态。Spring for Apache Kafka 4.0.0(随Spring Boot 4.0.0发布)支持Kafka 4.1.0版本中实现的共享组。

有一个重要的兼容性考虑:Kafka 4.0.0和4.1.0对于共享组不兼容。这些版本之间的协议有所演变,因此在使用共享组时,客户端和代理需要处于相同的次要版本。这在代理和客户端库可能在不同时间升级的环境中尤其重要。

总结

共享组通过添加记录级别分发作为分区级别分配的替代方案,扩展了Kafka的消费模型。这两种模型都服务于重要的目的——具有排序保证的传统消费者组对于有状态处理和事件排序仍然至关重要,而共享组则为独立事件的高吞吐量处理提供了优势。

关键在于将消费模型与我们的应用程序要求相匹配。当顺序很重要时,分区分配提供了我们所需的保证。当吞吐量和扩展灵活性比顺序更重要时,记录级别分发可以简化我们的架构和资源管理。

Spring for Apache Kafka实现支持KIP-932并增加了Spring特有的增强。@KafkaListener集成与我们用于传统消费者的编程模型保持一致。内置的并发支持提供了在单个应用程序实例中进行扩展的选项。超时检测和优雅关闭等功能有助于生产部署处理操作问题。

Spring for Apache Kafka 4.0.0通过与现有@KafkaListener模型保持一致,使共享组的使用感觉自然。我们可以增量地采用共享组,将其用于特定用例,同时继续将传统消费者组用于其他用例。这两种模型可以在同一个Spring for Apache Kafka应用程序中并存而不会发生冲突。

随着共享组在Kafka 4.2.0中趋向生产就绪,值得评估它们是否适合我们当前或计划中的任何用例。如果我们一直主要为了并行性而不是排序而预配大量分区,或者如果我们正在处理使容量规划变得困难的可变工作负载,那么共享组可能会提供一种更简单的方法。

有关Spring for Apache Kafka中共享组的更多详细信息,请查看参考文档

我们欢迎您在应用程序中探索共享组时提供反馈。如果您遇到问题或有改进建议,请在Spring for Apache Kafka GitHub存储库上提出问题。您的意见有助于我们在GA版本发布及以后改进框架。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有