安装
安装过程跳过,但是安装RabbitMq前需要安装Erlang,安装结束还需要开启管理页面
创建连接
在 C# 中,你可以使用 ConnectionFactory
类来创建与 RabbitMQ 服务器的连接
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
IConnection conn = factory.CreateConnection();
连接多个endpoint ,避免某个节点访问不了
ConnectionFactory factory = new ConnectionFactory();
factory.UserName = "username";
factory.Password = "s3Kre7";
var endpoints = new System.Collections.Generic.List<AmqpTcpEndpoint> {
new AmqpTcpEndpoint("hostname"),
new AmqpTcpEndpoint("localhost")
};
IConnection conn = factory.CreateConnection(endpoints);
为了安全考虑 guest只能本地访问
连接是设计成长期lived的,不能频繁创建连接
可以创建一个别名,来告诉rabbitmq是谁连接了,日志也会记录
ConnectionFactory factory = new ConnectionFactory();
// "guest"/"guest" by default, limited to localhost connections
factory.UserName = user;
factory.Password = pass;
factory.VirtualHost = vhost;
factory.HostName = hostName;
// this name will be shared by all connections instantiated by
// this factory
factory.ClientProvidedName = "app:audit component:event-consumer";
IConnection conn = factory.CreateConnection();
创建通道
IModel channel = conn.CreateModel();
通道也是设计成长期lived的,但是由于通道可能会发生很多异常,所以通道的生命周期可以比connection断一些
创建Exchanges,创建Queues,绑定
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
channel.QueueDeclare(queueName, false, false, false, null);
channel.QueueBind(queueName, exchangeName, routingKey, null);
生产者和消费者都需要声明Exchange和Queues,以防止有一方先启动。如果已经有该对象,那么声明不会生效(所以如果交换机已经存在,再去声明一个同名的不同类型的交换机不会生效)
被动声明:(passive declare)
检查某个队列是否存在,如果不存在不会做任何操作而是抛出一个异常,如果存在则返回这个队列,得到队列的消息数量和消费者数量
(直接声明的话,如果队列不存在的话会创建一个队列)
var response = channel.QueueDeclarePassive("queue-name");
// returns the number of messages in Ready state in the queue
response.MessageCount;
// returns the number of consumers the queue has
response.ConsumerCount;
交换机的被动声明 类似
清空Queue
channel.QueuePurge("queue-name");
发送消息
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,
routingKey: "hello",
basicProperties: null,
body: body);
消费者:
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
关闭连接
channel.Close();
connection.Close();