服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Java RabbitMQ的持久化和发布确认详解

Java RabbitMQ的持久化和发布确认详解

2022-09-02 11:31江海i Java教程

这篇文章主要为大家详细介绍了RabbitMQ的持久化和发布确认,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能够给你带来帮助

1. 持久化

当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

1.1 实现持久化

1.队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。

2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN表示持久化消息。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * @Description 持久化MQ
 * @date 2022/3/7 9:14
 */
public class Producer3 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 持久化队列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化消息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }
}

但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

1.2 不公平分发

轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

在消费者处修改channel.basicQos(1);表示开启不公平分发

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * @Description 不公平分发消费者
 * @date 2022/3/7 9:27
 */
public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 设置不公平分发
        channel.basicQos(1);
        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });
    }
}

1.3 测试不公平分发

测试目的:是否能实现能者多劳。

测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

先启动生产者创建队列,再分别启动两个消费者。

生产者按照顺序发四条消息:

Java RabbitMQ的持久化和发布确认详解

睡眠时间短的线程A接收到了三条消息

Java RabbitMQ的持久化和发布确认详解

而睡眠时间长的线程B只接收到的第二条消息:

Java RabbitMQ的持久化和发布确认详解

因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

实验成功!

1.4 预取值

消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

1.4.1 代码测试

测试方法:

1.新建两个不同的消费者分别给定预期值5个2。

2.给睡眠时间长的指定为5,时间短的指定为2。

3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

代码根据上述代码修改预期值即可。

2. 发布确认

发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();

2.1 单个确认发布

是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * @Description 确认发布——单个确认
 * @date 2022/3/7 14:49
 */
public class SoloProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_solo";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 单个发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("发送消息:" + i);
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
}

2.2 批量确认发布

一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * @Description 确认发布——批量确认
 * @date 2022/3/7 14:49
 */
public class BatchProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_batch";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 设置一个多少一批确认一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 批量发布确认
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("发送消息:" + i);
                }
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

显然效率要比单个确认发布的高很多。

2.3 异步确认发布

在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * @Description 确认发布——异步确认
 * @date 2022/3/7 14:49
 */
public class AsyncProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未确认的消息:" + deliveryTab);
        };
        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
        }
 
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.4 处理未确认的消息

最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

处理方式:

1.记录要发送的全部消息;

2.在发布成功确认处删除;

3.打印未确认的消息。

使用一个哈希表存储消息,它的优点:

可以将需要和消息进行关联;轻松批量删除条目;支持高并发。

?
1
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * @Description 异步发布确认,处理未发布成功的消息
 * @date 2022/3/7 18:09
 */
public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async_remember";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 线程安全有序的一个hash表,适用与高并发
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在发布成功确认处删除;
            // 批量删除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 单独删除
                map.remove(deliveryTab);
            }
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 打印未确认的消息。
            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
        };
        // 消息监听器
        /**
         * addConfirmListener:
         *                  1. 确认成功的消息;
         *                  2. 确认失败的消息。
         */
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 记录要发送的全部消息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }
 
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

总结

显然来说,异步处理除了在编码处有些麻烦,在处理时间效率和可用性上都是比单处理和批处理好很多。

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注服务器之家的更多内容!    

原文链接:https://blog.csdn.net/weixin_44289860/article/details/123314490

延伸 · 阅读

精彩推荐
  • Java教程mybatis中resultMap 标签的使用教程

    mybatis中resultMap 标签的使用教程

    resultMap 标签用来描述如何从数据库结果集中来加载对象,这篇文章重点给大家介绍mybatis中resultMap 标签的使用,感兴趣的朋友一起看看吧...

    Mr_晋11492021-05-18
  • Java教程Java中stream处理中map与flatMap的比较和使用案例

    Java中stream处理中map与flatMap的比较和使用案例

    这篇文章主要介绍了Java中stream处理中map与flatMap的比较和使用案例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,...

    Listen-Y5652021-08-26
  • Java教程Spring Boot2.0中SpringWebContext找不到无法使用的解决方法

    Spring Boot2.0中SpringWebContext找不到无法使用的解决方法

    这篇文章主要给大家介绍了关于Spring Boot2.0中SpringWebContext找不到无法使用的解决方法,文中通过示例代码介绍的非常详细,需要的朋友可以参考借鉴,下面...

    iquanzhan5432021-06-18
  • Java教程Java JSch远程执行Shell命令的方法

    Java JSch远程执行Shell命令的方法

    本文主要介绍了Java JSch远程执行Shell命令,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    qhh02055942022-07-24
  • Java教程java开发实现五子棋游戏

    java开发实现五子棋游戏

    这篇文章主要为大家详细介绍了java开发实现五子棋游戏,具有双人对战和人机对战功能,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小...

    冷丁_8552022-01-07
  • Java教程java驼峰转换的方法

    java驼峰转换的方法

    这篇文章主要为大家详细介绍了java驼峰转换的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    小爷胡汉三10772021-05-14
  • Java教程SSH 框架简介

    SSH 框架简介

    SSH是 struts+spring+hibernate的一个集成框架,是目前较流行的一种web应用程序开源框架。本文给大家详细看一下组成SSH的这三个框架...

    动力节点6642021-01-01
  • Java教程Java函数式编程(十二):监控文件修改

    Java函数式编程(十二):监控文件修改

    这篇文章主要介绍了Java函数式编程(十二):监控文件修改,本文是系列文章的第12篇,其它文章请参阅本文底部的相关文章,需要的朋友可以参考下 ...

    有孚5412019-12-01