RabbitMQ可以做什么?
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
应用场景
我们知道现在很多APP都可以推送。在管理员选定内容进行推送这个行为中。
系统往往需要执行俩个操作:推送消息;记录是谁在什么时候推送的。
但是对于管理员来说,他只关心推送完成了没有,而不关心是否产生了日志。
传统的做法有2种:
串行:推送消息,然后在记录日志,在俩个操作都完成之后告诉管理员推送完成。
并行:推送消息的同时记录日志,在俩个操作都完成之后告诉管理员推送完成。
OK。我们现在引入消息队列。
引入消息队列之后,我们只需要在推送,然后日志放入消息队列中。然后就可以告诉管理员消息推送完成。
而日志信息会存放在消息队列之中。消息队列的消费者会在系统不繁忙的时候进行处理。
RabbitMQ术语
生产者:
消息发送者
消费者:
等待消息的程序
Queue:
队列,存放消息的
Simple
RabbitMQ四种exchange
如果发生紧急情况,我们的服务器宕掉的话,消息队列里的信息没了怎么办?
消息持久化
RabbitMQ为我们提供了消息持久化的手段
首先是队列持久化,然后在是消息持久化
如果消费者在消费当前消息的时候,突然崩掉,那么这条消息还在消息队列中吗?还是已经被消费掉了?
消息响应机制
RabbitMQ为我们提供了消息响应机制
啰里啰唆半天,下边是代码。
.net版本。
//生产者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
im.ExchangeDeclare("rabbitmq_route", ExchangeType.Direct);
im.QueueDeclare("rabbitmq_query", true, false, false, null);//第二个参数队列持久化
im.QueueBind("rabbitmq_query", "rabbitmq_route", ExchangeType.Direct, null);
for (int i = 0; i < 5; i++)
{
var props = im.CreateBasicProperties();
props.SetPersistent(true);//消息持久化
byte[] message = Encoding.UTF8.GetBytes("Hello " + i);
im.BasicPublish("rabbitmq_route", ExchangeType.Direct, props, message);
Console.WriteLine("send:" + i);
}
}
}
//消费者
var factory = new ConnectionFactory(){ HostName = "localhost", UserName = "guest", Password = "" };
using (IConnection conn = factory.CreateConnection())
{
using (IModel im = conn.CreateModel())
{
while (true)
{
BasicGetResult res = channel.BasicGet("rabbitmq_query", false);
if (res != null)
{
Console.WriteLine("receiver:" + UTF8Encoding.UTF8.GetString(res.Body));
}
Thread.Sleep(5000);
channel.BasicAck(res.DeliveryTag, true);//消息响应
Console.WriteLine("basiack end");
}
}
}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);// 路由
// int i = 0;
// while (true)
// {
// Thread.Sleep(1000);
// ++i;
// byte[] message = Encoding.UTF8.GetBytes(i.ToString());
// im.BasicPublish("rabbitmq_route_Fanout", "", null, message);
// Console.WriteLine("send:" + i.ToString());
// }
// }
//}
//using (IConnection conn = factory.CreateConnection())
//{
// using (IModel im = conn.CreateModel())
// {
// im.ExchangeDeclare("rabbitmq_route_Fanout", ExchangeType.Fanout);
// var queueOk = im.QueueDeclare();//1
// im.QueueBind(queueOk.QueueName, "rabbitmq_route_Fanout", "");//2
// var consumer = new QueueingBasicConsumer(im);//3
// im.BasicConsume(queueOk.QueueName, true, consumer);//4
// while (true)
// {
// var _result = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//5
// var body = _result.Body;
// var message = Encoding.UTF8.GetString(body);
// Console.WriteLine("received:{0}", message);
// }
// }
//}
RabbitMQ.Client.dil 版本5.1.0-rc1