RabbitMQ(一)

一、RabbitMQ 简介

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(也可以称为:面向消息的中间件)。

RabbitMQ 服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。

特性:

可伸缩性:集群服务

消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存。

二、RabbitMQ安装

1.1安装条件:

1、 OTP 24.1 Windows 64-bit Binary File 需要先装erlang 下载地址:https://www.erlang.org/downloads

2、下载 rabbitmq-server-3.8.3.exe 安装包

1.2、装完了erlang,回去找到刚才第一步下载的rabbitMQ的安装包双击安装,一样的一直下一步就行了(中间遇到需要给它网络点确定就可以了)

1、打开cmd界面进入rabbitMQ的安装目录下的sbin目录

2、根据官网步骤执行命令

rabbitmq-plugins enable rabbitmq_management

1.3、安装成功找到安装目录找到rabbitmq-server.bat双击运行(如果有错就右键以管理员身份运行)

1、访问地址:http://localhost:15672/

默认登录名:guest

登录密码:guest

三、RabbitMQ 的基本使用

新建生产者控制台项目:RabbitMQ_Producer

通过nuget安装:

RabbitMQ.Client

代码中的用户名和密码不能使用默认的要不然会报错,在Admin中添加一个用户,如图:

1.png

添加之后默认没有连接权限需要去设置,点击新建用户,如图:

2.png

生产者代码如下:

static void Main(string[] args)
{
string host = "192.168.1.4"; //IP 地址
int port = 5672; //端口号
string userName = "admin"; //RabbitMQ 用户名
string userPw = "admin"; //RabbitMQ 密码
string virtualHost = "/";

//创建工厂
var conFactory = new ConnectionFactory();
//赋值
conFactory.HostName = host;
conFactory.Port = port;
conFactory.UserName = userName;
conFactory.Password = userPw;
conFactory.VirtualHost = virtualHost;

//创建连接
var connection = conFactory.CreateConnection();
//队列名称
string queuesName = "test";
//创建通道
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,特别是param参数,否则会报错
var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queuesName,durable:true,exclusive:false,arguments:param);
//发送队列
for (int i = 0; i < 5; i++)
{
var buffer = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish("", queuesName,null,buffer);
Console.WriteLine("发送消息:"+i);
}
connection.Close();
Console.ReadKey();
}

消费者代码:

static void Main(string[] args)
{
string host = "192.168.1.4";
int port = 5672;
string userName = "admin";
string userPw = "admin";
string virtualHost = "/";

//创建工厂
var conFactory = new ConnectionFactory();
//赋值
conFactory.HostName = host;
conFactory.Port = port;
conFactory.UserName = userName;
conFactory.Password = userPw;
conFactory.VirtualHost = virtualHost;

//创建连接
var connection = conFactory.CreateConnection();
//队列名称
string queuesName = "test";
//创建通道
var channel = connection.CreateModel();
//给通道绑定一个队列,队列如果不存在,则会创建新队列,如果队列已存在,那么参数一定要正确,param参数,否则会报错
var param = new Dictionary<string, object>() { { "x-queue-type", "classic" } };
channel.QueueDeclare(queue: queuesName, durable: true, exclusive: false, arguments: param);
//创建消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

consumer.Received += (sender, e) =>
{
var body = e.Body.Span;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"接收到消息:{message}");

Thread.Sleep(500);//暂停一下

//通知消息已被处理,如果没有,那么消息将会被重复消费
channel.BasicAck(e.DeliveryTag, false);
};
//ack设置成false,表示不自动提交,那么就需要在消息被消费后,手动调用BasicAck去提交消息
channel.BasicConsume(queuesName, false, consumer);

Console.ReadKey();
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容