深入Spring Kafka:消费者是如何创建的?

首页 编程分享 PHP丨JAVA丨OTHER 正文

杨同学technotes 转载 编程分享 2025-05-10 22:05:32

简介 Spring Kafka 在原生客户端基础上进行了深度封装,通过声明式注解显著简化了开发流程。这种简洁的语法背后,Spring Kafka 是如何创建这些消费者的呢?


在 Java 生态中,Apache Kafka 通过 kafka-clients.jar 提供了原生客户端支持。开发者需要手动创建 KafkaConsumer 实例并订阅指定主题(Topic)来实现消息消费。典型实现如下:

public void pollMessages() {
    // 1. 初始化消费者实例
    Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerConfig());
    // 2. 订阅主题并设置重平衡监听器
    consumer.subscribe(Collections.singleton(topic), new RebalanceListener());
    // 3. 轮询获取消息(超时时间1秒)
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    // 4. 同步提交偏移量
    consumer.commitSync();
}

Spring Kafka 在原生客户端基础上进行了深度封装,通过声明式注解显著简化了开发流程。例如,只需使用 @KafkaListener 注解即可实现消息监听:

@KafkaListener(id = "orderService", topics = "order.topic")
public void handleOrderEvent(ConsumerRecord<String, String> record) {
    // 业务处理逻辑
}

这种简洁的语法背后,Spring Kafka 实际上构建了一套完整的消费者(Consumer)管理机制。那么问题来了:Spring Kafka 是如何创建这些消费者的呢?

本文源码版本:spring-kafka v2.6.6

一、使用Spring Kafka消费消息

首先,我们通过一个完整的项目集成示例,具体说明其实现步骤。项目里要接入 Spring Kafka,通常需要经过以下几个步骤。

第一步:引入依赖

需在项目中声明 Spring Kafka Starter 依赖。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.6</version>
</dependency>

第二步:消费者配置

配置类上添加 @EnableKafka 注解,并初始化 ConcurrentKafkaListenerContainerFactory Bean,这是最常见的使用方式。

@Configuration
@EnableKafka
public class Config {
​
   /** 消费者工厂 */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configs);
    }
    
    /** 监听容器工厂 */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String>
                        kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3); // 设置消费者线程数
        return factory;
    }
}

第三步:实现消息监听

在业务层方法上添加 @KafkaListener 注解,实现消息监听。

@Service
public class OrderMessageListener {
    @KafkaListener(id = "orderService", topics = "order.topic")
    public void handleOrderEvent(ConsumerRecord<String, String> record) {
        // 业务处理逻辑
    }
}

至此,我们已经完成 Spring Kafka 的基础集成。接下来将深入分析@KafkaListener注解背后的消费者创建过程,揭示 Spring 是如何构建 KafkaConsumer 实例的。

二、消费者的初始化过程

基于上面示例,我们以 @EnableKafka 注解为切入点,源码如下:

@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}
​
@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[] { KafkaBootstrapConfiguration.class.getName() };
    }
}

该注解的核心作用是通过KafkaBootstrapConfiguration向 Spring 容器注册两个关键 Bean。注册的核心 Bean 如下所示:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        // 省略无关代码
        
        // 注册注解处理器
        // beanName: org.springframework.kafka.config.internalKafkaListenerAnnotationProcessor
        registry.registerBeanDefinition(
            KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
            new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
        
        // 注册监听器容器注册表
        // beanName: org.springframework.kafka.config.internalKafkaListenerEndpointRegistry
        registry.registerBeanDefinition(
            KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
            new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
    }
}

注解处理器(KafkaListenerAnnotationBeanPostProcessor)负责扫描和解析@KafkaListener及其派生注解,并将监听方法转换为可执行的端点描述符(KafkaListenerEndpointDescriptor)。

容器注册表(KafkaListenerEndpointRegistry)作为所有消息监听容器的中央仓库,实现了生命周期管理(启动/停止容器)。

代码阅读小记:

切入点: @EnableKafka
-> KafkaListenerConfigurationSelector
-> KafkaBootstrapConfiguration
 [注册Bean: KafkaListenerAnnotationBeanPostProcessor]
 [注册Bean: KafkaListenerEndpointRegistry]

接下来,我们就重点剖析一下这两个 Bean。

2.1 消费者注册流程剖析

1、注解扫描阶段

首先来看第一个 Bean: KafkaListenerAnnotationBeanPostProcessor,它通过 Spring 后置处理器机制(postProcessAfterInitialization)实现了注解扫描:

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        // 省略无关代码
        
        // 使用 MethodIntrospector 进行元数据查找
        // 查找被 @KafkaListener (及其派生注解) 标记的方法
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                    Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        
        // 处理每个找到的监听方法
        for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            for (KafkaListener listener : entry.getValue()) {
                processKafkaListener(listener, method, bean, beanName);
            }
        }
        return bean;
    }
}

上述代码有两个关键点,第一是通过MetadataLookup支持派生注解;第二是处理 @KafkaListener 监听方法。

什么是MetadataLookup呢?

举个例子,我们定义了一个新的注解 @EventHandler ,并在该注解上标记 @KafkaListener

@KafkaListener
public @interface EventHandler {
    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String value();
    // 其他属性映射...
}

这种设计使得业务注解(如@EventHandler)可以透明地继承@KafkaListener的全部功能。

@Service
public class OrderMessageListener {
    @EventHandler("order.topic")
    public void handleOrderEvent(ConsumerRecord<String, String> record) {
        // 业务处理逻辑
    }
}

2、端点注册阶段

我们继续处理 KafkaListener 代码跟踪,现在来到了 KafkaListenerAnnotationBeanPostProcessorprocessListener() 方法。

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    // KafkaListener 注册器
    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
    
    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {
        // 设置端点属性
        endpoint.setBean(bean);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setTopics(resolveTopics(kafkaListener));
        // 委托注册器进行注册
        this.registrar.registerEndpoint(endpoint, factory);
    }
}

@KafkaListener 标记的方法会被封装为 MethodKafkaListenerEndpoint ,并由注册器 KafkaListenerEndpointRegistrar 进行注册,注册器内部维护了一个端点描述符列表:

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    private final List<KafkaListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
    
    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        // 省略无关代码
        KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
        this.endpointDescriptors.add(descriptor);
    }
}

由此可见,KafkaListener 会被注册到 List 集合中。

代码阅读小记:

-> KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization()
-> processKafkaListener()
-> processListener()
-> KafkaListenerEndpointRegistrar#registerEndpoint()
-> endpointDescriptors [注册到容器里List]

到这里,BeanPostProcessorpostProcessAfterInitialization 方法已经执行完了,程序完成了 KafkaListener 的注册并存储至 endpointDescriptors 中。

3、容器实例化阶段

当所有 Bean 初始化完成后,接下来会通过afterSingletonsInstantiated 触发最终注册:

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    // KafkaListener 注册器
    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
    
    @Override
    public void afterSingletonsInstantiated() {
        // 注册所有 KafkaListener
        this.registrar.afterPropertiesSet();
    }
}

注册器 KafkaListenerEndpointRegistrar 的注册逻辑如下。

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    
    private KafkaListenerEndpointRegistry endpointRegistry;
    
    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }
​
    protected void registerAllEndpoints() {
        // 注册到注册表当中
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
    }
}

可见,注册器最终委托给了注册表处理,注册表中由一个 ConcurrentHashMap 进行保存。

public class KafkaListenerEndpointRegistry implements DisposableBean, 
        SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
            
    private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();
    
    // 注册表的注册逻辑
    public void registerListenerContainer(KafkaListenerEndpoint endpoint, 
                                          KafkaListenerContainerFactory<?> factory) {
        registerListenerContainer(endpoint, factory, false);
    }
    
    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        String id = endpoint.getId();
        // 通过工厂创建, 最终创建出来的 ConcurrentMessageListenerContainer
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
    }
}

在示例中,我们配置的是 ConcurrentKafkaListenerContainerFactory 来创建 KafkaListener 容器的,因此这里往注册表(KafkaListenerEndpointRegistry)里添加的是 ConcurrentMessageListenerContainer 对象实例。

代码阅读小记:

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated()
-> KafkaListenerEndpointRegistrar#afterPropertiesSet()
-> registerAllEndpoints()
-> KafkaListenerEndpointRegistry#registerListenerContainer()
-> [listenerContainers] [注册到容器里Map]

2.2 消费者启动机制

1、并发监听容器

再来看第二个 Bean: KafkaListenerEndpointRegistry。它实现了 Spring 生命周期 SmartLifecycle 接口,在程序启动时,会调用它的 start 方法。

public class KafkaListenerEndpointRegistry implements DisposableBean, 
        SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
            
    @Override
    public void start() {
        // ConcurrentMessageListenerContainer 实例
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }
    
    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }
}

注册表(KafkaListenerEndpointRegistry)维护的容器(MessageListenerContainer)实例分为两类:

  • ConcurrentMessageListenerContainer:多线程容器
  • KafkaMessageListenerContainer:单线程容器

ConcurrentMessageListenerContainer内部通过创建多个单线程容器实现并发:

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
​
    @Override
    protected void doStart() {
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            // 启动每个子容器
            container.start();
        }
    }
}

可见,ConcurrentMessageListenerContainer 通过委托给多个KafkaMessageListenerContainer实例从而实现多线程消费。

2、底层消费者创建

最终我们在 KafkaMessageListenerContainer 的内部类 ListenerConsumer 中发现了 kafka-clients.jar 中的 Consumer 接口类。它的创建过程是由 ConsumerFactory 代为创建,ConsumerFactory 是一个接口类,它只有一个实现:DefaultKafkaConsumerFactory

public class KafkaMessageListenerContainer<K, V>  extends AbstractMessageListenerContainer<K, V> {
    
    private volatile ListenerConsumer listenerConsumer;
    
    @Override
    protected void doStart() {
        this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    }
    
    private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
        // Consumer 是 kafka-clients.jar 中的接口类
        private final Consumer<K, V> consumer;
        // ConsumerFactory 是一个接口,只有一个实现类 DefaultKafkaConsumerFactory
        protected final ConsumerFactory<K, V> consumerFactory;
        
        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            Properties consumerProperties = propertiesFromProperties();
            this.consumer = this.consumerFactory.createConsumer(
                            this.consumerGroupId,
                            this.containerProperties.getClientId(),
                            KafkaMessageListenerContainer.this.clientIdSuffix,
                            consumerProperties);
            // 监听 Topic
            subscribeOrAssignTopics(this.consumer);
        }
    }
}

Consumer 消费者的创建代码如下。

public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
        implements ConsumerFactory<K, V>, BeanNameAware {
​
    protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
        return createRawConsumer(configProps);
    }
    protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
         // KafkaConsumer 是 kafka-clients.jar 中的类
        return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
                this.valueDeserializerSupplier.get());
    }
}

代码阅读小记:

KafkaListenerEndpointRegistry#start()
-> AbstractMessageListenerContainer#start()
-> ConcurrentMessageListenerContainer#doStart() (concurrency不能大于partitions)
-> KafkaMessageListenerContainer#start() -> doStart()
-> DefaultKafkaConsumerFactory#createRawConsumer()

三、总结

总结一下上文中各部分的代码阅读小记,得到如下代码链路:

切入点: @EnableKafka
-> KafkaListenerConfigurationSelector
-> KafkaBootstrapConfiguration
    [注册Bean:KafkaListenerAnnotationBeanPostProcessor]
    [注册Bean:KafkaListenerEndpointRegistry]
-> KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization()
-> processKafkaListener()
-> processListener()
-> KafkaListenerEndpointRegistrar#registerEndpoint()
-> endpointDescriptors [注册到容器里List]
(===== 此时,程序里已经有endpointDescriptor了 =====)
​
(===== 开始遍历endpointDescriptors =====)
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated()
-> KafkaListenerEndpointRegistrar#afterPropertiesSet()
-> registerAllEndpoints()
-> KafkaListenerEndpointRegistry#registerListenerContainer()
-> [listenerContainers] [注册到容器里Map]
​
(===== 开始启动监听 =====)
KafkaListenerEndpointRegistry#start()
-> AbstractMessageListenerContainer#start()
-> ConcurrentMessageListenerContainer#doStart() (concurrency不能大于partitions)
-> KafkaMessageListenerContainer#start() -> doStart()
-> DefaultKafkaConsumerFactory#createRawConsumer()

封面

相关文章

转载链接:https://juejin.cn/post/7501264245377876007


Tags:


本篇评论 —— 揽流光,涤眉霜,清露烈酒一口话苍茫。


    声明:参照站内规则,不文明言论将会删除,谢谢合作。


      最新评论




ABOUT ME

Blogger:袅袅牧童 | Arkin

Ido:PHP攻城狮

WeChat:nnmutong

Email:nnmutong@icloud.com

标签云