服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C# - C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

2022-10-11 14:49做自己518 C#

这篇文章主要介绍了C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用),本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1:RabbitMQ是个啥?(专业术语参考自网络)

 RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

  RabbitMQ服务器是用Erlang语言编写的,Erlang是专门为高并发而生的语言,而集群和故障转移是构建在开发电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库

2:使用RabbitMQ有啥好处?

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。

对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。

RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式,

3:RabbitMq的安装以及环境搭建等:

网络上有很多关于怎么搭建配置RabbitMq服务环境的详细文章,也比较简单,这里不再说明,本人是Docker上面的pull RabbitMq 镜像来安装的!

3.1:运行容器的命令如下:

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用场景主要有哪些,啥时候用或者不用?

4.1什么时候使用MQ?

1)数据驱动的任务依赖

2)上游不关心多下游执行结果

3)异步返回执行时间长

4.2什么时候不使用MQ?

需要实时关注执行结果 (eg:同步调用)

5:具体C#怎么使用RabbitMq?下面直接上code和测试截图了(Demo环境是.NetCore3.1控制台+Docker上的RabbitMQ容器来进行的)

6:sample模式,就是简单地队列模式,一进一出的效果差不多,测试截图:

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;

class Program
{
  static void Main(string[] args)
  {
      //就是简单的队列,生产者
      Console.WriteLine("====RabbitMqPublishDemo====");
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
      }
      Console.WriteLine("生成完毕!");
      Console.ReadLine();
  }
}
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

using (IConnection conn = connectionFactory.CreateConnection())
{
  using (IModel channel = conn.CreateModel())
  {
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    var msgBody = Encoding.UTF8.GetBytes(msg);
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
  }
}
}


//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;

class Program
{
  static void Main(string[] args)
  {
    Console.WriteLine("====RabbitMqConsumerDemo====");
    ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
    {
      Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
    });
    Console.ReadLine();
  }
}
}

 #region 简单生产者后端逻辑
  /// <summary>
  /// 简单消费者
  /// </summary>
  /// <param name="queueName">队列名称</param>
  /// <param name="isBasicNack">失败后是否自动放到队列</param>
  /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
  public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
  {
    Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
    IConnection conn = connectionFactory.CreateConnection();
    IModel channel = conn.CreateModel();
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
      byte[] bymsg = ea.Body.ToArray();
      string msg = Encoding.UTF8.GetString(bymsg);
      if (handleMsgStr != null)
      {
        handleMsgStr.Invoke(msg);
      }
      else
      {
        Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
      }
    };
    channel.BasicConsume(queueName, autoAck: true, consumer);
  }
  #endregion

7:Work模式

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

//简单生产端 ui调用者

using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;

class Program
{
  static void Main(string[] args)
  {
      //就是简单的队列,生产者
      Console.WriteLine("====RabbitMqPublishDemo====");
      for (int i = 0; i < 500; i++)
      {
        ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
      }
      Console.WriteLine("生成完毕!");
      Console.ReadLine();
  }
}
}

/// <summary>
/// 简单生产者 逻辑
/// </summary>
/// <param name="queueName"></param>
/// <param name="msg"></param>
public static void PublishSampleMsg(string queueName, string msg)
{

using (IConnection conn = connectionFactory.CreateConnection())
{
  using (IModel channel = conn.CreateModel())
  {
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    var msgBody = Encoding.UTF8.GetBytes(msg);
    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
  }
}
}


//简单消费端
using System;

namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;

class Program
{
  static void Main(string[] args)
  {
    Console.WriteLine("====RabbitMqConsumerDemo====");
    ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
    {
      Console.WriteLine($"订阅到消息:{DateTime.Now}:{handleMsgStr}");
    });
    Console.ReadLine();
  }
}
}

 #region 简单生产者后端逻辑
  /// <summary>
  /// 简单消费者
  /// </summary>
  /// <param name="queueName">队列名称</param>
  /// <param name="isBasicNack">失败后是否自动放到队列</param>
  /// <param name="handleMsgStr">有就自己对字符串的处理,如果要存储到数据库请自行扩展</param>
  public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false,
  {
    Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
    IConnection conn = connectionFactory.CreateConnection();
    IModel channel = conn.CreateModel();
    channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (sender, ea) =>
    {
      byte[] bymsg = ea.Body.ToArray();
      string msg = Encoding.UTF8.GetString(bymsg);
      if (handleMsgStr != null)
      {
        handleMsgStr.Invoke(msg);
      }
      else
      {
        Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
      }
    };
    channel.BasicConsume(queueName, autoAck: true, consumer);
  }
  #endregion

8:Fanout

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:

//就如下的code, 多次生产,3个消费者都可以自动开始消费

//生产者
using System;
namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;
class Program
{
  static void Main(string[] args)
  {
    for (int i = 0; i < 500; i++)
    {
      ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :发布消息成功{i}");
    }
    Console.WriteLine("工作队列模式 生成完毕......!");
    Console.ReadLine();
  }
}
}

//生产者后端逻辑
public static void PublishWorkQueueModel(string queueName, string msg)
  {
    using (var connection = connectionFactory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
      var body = Encoding.UTF8.GetBytes(msg);
      var properties = channel.CreateBasicProperties();
      properties.Persistent = true;

      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
      Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
    }
  }

//work消费端
using System;

namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
  static void Main(string[] args)
  {
    Console.WriteLine("====Work模式开启了====");
    ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
    {
      Console.WriteLine($"work模式获取到消息{msg}");
    });
    Console.ReadLine();
  }
}
}

//work后端逻辑
  public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
  {
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();

    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

    var consumer = new EventingBasicConsumer(channel);
    Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");

    consumer.Received += (sender, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      if (handserMsg != null)
      {
        if (!string.IsNullOrEmpty(message))
        {
          handserMsg.Invoke(message);
        }
      }
      channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    };
    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
  }

9:Direct

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)

Code:

//同一个消息会被多个订阅者消费

//发布者
using System;

namespace RabbitMqPublishDemo
{
using MyRabbitMqService;
using System.Runtime.CompilerServices;

class Program
{
  static void Main(string[] args)
  {

    #region 发布订阅模式,带上了exchange
    for (int i = 0; i < 500; i++)
    {
      ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"发布的消息是:{i}");
    }
    Console.WriteLine("发布ok!");
    #endregion
    Console.ReadLine();
  }
}
}
//发布者的后端逻辑 我在这里选择了扇形: ExchangeType.Fanout
public static void PublishExchangeModel(string exchangeName, string message)
  {
    using (var connection = connectionFactory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
      var body = Encoding.UTF8.GetBytes(message);
      channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
      Console.WriteLine($" Sent {message}");
    }
  }


//订阅者
using System;
namespace RabbitMqConsumerDemo
{
using MyRabbitMqService;
using System.Runtime.InteropServices;
class Program
{
  static void Main(string[] args)
  {

    #region 发布订阅模式 Exchange
    ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
    {
      Console.WriteLine($"订阅到消息:{msg}");
    });
    #endregion
    Console.ReadLine();
  }
}
}

//订阅者后端的逻辑
public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
  {
    var connection = connectionFactory.CreateConnection();
    var channel = connection.CreateModel();

    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉

    var queueName = channel.QueueDeclare().QueueName;
    channel.QueueBind(queue: queueName,
             exchange: exchangeName,
             routingKey: "");

    Console.WriteLine(" Waiting for msg....");

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
      var body = ea.Body.ToArray();
      var message = Encoding.UTF8.GetString(body);
      if (handlerMsg != null)
      {
        if (!string.IsNullOrEmpty(message))
        {
          handlerMsg.Invoke(message);
        }
      }
      else
      {
        Console.WriteLine($"订阅到消息:{message}");
      }
    };
    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
  }

到此这篇关于C#使用RabbitMq队列(Sample,Work,Fanout,Direct等模式的简单使用)的文章就介绍到这了,更多相关C#使用RabbitMq队列内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/Fengge518/p/13826983.html

延伸 · 阅读

精彩推荐
  • C#C#给图片添加水印完整实例

    C#给图片添加水印完整实例

    这篇文章主要介绍了C#给图片添加水印的方法,以完整实例形式分析了C#实现文字及图像水印、缩略图、图片剪切等相关技巧,需要的朋友可以参考下...

    思齐_6932021-11-05
  • C#C#中值类型和引用类型解析

    C#中值类型和引用类型解析

    这篇文章主要为大家详细介绍了C#中值类型和引用类型的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Amedeo12032022-01-24
  • C#C# 实现窗口无边框,可拖动效果

    C# 实现窗口无边框,可拖动效果

    这篇文章主要介绍了C# 实现窗口无边框,可拖动效果,本文通过实例代码给大家介绍的非常详细,需要的朋友参考下吧...

    逐梦科技5822022-02-21
  • C#C#使用Socket实现局域网聊天

    C#使用Socket实现局域网聊天

    这篇文章主要为大家详细介绍了C#使用Socket实现局域网聊天的方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    田埂上的梦想3492022-02-23
  • C#unity实现流光效果

    unity实现流光效果

    这篇文章主要为大家详细介绍了unity实现流光效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    神码编程7212022-09-06
  • C#vscode设置Fira_Code字体及改变编辑器字体、背景颜色的代码详解

    vscode设置Fira_Code字体及改变编辑器字体、背景颜色的代码详解

    这篇文章主要介绍了vscode设置Fira_Code字体及改变编辑器字体、背景颜色,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借...

    CodeJasmine6752022-10-07
  • C#C#中利用Lotus notes公共邮箱发送邮件的方法

    C#中利用Lotus notes公共邮箱发送邮件的方法

    这篇文章主要给大家介绍了关于C#中利用Lotus notes公共邮箱发送邮件的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定参考学习价...

    Lionel Andrés Messi7742022-02-20
  • C#Unity键盘WASD实现物体移动

    Unity键盘WASD实现物体移动

    这篇文章主要为大家详细介绍了Unity键盘WASD实现物体移动,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    weixin_3852769711942022-03-10