RabbitMQ入门学习系列(三).消息发送接收

快速阅读

​ 用Rabitmq的队列管理,以及如何保证消息在队列中不丢失。通过ack的消息确认和持久化进行操作。 以及Rabbit中如何用Web面板进行管理队列。消费者如何处理耗时的任务

生产者代码

创建链接=》创建信道=》声明队列 。连续生产10条消息供消费者消费

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(queue: "hello",
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);
        for (var i = 0; i < 10; i++) //连续生产10条消息,让消费者消费
        {
            string message = "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                                 routingKey: "hello",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }


    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
} 

消费者代码

创建链接=》创建信道=》声明队列 =>创建EventingBasicConsumer=》接收消息进行处理。

如果挂断,消息会丢失。

static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "hello",
                                 durable: false, exclusive: false, autoDelete: false, arguments: null);

            //以下是区别生产者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);
                Thread.Sleep(3000);//模拟耗时任务 ,
                Console.WriteLine("Received over");

            };
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

}

测试结果

image

从中我们可以看到,消费者每3秒消费一个任务 。

消息确认

如果一个消费者挂掉以后,怎么办呢?

正常逻辑是RabbitMq把消费发送给消费者以后,会把消费从队列中删除 。

但是如果消费者挂掉以后怎么办呢?因为这个时候消息已经发送出去,

假如这个消息 在被消费者处理前挂掉了,我们就会丢失这个消费,

为了避免这种问题的出现, 我们要用到消息确认机制,**就是当消费者处理完消息以后,再给rabbitmq一个确认信息,告诉他我已经处理好了,你可以删除了,RabbitMQ接收到以后,会从队列中把这个消息删除, 这就保证了消息会不会因消费者挂掉而丢失没有处理的消息 **。 **如果Rabbit没有接收到消息确认的通知(在超时之前) ,则会把这个消息再放到队列中,发送给另外的消费者。****

我们把你代码改一下

消费者代码中,加入ack发送的标志

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, e) =>
{
    var body = e.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("Received {0}", message);
    Thread.Sleep(3000);//模拟耗时任务 ,
    Console.WriteLine("Received over");
    channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
};

发送者代码中加入发送的消息标识

 for (var i = 0; i < 10; i++)
 {
     string message = "Hello World!this is message "+i;
     var body = Encoding.UTF8.GetBytes(message);
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true;

     channel.BasicPublish(exchange: "",
                          routingKey: "hello",
                          basicProperties: null,
                          body: body);
     Console.WriteLine(" [x] Sent {0},id={1}", message,i);
     Thread.Sleep(1000);
 }

启动了三个消费者进程 ,但是发现队列中的任务 没有被消费完

image

还有id为6,7,8,9没有被消费, 这个时候是再重启一个消费者才可以消费完。

image

有点奇怪了。先放这里吧,做一个问题记录一下

=》更新下进展

晚上的时候查了一下。

经常测试发现 要把autoAck设置为false才可以。

channel.BasicConsume(queue: "HelloDurable1", autoAck: false, consumer: consumer);  //这个是正常的
channel.BasicConsume(queue: "HelloDurable1", autoAck: true, consumer: consumer); //这个只能消费一部分,还需要重启才可以再消费
  • 经查autoAck 是否自动确认消息,true自动确认,false 不自动要手动调用,建立设置为false

启动三个消费者测试发现正常 。

1562162168225

消息持久性

我们还需要考虑到当RabbitMq.server挂掉的时候,消息也会丢失。

为了避免此类问题:需要把消息和队列都标识为持久性。

当我们标识为以后,重启程序时,发现报错了。

image

根据提示可以看出, 队列hello先前没有被标记为持久化,但已经存在了,我们不能改变他的属性,

我们可以新建一个新的队列 。比如HelloDurable,就可以了。

生产者和消费者两端都要修改。

或者打开Rabbitmq的监控把队列进行删除

RabbitMq监控

先开始管理程序

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins en
able rabbitmq_management

[图片上传失败...(image-c6edc9-1562227373030)]

查看安装

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin>rabbitmq-plugins.bat list

[图片上传失败...(image-b4f05f-1562227373030)]

输入管理面板地址

http://127.0.0.1:15672/

用户名:guest ;密码 guest

[图片上传失败...(image-3df944-1562227373030)]

登陆进去以后,找到队列列表,删除相应的队列就可以了。

1562162380181
1562162391890

队列持久化的声明

 channel.QueueDeclare(queue: "HelloDurable",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

消费持久化的声明

var properties = channel.CreateBasicProperties();
 properties.Persistent = true;

这样即使服务器重启消息也不会丢失的。

消息负载均衡

为了避免有些消费者不能获得资源,有些消费者获得资源过多的情况,我们要做如下配置

在消费者代码中增加

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

表示每次取一个消息。

通过使用消息确认标识和配置消息持久性,让我们的消息可以持久化和不会被丢失。

友情提示

我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱码农爱生活 》留言。必定会再次复查原因。让每一篇 文章都能顺利实现。道理讲明白 。原理讲清楚。代码必实现

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容