服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

2023-02-28 12:16russle Java教程

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况,这篇文章主要介绍了Spring Boot 中使用@KafkaListener并发批量接收消息,需要的朋友可以参考下

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

?
1
2
3
4
5
6
7
8
9
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(4);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是"The maximum number of records returned in a single call to poll().", 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是"The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. ";

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
     factory.setConsumerFactory(consumerFactory());
     factory.setConcurrency(4);
     factory.setBatchListener(true);
     factory.getContainerProperties().setPollTimeout(3000);
     return factory;
 }
 
@Bean
 public Map<String, Object> consumerConfigs() {
     Map<String, Object> propsMap = new HashMap<>();
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
     propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
     propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
     propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
     propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
     return propsMap;
 }

启动日志截图

Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

关于max.poll.records和max.poll.interval.ms官方解释截图:

Spring Boot 中使用@KafkaListener并发批量接收消息的完整代码

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class MyListener {
    private static final String TPOIC = "topic02";
 
    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());
 
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }
 
    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());
 
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

具体代码在这里,欢迎加星号,fork。

到此这篇关于Spring Boot 中使用@KafkaListener并发批量接收消息的文章就介绍到这了,更多相关Spring Boot 使用@KafkaListener并发批量接收消息内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/russle/article/details/81258590

延伸 · 阅读

精彩推荐
  • Java教程java之swing表格实现方法

    java之swing表格实现方法

    这篇文章主要介绍了java之swing表格实现方法,以实例形式分析了swing构建表格的方法,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    cj_gameboy4092020-01-05
  • Java教程iReport使用指南及常见功能示例详解

    iReport使用指南及常见功能示例详解

    这篇文章主要介绍了iReport使用指南及常见功能,本文以iReport 3.5.1为例,通过示例演示给大家介绍的非常详细,需要的朋友可以参考下...

    GGdido6882022-03-01
  • Java教程Spring自带的校验框架Validation的使用实例

    Spring自带的校验框架Validation的使用实例

    今天小编就为大家分享一篇关于Spring自带的校验框架Validation的使用实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起...

    Satisfy_5554782021-07-25
  • Java教程Java由浅入深细数数组的操作下

    Java由浅入深细数数组的操作下

    数组对于每一门编程语言来说都是重要的数据结构之一,当然不同语言对数组的实现及处理也不尽相同。Java 语言中提供的数组是用来存储固定大小的同类...

    星鸦wyk11472022-11-24
  • Java教程Java中流的有关知识点详解

    Java中流的有关知识点详解

    今天小编就为大家分享一篇关于Java中流的有关知识点详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看...

    mumu199810552021-07-14
  • Java教程Java数据结构与算法之稀疏数组与队列深入理解

    Java数据结构与算法之稀疏数组与队列深入理解

    这篇文章主要介绍了Java数据结构与算法之稀疏数组与队列深入理解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...

    威斯布鲁克.猩猩6482021-12-29
  • Java教程IDEA2019.3在Plugins中搜索不到translation的解决

    IDEA2019.3在Plugins中搜索不到translation的解决

    这篇文章主要介绍了IDEA2019.3在Plugins中搜索不到translation的解决,文中通过图文的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们...

    脚印3922020-06-15
  • Java教程基于SpringBoot使用MyBatis插件的问题

    基于SpringBoot使用MyBatis插件的问题

    MyBatis-Plus并不能为我们解决所有问题,例如一些复杂的SQL,多表联查,我们就需要自己去编写代码和SQL语句,我们该如何快速的解决这个问题呢,这个时候...

    HDLaZy10802022-10-27