服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C# - C#通过rabbitmq实现定时任务(延时队列)

C#通过rabbitmq实现定时任务(延时队列)

2022-11-21 14:22初夏的阳光丶 C#

工作中经常会有定时任务的需求,常见的做法可以使用Timer、Quartz、Hangfire等组件,本文使用C#通过rabbitmq实现定时任务(延时队列),感兴趣的可以了解一下

本文主要讲解如何通过RabbitMQ实现定时任务(延时队列)

环境准备

需要在MQ中进行安装插件 地址链接
插件介绍地址:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

使用场景

作为一个新的预支付订单被初始化放置,如果该订单在指定时间内未进行支付,则将被认为超时订单进行关闭处理;电商系统中应用较多,用户购买商品产生订单,但未进行支付,订单产生30分钟内未支付将关闭订单(且满足该场景数量庞大),不可能采用人工干预。

代码介绍

生产者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
var factory = new ConnectionFactory()
         {
             Uri = new Uri("MQ地址")
         };
        
         using var connection = factory.CreateConnection();
         using var channel = connection.CreateModel();
 
 
         var exchangeName = "delay-exchange";
         var routingkey = "delay.delay";
         var queueName = "delay_queueName";
         //设置Exchange队列类型
         var argMaps = new Dictionary<string, object>()
         {
             {"x-delayed-type", "topic"}
         };
         //设置当前消息为延时队列
         channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
         channel.QueueDeclare(queueName, true, false, false, argMaps);
         channel.QueueBind(queueName, exchangeName, routingkey);
         for (int i = 0; i < 3; i++)
         {
             var time = 1000 * 5;
             var message = $@"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
             var body = Encoding.UTF8.GetBytes(message);
             var props = channel.CreateBasicProperties();
             //设置消息的过期时间
             props.Headers = new Dictionary<string, object>()
             {
                 "x-delay", 5000 }
             };
             channel.BasicPublish(exchange: exchangeName,
                 routingKey: routingkey,
                 basicProperties: props,
                 body: body);
             Console.WriteLine(message);
 
 
         }
         Console.ReadLine();

消费者(自动绑定队列写法)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var factory = new ConnectionFactory()
    {
        Uri = new Uri(MQ地址)
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    var queueName = "delay_queueName";
    channel.QueueDeclare(queueName, true, false, false, null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body);
        var routingKey = ea.RoutingKey;
        Console.WriteLine($@"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
    };
    channel.BasicConsume(queue: queueName,
        autoAck: true,
        consumer: consumer);
    Console.ReadLine();

消费者(手动绑定队列写法)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var factory = new ConnectionFactory()
          {
              Uri = new Uri(MQ地址)
          };
           using var connection = factory.CreateConnection();
          using var channel = connection.CreateModel();
          var exchangeName = "delay-exchange";
          var routingkey = "delay.delay";
          var queueName = "delay_queueName";
          var autoDelete = true;
 
          var argMaps = new Dictionary<string, object>()
          {
              {"x-delayed-type", "topic"}
          };
          channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
          channel.QueueDeclare(queueName, true, false, false, argMaps);
          channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
          //channel.QueueDeclare(queueName, true, false, false, null);
          var consumer = new EventingBasicConsumer(channel);
          consumer.Received += (model, ea) =>
          {
              var body = ea.Body;
              var message = Encoding.UTF8.GetString(body);
              var routingKey = ea.RoutingKey;
              Console.WriteLine($@"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
          };
          channel.BasicConsume(queue: queueName,
              autoAck: true,
              consumer: consumer);
          Console.ReadLine();

最终实现效果(两个消费者)

C#通过rabbitmq实现定时任务(延时队列)

在上述实现中,其实主要靠以下参数来帮我们实现当前功能

声明Exchange中的 type: "x-delayed-message" 这个表明当前队列为延时消息队列
声明Exchange中arguments中的 {"x-delayed-type", "topic"} 当前表明当前队列为Topic模式
最后 我们在CreateBasicProperties的Header中设置 { "x-delay", 5000 }来达到消息延时的功能(单位为ms)

建议

如果使用当前模式来做定时任务,在要求消息不丢失的前提下,需要运维同学提供稳定的MQ环境

到此这篇关于C#通过rabbitmq实现定时任务(延时队列)的文章就介绍到这了,更多相关C# rabbitmq定时任务内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/ancold/p/14705601.html

延伸 · 阅读

精彩推荐
  • C#C# 判断时间段是否相交的实现方法

    C# 判断时间段是否相交的实现方法

    这篇文章主要介绍了C# 判断时间段是否相交的实现方法的相关资料,希望通过本文能帮助到大家,让大家实现这样的功能,需要的朋友可以参考下...

    _iorilan9352022-01-25
  • C#Unity3D UGUI实现翻书特效

    Unity3D UGUI实现翻书特效

    这篇文章主要为大家详细介绍了Unity3D UGUI实现翻书特效,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    码农小飞飞3692022-08-10
  • C#c#定期删除文件的实操方法

    c#定期删除文件的实操方法

    在本篇文章里小编给大家分享了关于c#定期删除文件的方法和步骤,有需要的朋友们可以学习下。...

    C#教程网12062022-07-07
  • C#C#如何用ThoughtWorks生成二维码

    C#如何用ThoughtWorks生成二维码

    这篇文章主要介绍了C#如何用ThoughtWorks生成二维码,文中讲解非常细致,帮助大家更好的理解和学习,感兴趣的朋友可以了解下...

    彭泽09024542022-09-29
  • C#C#巧用DateTime预设可选的日期范围(如本年度、本季度、本月等)

    C#巧用DateTime预设可选的日期范围(如本年度、本季度、本月等)

    这篇文章主要介绍了C#巧用DateTime预设可选的日期范围,如本年度、本季度、本月等,感兴趣的小伙伴们可以参考一下...

    C#教程网4742021-11-19
  • C#asp.net(C#)清除全部Session与单个Session的方法

    asp.net(C#)清除全部Session与单个Session的方法

    下面小编就为大家带来一篇asp.net(C#)清除全部Session与单个Session的方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看...

    C#教程网10162021-12-14
  • C#PC蓝牙通信C#代码实现

    PC蓝牙通信C#代码实现

    这篇文章主要为大家详细介绍了PC蓝牙通信C#代码实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    名字好难4752021-12-07
  • C#c# 根据NPOI 读取一个excel 文件的多个Sheet

    c# 根据NPOI 读取一个excel 文件的多个Sheet

    这篇文章主要介绍了c# 根据NPOI 读取一个excel 文件的多个Sheet,帮助大家更好的利用c#处理excel表格,感兴趣的朋友可以了解下...

    风格不同3882022-10-21