服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - spring kafka框架中@KafkaListener 注解解读和使用案例

spring kafka框架中@KafkaListener 注解解读和使用案例

2023-02-28 12:18占星安啦 Java教程

Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一,这篇文章主要介绍了kafka @KafkaListener 注解解读,需要的朋友可以参考下

简介

Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用,也是目前最流行的消息队列系统之一。因此,也越来越多的框架对 kafka 做了集成,比如本文将要说到的 spring-kafka。

Kafka 既然作为一个消息发布订阅系统,就包括消息生成者和消息消费者。本文主要讲述的 spring-kafka 框架的 kafkaListener 注解的深入解读和使用案例。

解读

源码解读

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
 
@Retention(RetentionPolicy.RUNTIME)
 
@MessageMapping
 
@Documented
 
@Repeatable(KafkaListeners.class)
 
public @interface KafkaListener {
   /**
 
    * 消费者的id,当GroupId没有被配置的时候,默认id为GroupId
 
    */
 
   String id() default "";
   /**
 
    * 监听容器工厂,当监听时需要区分单数据还是多数据消费需要配置containerFactory      属性
 
    */
 
   String containerFactory() default "";
   /**
 
    * 需要监听的Topic,可监听多个,和 topicPattern 属性互斥
*/
 
   String[] topics() default {};
   /**
 
    * 需要监听的Topic的正则表达。和 topics,topicPartitions属性互斥
    */
 
   String topicPattern() default "";
   /**
 
    * 可配置更加详细的监听信息,必须监听某个Topic中的指定分区,或者从offset为200的偏移量开始监听,可配置该参数, 和 topicPattern 属性互斥
    */
 
   TopicPartition[] topicPartitions() default {};
   /**
 
    *侦听器容器组
 
    */
 
   String containerGroup() default "";
   /**
 
    * 监听异常处理器,配置BeanName
 
    */
 
   String errorHandler() default "";
   /**
 
    * 消费组ID
 
    */
 
   String groupId() default "";
   /**
 
    * id是否为GroupId
 
    */
 
   boolean idIsGroup() default true;
   /**
 
    * 消费者Id前缀
 
    */
 
   String clientIdPrefix() default "";
   /**
 
    * 真实监听容器的BeanName,需要在 BeanName前加 "__"
 
    */
 
   String beanRef() default "__listener";
}

使用案例

ConsumerRecord 类消费

使用 ConsumerRecord 类接收有一定的好处,ConsumerRecord 类里面包含分区信息、消息头、消息体等内容,如果业务需要获取这些参数时,使用 ConsumerRecord 会是个不错的选择。如果使用具体的类型接收消息体则更加方便,比如说用 String 类型去接收消息体。

这里我们编写一个 Listener 方法,监听 "topic1"Topic,并把 ConsumerRecord 里面所包含的内容打印到控制台中:

?
1
2
3
4
5
6
7
8
9
10
11
12
@Component
 
public class Listener {
    private static final Logger log = LoggerFactory.getLogger(Listener.class);
    @KafkaListener(id = "consumer", topics = "topic1")
 
    public void consumerListener(ConsumerRecord record) {
 
        log.info("topic.quick.consumer receive : " + record.toString());
 
    }
}

批量消费

批量消费在现实业务场景中是很有实用性的。因为批量消费可以增大 kafka 消费吞吐量, 提高性能。

批量消费实现步骤:

1、重新创建一份新的消费者配置,配置为一次拉取 10 条消息

2、创建一个监听容器工厂,命名为:batchContainerFactory,设置其为批量消费并设置并发量为 5,这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。

3、创建一个分区数为 8 的 Topic。

4、创建监听方法,设置消费 id 为 “batchConsumer”,clientID 前缀为“batch”,监听“batch”,使用“batchContainerFactory” 工厂创建该监听容器。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Component
 
public class BatchListener {
    private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
    private Map consumerProps() {
 
        Map props = new HashMap<>();
 
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
 
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 
        //一次拉取消息数量
 
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 
                NumberDeserializers.IntegerDeserializer.class);
 
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 
                StringDeserializer.class);
 
        return props;
 
    }
    @Bean("batchContainerFactory")
 
    public ConcurrentKafkaListenerContainerFactory listenerContainer() {
 
        ConcurrentKafkaListenerContainerFactory container
 
                = new ConcurrentKafkaListenerContainerFactory();
 
        container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
 
        //设置并发量,小于或等于Topic的分区数
 
        container.setConcurrency(5);
 
        //必须 设置为批量监听
 
        container.setBatchListener(true);
 
        return container;
 
    }
    @Bean
 
    public NewTopic batchTopic() {
 
        return new NewTopic("topic.batch", 8, (short) 1);
 
    }
    @KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"
 
            ,topics = {"topic.batch"},containerFactory = "batchContainerFactory")
 
    public void batchListener(List data) {
 
        log.info("topic.batch  receive : ");
 
        for (String s : data) {
 
            log.info(  s);
 
        }
 
    }
 
}

监听 Topic 中指定的分区

使用 @KafkaListener 注解的 topicPartitions 属性监听不同的 partition 分区。

@TopicPartition:topic-- 需要监听的 Topic 的名称,partitions – 需要监听 Topic 的分区 id。

partitionOffsets – 可以设置从某个偏移量开始监听,@PartitionOffset:partition – 分区 Id,非数组,initialOffset – 初始偏移量。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Bean
 
public NewTopic batchWithPartitionTopic() {
 
    return new NewTopic("topic.batch.partition", 8, (short) 1);
 
}
@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
 
        topicPartitions = {
 
                @TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),
 
                @TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},
 
                        partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))
 
        }
 
)
 
public void batchListenerWithPartition(List data) {
 
    log.info("topic.batch.partition  receive : ");
 
    for (String s : data) {
 
        log.info(s);
 
    }
 
}

注解方式获取消息头及消息体

当你接收的消息包含请求头,以及你监听方法需要获取该消息非常多的字段时可以通过这种方式。。这里使用的是默认的监听容器工厂创建的,如果你想使用批量消费,把对应的类型改为 List 即可,比如 List data , List key。

@Payload:获取的是消息的消息体,也就是发送内容

@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):获取发送消息的 key

@Header(KafkaHeaders.RECEIVED_PARTITION_ID):获取当前消息是从哪个分区中监听到的

@Header(KafkaHeaders.RECEIVED_TOPIC):获取监听的 TopicName

@Header(KafkaHeaders.RECEIVED_TIMESTAMP):获取时间戳

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@KafkaListener(id = "params", topics = "topic.params")
 
public void otherListener(@Payload String data,
 
                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
 
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
 
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
 
                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
 
    log.info("topic.params receive : \n"+
 
            "data : "+data+"\n"+
 
            "key : "+key+"\n"+
 
            "partitionId : "+partition+"\n"+
 
            "topic : "+topic+"\n"+
 
            "timestamp : "+ts+"\n"
 
    );
 
}

使用 Ack 机制确认消费

Kafka 是通过最新保存偏移量进行消息消费的,而且确认消费的消息并不会立刻删除,所以我们可以重复的消费未被删除的数据,当第一条消息未被确认,而第二条消息被确认的时候,Kafka 会保存第二条消息的偏移量,也就是说第一条消息再也不会被监听器所获取,除非是根据第一条消息的偏移量手动获取。Kafka 的 ack 机制可以有效的确保消费不被丢失。因为自动提交是在 kafka 拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。

使用 Kafka 的 Ack 机制比较简单,只需简单的三步即可:

  • 设置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自动提交
  • 设置 AckMode=MANUAL_IMMEDIATE
  • 监听方法加入 Acknowledgment ack 参数

4.使用 Consumer.seek 方法,可以指定到某个偏移量的位置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Component
 
public class AckListener {
 
    private static final Logger log = LoggerFactory.getLogger(AckListener.class);
    private Map consumerProps() {
 
        Map props = new HashMap<>();
 
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
 
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
 
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
 
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
        return props;
 
    }
    @Bean("ackContainerFactory")
 
    public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
 
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
 
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
 
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
 
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
 
        return factory;
 
    }
    @KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
 
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
 
        log.info("topic.quick.ack receive : " + record.value());
 
        ack.acknowledge();
 
    }
 
}

解决重复消费

上一节中使用 ack 手动提交偏移量时,假如 consumer 挂了重启,那它将从 committed offset 位置开始重新消费,而不是 consume offset 位置。这也就意味着有可能重复消费。

在 0.9 客户端中,有 3 种 ack 策略:

策略 1: 自动的,周期性的 ack。

策略 2:consumer.commitSync(),调用 commitSync,手动同步 ack。每处理完 1 条消息,commitSync 1 次。

策略 3:consumer. commitASync(),手动异步 ack。、

那么使用策略 2,提交每处理完 1 条消息,就发送一次 commitSync。那这样是不是就可以解决 “重复消费” 了呢?如下代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
while (true) {
 
        List buffer = new ArrayList<>();
 
        ConsumerRecords records = consumer.poll(100);
 
        for (ConsumerRecord record : records) {
 
            buffer.add(record);
 
        }
 
        insertIntoDb(buffer);    //消除处理,存到db
 
        consumer.commitSync();   //同步发送ack
 
        buffer.clear();
 
    }
 
}

答案是否定的!因为上面的 insertIntoDb 和 commitSync 做不到原子操作:如果在数据处理完成,commitSync 的时候挂了,服务器再次重启,消息仍然会重复消费。

     那么如何解决重复消费的问题呢?答案是自己保存 committed offset,而不是依赖 kafka 的集群保存 committed offset,把消息的处理和保存 offset 做成一个原子操作,并且对消息加入唯一 id, 进行判重。

依照官方文档, 要自己保存偏移量, 需要:

  • enable.auto.commit=false, 禁用自动 ack。
  • 每次取到消息,把对应的 offset 存下来。
  • 下次重启,通过 consumer.seek 函数,定位到自己保存的 offset,从那开始消费。
  • 更进一步处理可以对消息加入唯一 id, 进行判重。

到此这篇关于kafka @KafkaListener 注解解读的文章就介绍到这了,更多相关@KafkaListener 注解内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/u012796085/article/details/118273689

延伸 · 阅读

精彩推荐
  • Java教程java之swing表格实现方法

    java之swing表格实现方法

    这篇文章主要介绍了java之swing表格实现方法,以实例形式分析了swing构建表格的方法,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    cj_gameboy4092020-01-05
  • Java教程Java由浅入深细数数组的操作下

    Java由浅入深细数数组的操作下

    数组对于每一门编程语言来说都是重要的数据结构之一,当然不同语言对数组的实现及处理也不尽相同。Java 语言中提供的数组是用来存储固定大小的同类...

    星鸦wyk11472022-11-24
  • Java教程Java中流的有关知识点详解

    Java中流的有关知识点详解

    今天小编就为大家分享一篇关于Java中流的有关知识点详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看...

    mumu199810552021-07-14
  • Java教程iReport使用指南及常见功能示例详解

    iReport使用指南及常见功能示例详解

    这篇文章主要介绍了iReport使用指南及常见功能,本文以iReport 3.5.1为例,通过示例演示给大家介绍的非常详细,需要的朋友可以参考下...

    GGdido6882022-03-01
  • Java教程Spring自带的校验框架Validation的使用实例

    Spring自带的校验框架Validation的使用实例

    今天小编就为大家分享一篇关于Spring自带的校验框架Validation的使用实例,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起...

    Satisfy_5554782021-07-25
  • Java教程IDEA2019.3在Plugins中搜索不到translation的解决

    IDEA2019.3在Plugins中搜索不到translation的解决

    这篇文章主要介绍了IDEA2019.3在Plugins中搜索不到translation的解决,文中通过图文的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们...

    脚印3922020-06-15
  • Java教程基于SpringBoot使用MyBatis插件的问题

    基于SpringBoot使用MyBatis插件的问题

    MyBatis-Plus并不能为我们解决所有问题,例如一些复杂的SQL,多表联查,我们就需要自己去编写代码和SQL语句,我们该如何快速的解决这个问题呢,这个时候...

    HDLaZy10802022-10-27
  • Java教程Java数据结构与算法之稀疏数组与队列深入理解

    Java数据结构与算法之稀疏数组与队列深入理解

    这篇文章主要介绍了Java数据结构与算法之稀疏数组与队列深入理解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋...

    威斯布鲁克.猩猩6482021-12-29