核心概念
- broker:简单来说就是消息队列服务器实体。
- vhost:虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离。
- exchange:消息交换机,它指定消息按什么规则路由到哪个队列。
- queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
- routing key:路由关键字,exchange 根据这个关键字进行消息投递。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
- exchange type:包含四种消息交换机类型:fanout(把所有发送到该 exchange的消息路由到所有与它绑定的 queue 中)、direct(把消息路由到那些binding key与 routing key 完全匹配的 queue 中)、topic(将消息路由到binding key 与 routing key 相匹配的 queue 中)、headers(不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配)
使用理由
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。冗余
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。灵活性 & 峰值处理能力
当你的应用受欢迎程度大增时,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,消息队列提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。排序保证
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。消息队列保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。理解数据流
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息队列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。异步通信
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
角色管理
RabbitMQ 的用户角色分为以下几种。
- none
- management
- policymaker
- monitoring
- administrator
权限管理
- config 用户对 exchange 和 queue 的配置权限
- write 发送队列消息的权限
- read 读取消息的权限
TTL设置
TTL(Time To Live)代表消息的生存时间,有两种方法可以设置消息过期时间,一种是设置队列属性,第二种是对每个消息单独设置。
工作模式
Rabbitmq提供以下几种常见的工作模式
演示代码
public static void Simple()
{
ConnectionFactory factory = new ConnectionFactory() { HostName = "host", UserName = "freeman", Password = "freeman", VirtualHost = "/" };
Task.Run(() =>
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var count = 1;
while (true)
{
string message = $"Say hello {count++}";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
Console.WriteLine($" [P] Sent {message}");
Thread.Sleep(1000);
}
}
});
Task.Run(() =>
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [C] Received {0}", message);
};
while (true)
{
channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
}
}
});
}
public static void CustomExchange()
{
var exchange = "hello-exchange";
var queue = "hello-queue";
var routingKey = "hello";
var type = "direct";
ConnectionFactory factory = new ConnectionFactory() { HostName = "host", UserName = "freeman", Password = "freeman", VirtualHost = "/" };
Task.Run(() =>
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: exchange, type: type, durable: false, autoDelete: false);
channel.QueueDeclare(queue: queue, durable: false, exclusive: false, autoDelete: false, arguments: null);
channel.QueueBind(queue: queue, exchange: exchange, routingKey: routingKey, arguments: null);
var count = 1;
while (true)
{
string message = $"Say hello {count++}";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchange, routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine($" [P] Sent {message}");
Thread.Sleep(1000);
}
}
});
Task.Run(() =>
{
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: queue, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [C] Received {0}", message);
};
while (true)
{
channel.BasicConsume(queue: queue, noAck: true, consumer: consumer);
}
}
});
}