服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - spring-Kafka中的@KafkaListener深入源码解读

spring-Kafka中的@KafkaListener深入源码解读

2023-02-28 12:19柏油 Java教程

本文主要通过深入了解源码,梳理从spring启动到真正监听kafka消息的这套流程,从spring启动开始处理@KafkaListener,本文结合实例流程图给大家讲解的非常详细,需要的朋友参考下

前言

本文主要通过深入了解源码,梳理从spring启动到真正监听kafka消息的这套流程

一、总体流程

从spring启动开始处理@KafkaListener,到start消息监听整体流程图

spring-Kafka中的@KafkaListener深入源码解读

二、源码解读

1、postProcessAfterInitialization

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        
        // 扫描@KafkaListener注解
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        
        ......
        
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                // 遍历扫描到的所有@KafkaListener注解并开始处理
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
        }
        // 处理在类上的@KafkaListener注解
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}

1.1、processKafkaListener

KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

?
1
2
3
4
5
6
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    endpoint.setMethod(methodToUse);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

1.2、processListener

KafkaListenerAnnotationBeanPostProcessor#processListener

将每个kafkaListener转变成MethodKafkaListenerEndpoint并注册到KafkaListenerEndpointRegistrar容器,方便后续统一启动监听

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {
 
    String beanRef = kafkaListener.beanRef();
    if (StringUtils.hasText(beanRef)) {
        this.listenerScope.addListener(beanRef, bean);
    }
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    String group = kafkaListener.containerGroup();
 
    ......
  
    // 注册已经封装好的消费端-endpoint
    this.registrar.registerEndpoint(endpoint, factory);
    
    if (StringUtils.hasText(beanRef)) {
        this.listenerScope.removeListener(beanRef);
    }
}

1.3、registerEndpoint

KafkaListenerEndpointRegistrar#registerEndpoint

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    
    ......
    
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        // 如果到了需要立即启动监听的阶段就直接注册并监听(也就是创建消息监听容器并启动)
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            // 一般情况都先走这一步,添加至此列表,待bean后续的生命周期 统一注册并启动
            this.endpointDescriptors.add(descriptor);
        }
    }
}
 
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
 
    ......
    
    synchronized (this.listenerContainers) {
    
        ......
        
        // 1.创建消息监听容器
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        this.listenerContainers.put(id, container);
        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
            List<MessageListenerContainer> containerGroup;
            if (this.applicationContext.containsBean(endpoint.getGroup())) {
                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
            }
            else {
                containerGroup = new ArrayList<MessageListenerContainer>();
                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
            }
            containerGroup.add(container);
        }
        
        // 2.是否立即启动消息监听
        if (startImmediately) {
            startIfNecessary(container);
        }
    }
}

1.4、startIfNecessary

KafkaListenerEndpointRegistry#startIfNecessary
启动消息监听

?
1
2
3
4
5
6
7
8
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
        // 启动消息监听
        // 到这一步之后,消息监听以及处理都是KafkaMessageListenerContainer的逻辑
        // 到此也就打通了@KafkaListener到MessageListenerContainer消息监听容器的逻辑
        listenerContainer.start();
    }
}

2、afterSingletonsInstantiated

这一步是实例化(此处的实例化是已经创建对象并完成了初始化操作)之后,紧接着的操作

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void afterSingletonsInstantiated() {
    this.registrar.setBeanFactory(this.beanFactory);
 
    // 对"注册员"信息的完善
    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, KafkaListenerConfigurer> instances =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
        for (KafkaListenerConfigurer configurer : instances.values()) {
            configurer.configureKafkaListeners(this.registrar);
        }
    }
 
    if (this.registrar.getEndpointRegistry() == null) {
        if (this.endpointRegistry == null) {
            Assert.state(this.beanFactory != null,
                    "BeanFactory must be set to find endpoint registry by bean name");
            this.endpointRegistry = this.beanFactory.getBean(
                    KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                    KafkaListenerEndpointRegistry.class);
        }
        this.registrar.setEndpointRegistry(this.endpointRegistry);
    }
 
    ......
 
    // Actually register all listeners
    // 整个方法这里才是关键
    // 创建MessageListenerContainer并注册
    this.registrar.afterPropertiesSet();
}

2.1、afterPropertiesSet

KafkaListenerEndpointRegistrar#afterPropertiesSet

?
1
2
3
public void afterPropertiesSet() {
    registerAllEndpoints();
}

2.2、registerAllEndpoints

KafkaListenerEndpointRegistrar#registerAllEndpoints

?
1
2
3
4
5
6
7
8
9
10
11
protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            // 这里是真正的创建ListenerContainer监听对象并注册
            this.endpointRegistry.registerListenerContainer(
                    descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        // 启动时所有消息监听对象都注册之后,便将参数置为true
        this.startImmediately = true// trigger immediate startup
    }
}

总结

以上便是整个流程,总体感觉就是将kafka消息监听融入到spring生命周期中,并完美契合

调试及相关源码版本:

?
1
2
org.springframework.boot::2.3.3.RELEASE
spring-kafka:2.5.4.RELEASE

 

 

相关参考:

 

spring-kafka官方文档
spring容器之refresh方法

到此这篇关于spring-Kafka中的@KafkaListener深入源码解读的文章就介绍到这了,更多相关spring-Kafka @KafkaListener内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/ldw201510803006/article/details/115578280

延伸 · 阅读

精彩推荐