RabbitMQ 的行为艺术
序
好像,今天已经是 2 月 28 号了。
听说,29、30、31 号放假。
据说,有图,有真相。
目录
-
简介
-
环境搭建
-
示例一:简单的 Hello World
-
示例二:发布/订阅模式
-
尝试发现 - 新物种
简介
RabbitMQ:一个消息系统,基于 AMQP 系统协议,由 Erlang 语言开发。
优点:健壮、使用简单、开源和支持各种流行的语言(如 Python、java、.NET)等。
MQ(Message Queue):消息队列的简称,是一种应用程序之间的通信机制。
作用:将部分无需立即回调获取结果,并且耗时的操作,使用异步处理的方式提高服务器的吞吐量及性能。如:日志记录。
图:简单的通信方式,及加入 MQ 后的变化
环境搭建
1.官网下载安装包: ;
2.安装时会提示你下载 Erlang 语言环境;
3.启动安装完的服务:RabbitMQ;
4.在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面:
rabbitmq-plugins enable rabbitmq_management
5.默认 url:http://localhost:15672/#/
示例一:简单的 Hello World
P(Producer):生产者,意味着发送;
Queue:队列,本质上是一个无限的缓冲区,可以储存尽可能多的信息;
C(Consumer):消费者,等待并接收消息。
【备注】生产者和消费者不需要驻留在同一台服务器上。
Producer.cs
1 public class Producer 2 { 3 public static void Send() 4 { 5 var factory = new ConnectionFactory { HostName = "localhost" }; 6 7 //创建连接对象,基于 Socket 8 using (var connection = factory.CreateConnection()) 9 {10 //创建新的渠道、会话11 using (var channel = connection.CreateModel())12 {13 //声明队列14 channel.QueueDeclare(queue: "hello", //队列名15 durable: false, //持久性16 exclusive: false, //排他性17 autoDelete: false, //自动删除18 arguments: null);19 20 const string message = "Hello World!";21 var body = Encoding.UTF8.GetBytes(message);22 23 channel.BasicPublish(exchange: "", //交换机名24 routingKey: "hello", //路由键25 basicProperties: null,26 body: body);27 }28 }29 }30 }
【备注】队列名如果已存在,将不会重复创建。假设队列已存在,修改 channel.QueueDeclare() 方法内的参数后启动会出现异常。
【备注】消息内容是一个字节数组。
1 class Consumer 2 { 3 public static void Receive() 4 { 5 var factory = new ConnectionFactory() { HostName = "localhost" }; 6 7 using (var connection = factory.CreateConnection()) 8 { 9 using (var channel = connection.CreateModel())10 {11 channel.QueueDeclare(queue: "hello",12 durable: false,13 exclusive: false,14 autoDelete: false,15 arguments: null);16 17 //创建基于该队列的消费者,绑定事件18 var consumer = new EventingBasicConsumer(channel);19 consumer.Received += (model, ea) =>20 {21 var body = ea.Body; //消息主体22 var message = Encoding.UTF8.GetString(body);23 Console.WriteLine(" [x] Received {0}", message);24 };25 26 //启动消费者27 channel.BasicConsume(queue: "hello", //队列名28 noAck: true, //false:手动应答;true:自动应答29 consumer: consumer);30 31 Console.Read();32 }33 }34 }35 }
【疑问】在消费者的类里面为什么会再次声明队列(channel.QueueDeclare())呢?-- 因为接收方可能会在发送方启动前启动,这是出于保险起见。
示例二:发布/订阅模式
![](https://images2015.cnblogs.com/blog/711762/201702/711762-20170227134022095-1694636522.png)
1.Exchange 交换机和 Exchange Type 交换类型
RabbitMQ 消息传递模型的核心思想是,生产者不会直接将消息发给队列。
这里我们将引入新的名词 Exchange(交换机)。交换机传递消息的类型也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。
图:Direct
图:Fanout
图:Topic
--上述 3 张图来源:http://m.blog.csdn.net/article/details?id=52262850
这里,创建一个名为 “logs” 的交换机,它的类型为广播类型(fanout:可以将收到的所有消息,广播给所有已知的队列)。
channel.ExchangeDeclare(exchange: "logs", //交换机名 type: "fanout"); //交换类型
2.临时队列
作为消费者,我们有时候只需要一些新的(或者空的)队列,此时,更好的方式就是让它自动生成一个随机名字的队列;其次,当队列连接中断时会选择自动删除对应的消费者。
创建一个非持久,有排他性和自动删除特性的队列(无参时)。
var queueName = channel.QueueDeclare().QueueName;
3.Binding 绑定
【疑问】有了 Exchange 和 channel,这时,还需要什么东西呢?-- 我们要创建 Exchange 和 channel 关系的桥梁,这个桥梁称之为 Binding(绑定)。
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
![](https://images2015.cnblogs.com/blog/711762/201702/711762-20170227201318438-416350106.png)
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
![](https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif)
1 class Producer 2 { 3 public static void Send() 4 { 5 var factory = new ConnectionFactory() 6 { 7 HostName = "localhost", 8 Port = 5672, 9 UserName = "guest",10 Password = "guest"11 };12 13 using (var connection = factory.CreateConnection())14 {15 using (var channel = connection.CreateModel())16 {17 channel.ExchangeDeclare(exchange: "logs", //交换机名18 type: "fanout"); //交换类型19 20 // Guid21 var message = Guid.NewGuid().ToString();22 var body = Encoding.UTF8.GetBytes(message);23 channel.BasicPublish(exchange: "logs",24 routingKey: "",25 basicProperties: null,26 body: body);27 28 Console.WriteLine(" [x] Sent {0}", message);29 }30 31 Console.WriteLine(" Press [enter] to exit.");32 Console.ReadLine();33 }34 }35 }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
![](https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif)
1 class Reciver 2 { 3 public static void Recive() 4 { 5 var factory = new ConnectionFactory() 6 { 7 HostName = "localhost", 8 Port = 5672, 9 UserName = "guest",10 Password = "guest"11 };12 13 using (var connection = factory.CreateConnection())14 using (var channel = connection.CreateModel())15 {16 channel.ExchangeDeclare(exchange: "wen_logs", //交换机名17 type: "fanout"); //交换类型18 19 //创建队列20 var queueName = channel.QueueDeclare().QueueName;21 channel.QueueBind(queue: queueName,22 exchange: "wen_logs",23 routingKey: "");24 25 Console.WriteLine(" [*] Waiting for logs.");26 27 var consumer = new EventingBasicConsumer(channel);28 consumer.Received += (model, ea) =>29 {30 var body = ea.Body;31 var message = Encoding.UTF8.GetString(body);32 Console.WriteLine(" [x] {0}", message);33 };34 channel.BasicConsume(queue: queueName,35 noAck: true,36 consumer: consumer);37 38 Console.WriteLine(" Press [enter] to exit.");39 Console.ReadLine();40 }41 }42 }
尝试发现 - 新物种
这都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!
连接 RabbitMQ 代理:
var bus = RabbitHutch.CreateBus("host=localhost");
发布:
bus.Publish(message);
订阅:
bus.Subscribe("my_subscription_id", msg => Console.WriteLine(msg.Text));
下面我们通过 Demo 来感受一下 Easy 的程度吧,创建项目(效果图如下,附 下载):
Wen.EasyNetQDemo.Model:类库
Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制台应用程序,都使用 Nuget 直接安装 EasyNetQ 包,都引用类库 Model。
![](https://images2015.cnblogs.com/blog/711762/201702/711762-20170227231239766-615244215.png)
public class Demo { public string Message { get; set; } }
Publisher
1 using System; 2 using EasyNetQ; 3 using Wen.EasyNetQDemo.Model; 4 5 namespace Wen.EasyNetQDemo.Publisher 6 { 7 internal class Program 8 { 9 private static void Main(string[] args)10 {11 using (var bus = RabbitHutch.CreateBus("host=localhost"))12 {13 string input;14 Console.WriteLine("请输入信息。 如果是“esc” 将退出当前窗口。");15 16 while ((input = Console.ReadLine()) != "esc")17 {18 bus.Publish(new Demo19 {20 Message = input21 });22 }23 24 }25 }26 }27 }
【备注】RabbitHutch.CreateBus() 方法可以创建一个简单的发布/订阅和包含请求/响应 API 的消息总线。
Subscriber
1 using System; 2 using EasyNetQ; 3 using Wen.EasyNetQDemo.Model; 4 5 namespace Wen.EasyNetQDemo.Subscriber 6 { 7 internal class Program 8 { 9 private static void Main(string[] args)10 {11 using (var bus = RabbitHutch.CreateBus("host=localhost"))12 {13 bus.Subscribe("test", HandleDemo);14 15 Console.WriteLine("监听信息中...输入“return”将退出当前窗口!");16 Console.ReadLine();17 }18 }19 20 private static void HandleDemo(Demo demo)21 {22 Console.ForegroundColor = ConsoleColor.Green;23 Console.WriteLine($"Got message: {demo.Message}");24 Console.ResetColor();25 }26 }27 }
图:效果图