Work Queues介绍
Work Queues简而言之就是Producer将Message发送到Queues中,公平调度的发送到个个worker处理。
值得注意的地方,在消息接受过程中,worker会遇到异常而崩溃,导致接收到的消息处理失败,但是Queues发送Message并不知道这个是否已经正确处理而自动删除这条message。这样会导致Message的丢失,所以需要实现手动Message acknowledgment。当处理成功是告知RabbitMQ 这条message处理OK并删除。
除此之外还有一个Message的丢失风险,就是当RabbitMQ 退出或者异常崩溃时,会导致queue和message的丢失,所以也要配置Message durability(持久化)。
公平调度(Fair dispatch),RabbitMQ默认是平均分配message到各个worker。为防止出现某些worker因为处理比较复杂,大量的数据而一直处理繁忙状态,其他的worker却处于闲置状态,还不停的进行调度繁忙的worker,需要使用basicQos 方法设置 prefetchCount = 1 ,就是告知RabbitMQ 不要同时的给一个worker大于1条Message。
演示代码
producer:
static void Main(string[] args)
{
//参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************producer***************");
Console.WriteLine("please Input send message:");
//连接到RabbitMQ
var factory = new RabbitMQ.Client.ConnectionFactory();
//第一种方式
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//第二种方式
//factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
//产生一个连接对象
using (var conncetion = factory.CreateConnection())
{
//通过conncetion产生一个连接通道
using (var channel = conncetion.CreateModel())
{
//用代码实现 exchanges和Queues
//定义exchanges
string exchangeName = "Ewrokqueues";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
//定义Queues
string queueName = "Qwrokqueues";
bool durable = true;//设RabbitMQ置持久化
channel.QueueDeclare(queueName, durable, false, false, null);
//绑定exchanges 和Queues
string routingKey = "task_queue";
channel.QueueBind(queueName, exchangeName, "", null);
//简单设置队列方式
//channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,
// arguments: null);
for (int i =0;i<20;i++)
{
string message = i.ToString();
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "Ewrokqueues", routingKey: "", mandatory: false, basicProperties: properties, body: body);
Console.WriteLine("[producer] send : {0}", message);
Thread.Sleep(1000);
}
}
}
Console.ReadLine();
}
worker 代码:
static void Main(string[] args)
{
//参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Console.WriteLine("********************worker1(sleep 5s)***************");
//连接MQ
var factory = new ConnectionFactory();
factory.UserName = "admin";
factory.Password = "admin";
factory.VirtualHost = "/";
factory.HostName = "10.19.52.80";
//产生连接对象
using (var connection = factory.CreateConnection())
{
//通道
using (var channel = connection.CreateModel())
{
//公平调用
channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);
//订阅方式获取message
var consumer = new EventingBasicConsumer(channel);
//实现获取message处理事件
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
//睡眠5s 另一个是1s
Thread.Sleep(5000);
Console.WriteLine("[worker1] received : {0}", message);
//手动设置回复
channel.BasicAck(deliveryTag:ea.DeliveryTag,multiple:false);
};
//设置手动回复认证
channel.BasicConsume(queue: "Qwrokqueues", autoAck: false, consumer: consumer);
Console.ReadLine();
}
}
}
P会循环发送20次,每秒发送一个数字到queue中,两个worker接受message。最后从运行结果可以看到整个分配情况,worker1第一个接受到“0”,在5秒处理完成后才接受“5”,而worker2会一直在处理,而不是出于等待闲置。