服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - SpringBoot+RabbitMQ实现消息可靠传输详解

SpringBoot+RabbitMQ实现消息可靠传输详解

2022-12-24 15:59小码code Java教程

消息的可靠传输是面试必问的问题之一,保证消息的可靠传输主要在生产端开启 comfirm 模式,RabbitMQ 开启持久化,消费端关闭自动 ack 模式。本文将详解SpringBoot整合RabbitMQ如何实现消息可靠传输,需要的可以参考一下

环境配置

SpringBoot整合RabbitMQ实现消息的发送。

1.添加maven依赖

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
      </dependency>

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>

2.添加 application.yml 配置文件

spring:
rabbitmq:
  host: 192.168.3.19
  port: 5672
  username: admin
  password: xxxx

3.配置交换机、队列以及绑定

    @Bean
  public DirectExchange myExchange() {
      DirectExchange directExchange = new DirectExchange("myExchange");
      return directExchange;
  }

  @Bean
  public Queue myQueue() {
      Queue queue = new Queue("myQueue");
      return queue;
  }

  @Bean
  public Binding binding() {
      return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
  }

4.生产发送消息

    @Autowired
  private RabbitTemplate rabbitTemplate;

  @GetMapping("/send")
  public String send(String message) {
      rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);
      System.out.println("【发送消息】" + message)
      return "【send message】" + message;
  }

5.消费者接收消息

    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
  public void process(String msg, Channel channel, Message message) {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      Date date = new Date();
      String time = sdf.format(date);
      System.out.println("【接收信息】" + msg + " 当前时间" + time);

6.调用生产端发送消息hello,控制台输出:

【发送消息】hello
【接收信息】hello 当前时间2022-05-12 10:21:14

说明消息已经被成功接收。

 

消息丢失分析

SpringBoot+RabbitMQ实现消息可靠传输详解

一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

  • 生产端丢失: 生产者无法传输到RabbitMQ
  • 存储端丢失:RabbitMQ存储自身挂了
  • 消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

RabbitMQ从生产端、储存端、消费端都对可靠性传输做很好的支持。

 

生产阶段

生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

配置application.yml

spring:
rabbitmq:
  # 消息确认机制 生产者 -> 交换机
  publisher-confirms: true
  # 消息返回机制  交换机 -> 队列
  publisher-returns: true

配置

@Configuration
@Slf4j
public class RabbitConfig {

  @Autowired
  private ConnectionFactory connectionFactory;

  @Bean
  public RabbitTemplate rabbitTemplate() {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
          @Override
          public void confirm(CorrelationData correlationData, boolean ack, String cause) {
              log.info("【correlationData】:" + correlationData);
              log.info("【ack】" + ack);
              log.info("【cause】" + cause);
              if (ack) {
                  log.info("【发送成功】");
              } else {
                  log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);
              }
          }
      });
      rabbitTemplate.setMandatory(true);
      rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
          @Override
          public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
              log.warn("【消息发送失败】");
              log.info("【message】" + message);
              log.info("【replyCode】" + replyCode);
          }
      });

      return rabbitTemplate;
  }
}

消息从生产者交换机, 有confirmCallback确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据ack判断消息是否发送成功。

消息从交换机队列,有returnCallback退回模式。

发送消息product message控制台输出如下:

【发送消息】product message
【接收信息】product message 当前时间2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【发送成功】

生产端模拟消息丢失

这里有两个方案:

  • 发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
  • 发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx
rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

结果:

【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【发送失败】

当发送失败可以对消息进行重试

交换机正确,发送不存在的队列:

交换机接收到消息,返回成功通知,控制台输出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【发送成功】

交换机没有找到队列,返回失败信息:

【消息发送失败】
【message】product message
【replyCode】312

 

RabbitMQ

开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里

修改队列的持久化,修改成非持久化:

    @Bean
  public Queue myQueue() {
      Queue queue = new Queue("myQueue",false);
      return queue;
  }

发送消息之后,消息存放在队列中,然后重启RabbitMQ,消息不存在了。
设置队列持久化:

    @Bean
  public Queue myQueue() {
      Queue queue = new Queue("myQueue",true);
      return queue;
  }

重启之后,队列的消息还存在。

 

消费端

消费端默认开始ack自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

修改application.yml文件

spring:
rabbitmq:
  # 手动消息确认
  listener:
    simple:
      acknowledge-mode: manual

消费接收消息之后需要手动确认:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    @RabbitListener(queuesToDeclare = @Queue("myQueue"))
  public void process(String msg, Channel channel, Message message) {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      Date date = new Date();
      String time = sdf.format(date);
      System.out.println("【接收信息】" + msg + " 当前时间" + time);
      System.out.println(message.getMessageProperties().getDeliveryTag());
      try {
          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      } catch (IOException e) {
          e.printStackTrace();
      }

  }

如果不添加:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

发送两条消息

消息被接收后,没有确认,重新放到队列中:

SpringBoot+RabbitMQ实现消息可靠传输详解

重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

加上channel.basicAck之后,再重启项目

SpringBoot+RabbitMQ实现消息可靠传输详解

队列消息就被删除了

basicAck方法最后一个参数multiple表示是删除之前的队列。

multiple设置成true,把后面的队列都清理掉了

SpringBoot+RabbitMQ实现消息可靠传输详解

到此这篇关于SpringBoot+RabbitMQ实现消息可靠传输详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ消息可靠传输内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/jeremylai7/p/16307934.html

延伸 · 阅读

精彩推荐