服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - SpringBoot整合RabbitMQ处理死信队列和延迟队列

SpringBoot整合RabbitMQ处理死信队列和延迟队列

2022-12-29 16:31IT利刃出鞘 Java教程

这篇文章将通过示例为大家详细介绍SpringBoot整合RabbitMQ时如何处理死信队列和延迟队列,文中的示例代码讲解详细,需要的可以参考一下

简介

说明

本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列。

RabbitMQ消息简介

RabbitMQ的消息默认不会超时。 

什么是死信队列?什么是延迟队列?

死信队列:

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。

以下几种情况会导致消息变成死信:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。

延迟队列:

延迟队列用来存放延迟消息。延迟消息:指当消息被发送以后,不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

相关网址

详解RabbitMQ中死信队列和延迟队列的使用详解

实例代码

路由配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.example.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitRouterConfig {
    public static final String EXCHANGE_TOPIC_WELCOME   = "Exchange@topic.welcome";
    public static final String EXCHANGE_FANOUT_UNROUTE  = "Exchange@fanout.unroute";
    public static final String EXCHANGE_TOPIC_DELAY     = "Exchange@topic.delay";
 
    public static final String ROUTINGKEY_HELLOS        = "hello.#";
    public static final String ROUTINGKEY_DELAY         = "delay.#";
 
    public static final String QUEUE_HELLO              = "Queue@hello";
    public static final String QUEUE_HI                 = "Queue@hi";
    public static final String QUEUE_UNROUTE            = "Queue@unroute";
    public static final String QUEUE_DELAY              = "Queue@delay";
 
    public static final Integer TTL_QUEUE_MESSAGE       = 5000;
 
    @Autowired
    AmqpAdmin amqpAdmin;
 
    @Bean
    Object initBindingTest() {
        amqpAdmin.declareExchange(ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT_UNROUTE).durable(true).autoDelete().build());
        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_DELAY).durable(true).autoDelete().build());
        amqpAdmin.declareExchange(ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_WELCOME)
                .durable(true)
                .autoDelete()
                .withArgument("alternate-exchange", EXCHANGE_FANOUT_UNROUTE)
 
                .build());
 
        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HI).build());
        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_HELLO)
                .withArgument("x-dead-letter-exchange", EXCHANGE_TOPIC_DELAY)
                .withArgument("x-dead-letter-routing-key", ROUTINGKEY_DELAY)
                .withArgument("x-message-ttl", TTL_QUEUE_MESSAGE)
                .build());
        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_UNROUTE).build());
        amqpAdmin.declareQueue(QueueBuilder.durable(QUEUE_DELAY).build());
 
        amqpAdmin.declareBinding(new Binding(QUEUE_HELLO, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_WELCOME, ROUTINGKEY_HELLOS, null));
        amqpAdmin.declareBinding(new Binding(QUEUE_UNROUTE, Binding.DestinationType.QUEUE,
                EXCHANGE_FANOUT_UNROUTE, "", null));
        amqpAdmin.declareBinding(new Binding(QUEUE_DELAY, Binding.DestinationType.QUEUE,
                EXCHANGE_TOPIC_DELAY, ROUTINGKEY_DELAY, null));
 
        return new Object();
    }
}

控制器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.controller;
 
import com.example.config.RabbitRouterConfig;
import com.example.mq.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.time.LocalDateTime;
 
@RestController
public class HelloController {
    @Autowired
    private Sender sender;
 
    @PostMapping("/hi")
    public void hi() {
        sender.send(RabbitRouterConfig.QUEUE_HI, "hi1 message:" + LocalDateTime.now());
    }
 
    @PostMapping("/hello1")
    public void hello1() {
        sender.send("hello.a", "hello1 message:" + LocalDateTime.now());
    }
 
    @PostMapping("/hello2")
    public void hello2() {
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "hello.b", "hello2 message:" + LocalDateTime.now());
    }
 
    @PostMapping("/ae")
    public void aeTest() {
        sender.send(RabbitRouterConfig.EXCHANGE_TOPIC_WELCOME, "nonono", "ae message:" + LocalDateTime.now());
    }
}

发送器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.example.mq;
 
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
 
    public void send(String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(routingKey, message);
    }
 
    public void send(String exchange, String routingKey, String message) {
        this.rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

接收器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.example.mq;
 
import com.example.config.RabbitRouterConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
public class Receiver {
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_HI)
    public void hi(String payload) {
        System.out.println ("Receiver(hi) : "  + payload);
    }
 
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_HELLO)
    // public void hello(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(hello) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(hello):sleep over");
    // }
    //
    // @RabbitListener(queues = RabbitRouterConfig.QUEUE_UNROUTE)
    // public void unroute(String hello) throws InterruptedException {
    //     System.out.println ("Receiver(unroute) : "  + hello);
    //     Thread.sleep(5 * 1000);
    //     System.out.println("(unroute):sleep over");
    // }
 
    @RabbitListener(queues = RabbitRouterConfig.QUEUE_DELAY)
    public void delay(String hello) throws InterruptedException {
        System.out.println ("Receiver(delay) : "  + hello);
        Thread.sleep(5 * 1000);
        System.out.println("(delay):sleep over");
    }
}

application.yml

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
#  port: 9100
  port: 9101
spring:
  application:
#    name: demo-rabbitmq-sender
    name: demo-rabbitmq-receiver
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
#    virtualHost: /
    publisher-confirms: true
    publisher-returns: true
#    listener:
#      simple:
#        acknowledge-mode: manual
#      direct:
#        acknowledge-mode: manual

实例测试

分别启动发送者和接收者。

访问:http://localhost:9100/hello2

五秒钟后输出:

Receiver(delay) : hello2 message:2020-11-27T09:30:51.548
(delay):sleep over

以上就是SpringBoot整合RabbitMQ处理死信队列和延迟队列的详细内容,更多关于SpringBoot RabbitMQ死信队列 延迟队列的资料请关注服务器之家其它相关文章!

原文链接:https://blog.csdn.net/feiying0canglang/article/details/124955698

延伸 · 阅读

精彩推荐
  • Java教程[Spring MVC]-详解SpringMVC的各种参数绑定方式

    [Spring MVC]-详解SpringMVC的各种参数绑定方式

    本篇文章主要介绍了SpringMVC的各种参数绑定方式 ,具有一定的参考价值,有需要的可以了解一下。...

    横渡5122020-07-12
  • Java教程Java实现文件变化监控

    Java实现文件变化监控

    这篇文章主要介绍了Java实现文件变化监控的实现代码,代码附有注释,分步骤介绍的非常详细,非常不错,具有参考借鉴价值,,需要的朋友可以参考下 ...

    大饼酥1822020-06-05
  • Java教程Spring Cloud Gateway入门解读

    Spring Cloud Gateway入门解读

    本篇文章主要介绍了Spring Cloud Gateway入门解读,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Aoho''s Blog12262021-04-24
  • Java教程java中Hashmap的get方法使用

    java中Hashmap的get方法使用

    这篇文章主要介绍了java中Hashmap的get方法使用,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...

    會挖番薯的pank6502021-12-29
  • Java教程JDK源码之PriorityQueue解析

    JDK源码之PriorityQueue解析

    这篇文章主要为大家详细介绍了JDK源码之PriorityQueue,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    _fred2422020-09-18
  • Java教程springboot框架的全局异常处理方案详解

    springboot框架的全局异常处理方案详解

    这篇文章主要介绍了springboot框架的全局异常处理方案,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...

    一起随缘7592021-08-23
  • Java教程使用Java编写控制JDBC连接、执行及关闭的工具类

    使用Java编写控制JDBC连接、执行及关闭的工具类

    这篇文章主要介绍了如何使用Java来编写控制JDBC连接、执行及关闭的程序,包括一个针对各种数据库通用的释放资源的工具类的写法,需要的朋友可以参考下...

    leizhimin2402020-04-09
  • Java教程Java框架之Maven SSM集合

    Java框架之Maven SSM集合

    本篇文章主要介绍了基于maven的ssm框架整合的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    箱水母10692021-12-30