博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[.NET] RabbitMQ 的行为艺术
阅读量:7060 次
发布时间:2019-06-28

本文共 9193 字,大约阅读时间需要 30 分钟。

RabbitMQ 的行为艺术

 

  好像,今天已经是 2 月 28 号了。

  听说,29、30、31 号放假。

  据说,有图,有真相。

 

目录

  • 简介

  • 环境搭建

  • 示例一:简单的 Hello World

  • 示例二:发布/订阅模式

  • 尝试发现 - 新物种 

 

简介

  RabbitMQ:一个消息系统,基于 AMQP 系统协议,由 Erlang 语言开发。

  优点:健壮、使用简单、开源和支持各种流行的语言(如 Python、java、.NET)等。

 

  MQ(Message Queue):消息队列的简称,是一种应用程序之间的通信机制。

  作用:将部分无需立即回调获取结果,并且耗时的操作,使用异步处理的方式提高服务器的吞吐量及性能。如:日志记录。

 

图:简单的通信方式,及加入 MQ 后的变化

 
     A 端:生产者将消息写(插)入队列;
     MQ(队列) :中间件,消息的载体;
     B 端:消费者从队列读(取)出消息。
 
   MQ 特点:消费者 - 生产者模型的一种表现形式。
 

环境搭建

  1.官网下载安装包: ;

  2.安装时会提示你下载 Erlang 语言环境;

  3.启动安装完的服务:RabbitMQ;

 

  4.在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面:

rabbitmq-plugins enable rabbitmq_management

 

  5.默认 url:http://localhost:15672/#/

 

示例一:简单的 Hello World

 

  P(Producer):生产者,意味着发送;

  Queue:队列,本质上是一个无限的缓冲区,可以储存尽可能多的信息;

  C(Consumer):消费者,等待并接收消息。

  【备注】生产者和消费者不需要驻留在同一台服务器上。

 

  Producer.cs

1     public class Producer   2     { 3         public static void Send() 4         { 5             var factory = new ConnectionFactory { HostName = "localhost" }; 6  7             //创建连接对象,基于 Socket 8             using (var connection = factory.CreateConnection()) 9             {10                 //创建新的渠道、会话11                 using (var channel = connection.CreateModel())12                 {13                     //声明队列14                     channel.QueueDeclare(queue: "hello",    //队列名15                         durable: false,     //持久性16                         exclusive: false,   //排他性17                         autoDelete: false,  //自动删除18                         arguments: null);19 20                     const string message = "Hello World!";21                     var body = Encoding.UTF8.GetBytes(message);22 23                     channel.BasicPublish(exchange: "",  //交换机名24                         routingKey: "hello",    //路由键25                         basicProperties: null,26                         body: body);27                 }28             }29         }30     }

  【备注】队列名如果已存在,将不会重复创建。假设队列已存在,修改 channel.QueueDeclare() 方法内的参数后启动会出现异常。

  【备注】消息内容是一个字节数组。

 
  Consumer.cs
1     class Consumer 2     { 3         public static void Receive() 4         { 5             var factory = new ConnectionFactory() { HostName = "localhost" }; 6  7             using (var connection = factory.CreateConnection()) 8             { 9                 using (var channel = connection.CreateModel())10                 {11                     channel.QueueDeclare(queue: "hello",12                                          durable: false,13                                          exclusive: false,14                                          autoDelete: false,15                                          arguments: null);16 17                     //创建基于该队列的消费者,绑定事件18                     var consumer = new EventingBasicConsumer(channel);19                     consumer.Received += (model, ea) =>20                     {21                         var body = ea.Body;     //消息主体22                         var message = Encoding.UTF8.GetString(body);23                         Console.WriteLine(" [x] Received {0}", message);24                     };25 26                     //启动消费者27                     channel.BasicConsume(queue: "hello",    //队列名28                                          noAck: true,   //false:手动应答;true:自动应答29                                          consumer: consumer);30 31                     Console.Read();32                 }33             }34         }35     }

  【疑问】在消费者的类里面为什么会再次声明队列(channel.QueueDeclare())呢?-- 因为接收方可能会在发送方启动前启动,这是出于保险起见。

 

示例二:发布/订阅模式

 
 

  1.Exchange 交换机和 Exchange Type 交换类型  

  RabbitMQ 消息传递模型的核心思想是,生产者不会直接将消息发给队列。

  这里我们将引入新的名词 Exchange(交换机)。交换机传递消息的类型也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。

图:Direct

 

图:Fanout 

 

图:Topic

 --上述 3 张图来源:http://m.blog.csdn.net/article/details?id=52262850

  

  这里,创建一个名为 “logs” 的交换机,它的类型为广播类型(fanout:可以将收到的所有消息,广播给所有已知的队列)。

channel.ExchangeDeclare(exchange: "logs",   //交换机名                        type: "fanout");    //交换类型

  

  2.临时队列

  作为消费者,我们有时候只需要一些新的(或者空的)队列,此时,更好的方式就是让它自动生成一个随机名字的队列;其次,当队列连接中断时会选择自动删除对应的消费者。

  创建一个非持久,有排他性和自动删除特性的队列(无参时)。

var queueName = channel.QueueDeclare().QueueName;

 

  3.Binding 绑定

  【疑问】有了 Exchange 和 channel,这时,还需要什么东西呢?-- 我们要创建 Exchange 和 channel 关系的桥梁,这个桥梁称之为 Binding(绑定)。

channel.QueueBind(queue: queueName,                  exchange: "logs",                  routingKey: "");

 

 
1     class Producer 2     { 3         public static void Send() 4         { 5             var factory = new ConnectionFactory() 6             { 7                 HostName = "localhost", 8                 Port = 5672, 9                 UserName = "guest",10                 Password = "guest"11             };12 13             using (var connection = factory.CreateConnection())14             {15                 using (var channel = connection.CreateModel())16                 {17                     channel.ExchangeDeclare(exchange: "logs",   //交换机名18                         type: "fanout");    //交换类型19 20                     // Guid21                     var message = Guid.NewGuid().ToString();22                     var body = Encoding.UTF8.GetBytes(message);23                     channel.BasicPublish(exchange: "logs",24                                          routingKey: "",25                                          basicProperties: null,26                                          body: body);27 28                     Console.WriteLine(" [x] Sent {0}", message);29                 }30 31                 Console.WriteLine(" Press [enter] to exit.");32                 Console.ReadLine();33             }34         }35     }
Producer.cs //生产者
1     class Reciver 2     { 3         public static void Recive() 4         { 5             var factory = new ConnectionFactory() 6             { 7                 HostName = "localhost", 8                 Port = 5672, 9                 UserName = "guest",10                 Password = "guest"11             };12 13             using (var connection = factory.CreateConnection())14             using (var channel = connection.CreateModel())15             {16                 channel.ExchangeDeclare(exchange: "wen_logs",   //交换机名17                     type: "fanout");    //交换类型18 19                 //创建队列20                 var queueName = channel.QueueDeclare().QueueName;21                 channel.QueueBind(queue: queueName,22                                   exchange: "wen_logs",23                                   routingKey: "");24 25                 Console.WriteLine(" [*] Waiting for logs.");26 27                 var consumer = new EventingBasicConsumer(channel);28                 consumer.Received += (model, ea) =>29                 {30                     var body = ea.Body;31                     var message = Encoding.UTF8.GetString(body);32                     Console.WriteLine(" [x] {0}", message);33                 };34                 channel.BasicConsume(queue: queueName,35                                      noAck: true,36                                      consumer: consumer);37 38                 Console.WriteLine(" Press [enter] to exit.");39                 Console.ReadLine();40             }41         }42     }
Reciver.cs //接收者

 

尝试发现 - 新物种 

  这都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!

 

  连接 RabbitMQ 代理:

var bus = RabbitHutch.CreateBus("host=localhost");

  发布:

bus.Publish(message);

  订阅:

bus.Subscribe
("my_subscription_id", msg => Console.WriteLine(msg.Text));

 

  下面我们通过 Demo 来感受一下 Easy 的程度吧,创建项目(效果图如下,附  下载):

  Wen.EasyNetQDemo.Model:类库

  Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制台应用程序,都使用 Nuget 直接安装 EasyNetQ 包,都引用类库 Model。

 
  Demo.cs
public class Demo    {        public string Message { get; set; }    }

 

  Publisher

1 using System; 2 using EasyNetQ; 3 using Wen.EasyNetQDemo.Model; 4  5 namespace Wen.EasyNetQDemo.Publisher 6 { 7     internal class Program 8     { 9         private static void Main(string[] args)10         {11             using (var bus = RabbitHutch.CreateBus("host=localhost"))12             {13                 string input;14                 Console.WriteLine("请输入信息。 如果是“esc” 将退出当前窗口。");15 16                 while ((input = Console.ReadLine()) != "esc")17                 {18                     bus.Publish(new Demo19                     {20                         Message = input21                     });22                 }23 24             }25         }26     }27 }

  【备注】RabbitHutch.CreateBus() 方法可以创建一个简单的发布/订阅和包含请求/响应 API 的消息总线。

 

  Subscriber

1 using System; 2 using EasyNetQ; 3 using Wen.EasyNetQDemo.Model; 4  5 namespace Wen.EasyNetQDemo.Subscriber 6 { 7     internal class Program 8     { 9         private static void Main(string[] args)10         {11             using (var bus = RabbitHutch.CreateBus("host=localhost"))12             {13                 bus.Subscribe
("test", HandleDemo);14 15 Console.WriteLine("监听信息中...输入“return”将退出当前窗口!");16 Console.ReadLine();17 }18 }19 20 private static void HandleDemo(Demo demo)21 {22 Console.ForegroundColor = ConsoleColor.Green;23 Console.WriteLine($"Got message: {demo.Message}");24 Console.ResetColor();25 }26 }27 }

 图:效果图

 

 

「世事洞明皆学问 人情练达即文章」

【博主】反骨仔
【原文】  

 

你可能感兴趣的文章
kettle API - Scala 调用示例
查看>>
HTML+CSS+JS面试题
查看>>
MongoDb 备份实践一下
查看>>
win7 蓝屏 0x000000c5
查看>>
HashMap
查看>>
IOS友盟统计Bug追踪
查看>>
利用spring session解决共享Session问题
查看>>
Haskell抽象概念Functor剖析
查看>>
JavaFX学习之道:JavaFX之TableView
查看>>
C++小记
查看>>
三大框架的整合struts2.2+hibernate3.2+spring2.5
查看>>
FR表单式主子报表的制作问题解决方案
查看>>
java使用post方式发送JSON数据乱码问题
查看>>
pring配置SessionFactory
查看>>
清除浮动
查看>>
WCF学习之旅----正式篇之基础框架
查看>>
最近研究如何加速UI界面开发,有点感觉了
查看>>
Linux+Xshell+Xftp+tomcat+jdk部署javaWeb项目
查看>>
c++ 设计模式 之 装饰模式
查看>>
jquery实现广告伸缩效果
查看>>