服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 详解SpringBoot整合RabbitMQ如何实现消息确认

详解SpringBoot整合RabbitMQ如何实现消息确认

2022-12-24 16:20IT利刃出鞘 Java教程

这篇文章主要介绍了SpringBoot整合RabbitMQ是如何实现消息确认的,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

简介

本文介绍SpringBoot整合RabbitMQ如何进行消息的确认。

生产者消息确认

介绍

发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递。

如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。

流程

  • 如果消息没有到达exchange,则confirm回调,ack=false
  • 如果消息到达exchange,则confirm回调,ack=true
  • exchange到queue成功,则不回调return
  • exchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,这样消息就丢了)

配置

application.yml

?
1
2
3
4
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true

ConfirmCallback

ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
}

correlationData:对象内部有id (消息的唯一性)和Message。

(若ack为false,则Message不为null,可将Message数据 重新投递;若ack是true,则correlationData为null)

ack:消息投递到exchange 的状态,true表示成功。

cause:表示投递失败的原因。 (若ack为false,则cause不为null;若ack是true,则cause为null)

给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。

?
1
2
3
4
5
6
7
8
9
10
public void send(String dataId, String exchangeName, String rountingKey, String message){
    CorrelationData correlationData = new CorrelationData();
    correlationData.setId(dataId);
 
    rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}
 
public String receive(String queueName){
    return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}

2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。

?
1
2
3
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

ReturnCallback

ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。

若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

注意:需要rabbitTemplate.setMandatory(true);

当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。

代码:

?
1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。 

注册ConfirmCallback和ReturnCallback

整合后的写法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.example.config;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.PostConstruct;
 
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }
 
    // 下边这样写也可以   
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                            correlationData, ack, cause);
        }
    }
 
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                        message, replyCode, replyText, exchange, routingKey);
    }
}

消费者消息确认

介绍

确认方式 简介 详述
auto(默认) 根据消息消费的情况,智能判定 若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去
若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。
若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。
none mq发出消息后直接确认消息  
manual 消费端手动确认消息 消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。
如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。

只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。

消息确认三种方式配置方法

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring.rabbitmq.listener.direct.acknowledge-mode=manual

手动确认三种方式

basicAck,basicNack,basicReject

basicAck

含义

表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。

函数原型

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag

  • 消息投递序号
  • 每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple

  • 是否批量确认
  • 值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

实例

?
1
2
3
4
@RabbitHandler
public void process(String content, Channel channel, Message message){
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

basicNack

含义

表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

函数原型

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 消息将重新入队列。

basicReject

含义

拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

函数原型

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:表示消息投递序号。
  • requeue:值为 true 消息将重新入队列。

以上就是详解SpringBoot整合RabbitMQ如何实现消息确认的详细内容,更多关于SpringBoot RabbitMQ消息确认的资料请关注服务器之家其它相关文章!

原文链接:https://blog.csdn.net/feiying0canglang/article/details/124955565

延伸 · 阅读

精彩推荐
  • Java教程Java ArrayList 实现实例讲解

    Java ArrayList 实现实例讲解

    ArrayList是基于数组实现的,是一个动态数组,其容量能自动增长,类似于C语言中的动态申请内存,动态增长内存。这篇文章主要介绍了java ArrayList 实现的相...

    有女朋友的程序猿4762020-06-30
  • Java教程Java详解使用线程池处理任务方法

    Java详解使用线程池处理任务方法

    java中经常需要用到多线程来处理,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上...

    遇安.11210012022-12-05
  • Java教程一篇文章带你入门Java变量

    一篇文章带你入门Java变量

    这篇文章主要介绍了Java变量,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习...

    编程界明世隐11542021-11-08
  • Java教程浅析JAVA 循环结构

    浅析JAVA 循环结构

    这篇文章主要介绍了JAVA 循环结构的相关资料,文中讲解的非常细致,示例代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下...

    菜鸟教程4622020-07-19
  • Java教程Java导出oracle表结构实例详解

    Java导出oracle表结构实例详解

    这篇文章主要介绍了 Java导出oracle表结构实例详解的相关资料,需要的朋友可以参考下 ...

    Java之家3242020-08-23
  • Java教程完美解决Java中的线程安全问题

    完美解决Java中的线程安全问题

    下面小编就为大家带来一篇完美解决Java中的线程安全问题。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Java之家4002020-12-12
  • Java教程Java多线程并发开发之DelayQueue使用示例

    Java多线程并发开发之DelayQueue使用示例

    这篇文章主要为大家详细介绍了Java多线程并发开发之DelayQueue使用示例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    生活半篇记6092021-01-06
  • Java教程深入探讨JAVA中的异常与错误处理

    深入探讨JAVA中的异常与错误处理

    这篇文章详细介绍了JAVA中的异常与错误处理,有需要的朋友可以参考一下 ...

    java技术网4492019-10-14