RabbitMQ 消息队列之应用 (三)

生产者:

复制代码

1                //direct类型  路由模式  1对1匹配

2                //生产者发送消息时需要指定一个路由键(routingKey),交换机只会把消息转发给包含该路由键的队列

3                //string exchange = "TestMq_Exchange";  //交换机

4                //string routingKey = "TestMq_RoutingKey";  //路由键

5

6                string queueName = "TestMq";  //队列名

7                for (int i = 0; i < 10; i++)

8                {

9                    string message = "Hello World:" + i;

10                    RabbitMqConfig rabbitMqConfig = RabbitMqConfig.Init();  //初始化配置文件

11                    ConnectionFactory connFactory = new ConnectionFactory

12                    {

13                        Uri = new Uri(url),

14                        RequestedConnectionTimeout = rabbitMqConfig.RequestedConnectionTimeout,

15                        RequestedChannelMax = rabbitMqConfig.RequestedChannelMax,

16                        RequestedHeartbeat = rabbitMqConfig.RequestedHeartbeat,

17                        AutomaticRecoveryEnabled = false

18                    };

19                    string exchange = queueName + "_Exchange";    //交换机

20                    string routingKey = queueName + "_RoutingKey";    //路由键

21                    string exchangeType = ExchangeType.Direct;  //类型 direct

22

23                    IModel model = this._conn.CreateModel();

24                    model.ExchangeDeclare(exchange, exchangeType, true, false, null); //声明交换机 

25                    model.QueueDeclare(queueName, true, false, false, null);  //声明队列

26                    model.QueueBind(queueName, exchange, routingKey, null);  //绑定

27

28                    IBasicProperties basicProperties = model.CreateBasicProperties();

29                    basicProperties.Persistent = true;    //消息持久化

30                    basicProperties.DeliveryMode = 2;    //消息持久化, 默认为1 非持久化

31                    byte[] bytes = Encoding.UTF8.GetBytes(message);

32                    model.BasicPublish(exchange, routingKey, basicProperties, bytes);

33

34                    Console.WriteLine(DateTime.Now);

35                }

复制代码

初始化配置文件,在config当中配置

复制代码

1 ConfigurationManager.GetSection("RabbitMqConfig"); //读取配置文件

2

3  //config配置文件

4  <configSections>

5    <section name="RabbitMqConfig" type="Rabbit.Common.RabbitMqConfig,Rabbit.Common" />

6  </configSections>

7  <RabbitMqConfig RequestedHeartbeat="60" RequestedConnectionTimeout="300" RequestedChannelMax="500" Uri="amqp://admin:123456@127.0.0.1:5672//" />

复制代码

消费者1:

复制代码

1            //消费者1

2            string queueName = "TestMq";  //队列名

3            string exchange = queueName + "_Exchange"; //交换器

4            string routingKey = queueName + "_RoutingKey"; //路由关键字

5

6            var rabbitMq = RabbitMqConfig.Init(); //获取Rabbit队列配置

7            var rm = new ConnectionFactory()

8            {

9                Uri = new Uri(rabbitMq.Uri),

10                RequestedConnectionTimeout = rabbitMq.RequestedConnectionTimeout,

11                RequestedChannelMax = rabbitMq.RequestedChannelMax,

12                RequestedHeartbeat = rabbitMq.RequestedHeartbeat,

13            };

14

15            var connection = rm.CreateConnection(); //创建连接

16            var channel = connection.CreateModel(); //创建通道

17            //开启队列持久化(durable = true),不自动删除(autoDelete = false),是否专属(exclusive: false)

18            channel.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null); //声明一个交换器

19            channel.QueueDeclare(queueName, true, false, false, null); //声明一个队列

20            channel.QueueBind(queueName, exchange, routingKey, null); //绑定交换器和路由

21            channel.BasicQos(0, 1, false);  //每次只接收1个,处理完后再接收下一个

22            var consumer = new EventingBasicConsumer(channel);

23

24            consumer.Received += (model, ea) =>

25            {

26                var message = Encoding.UTF8.GetString(ea.Body);  //消息主体

27

28                //处理消息逻辑,可以使用异步处理

29                Console.WriteLine(DateTime.Now + " 收到消息:" + message);

30                Thread.Sleep(3000);  //模拟消耗延时

31

32                channel.BasicAck(ea.DeliveryTag, false); //返回确认状态  该条消息将会从队列当中移除

33

34            };

35            //监听队列,手动返回完成 第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。

36            channel.BasicConsume(queueName, false, consumer);

复制代码

消费者2:

复制代码

            //消费者2

            string queueName = "TestMq2";  //队列名2

            string exchange = "TestMq_Exchange"; //交换器          TestMq_Exchange

            string routingKey = "TestMq_RoutingKey"; //路由关键字    TestMq_RoutingKey

            。。。

            。。。

复制代码

下图所示: 消费者1 完全匹配, 消费者2的队列名称为:TestMq2,与生产者不匹配

2,广播模式 / 分发模式 ( fanout ) 

这种模式下,消息会被所有消费者消费.也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息.

fanout 类型的发送规则非常简单,它会把所有发送到该交换机Exchange的消息路由到所有与它绑定的Queue中;

也就是说:在fanout模式下,只跟 交换机Exchange有关系,跟路由key无关。

复制代码

1            string queueName = "TestMq";

2            string exchange = "TestMq_Exchange";  //只需要声明交换机 即可

3            string routingKey = "";  //路由键

4            string exchangeType = ExchangeType.Fanout;  //类型 Fanout

5            for (int i = 0; i < 10; i++)

6            {

7                string message = "Hello World:" + i;

8                RabbitMqConfig rabbitMqConfig = RabbitMqConfig.Init();  //初始化配置文件

9                ConnectionFactory connFactory = new ConnectionFactory

10                {

11                    Uri = new Uri(url),

12                    RequestedConnectionTimeout = rabbitMqConfig.RequestedConnectionTimeout,

13                    RequestedChannelMax = rabbitMqConfig.RequestedChannelMax,

14                    RequestedHeartbeat = rabbitMqConfig.RequestedHeartbeat,

15                    AutomaticRecoveryEnabled = false

16                };

17

18                IModel model = this._conn.CreateModel();

19                model.ExchangeDeclare(exchange, exchangeType, true, false, null); //声明交换机 

20                model.QueueDeclare(queueName, true, false, false, null);  //声明队列

21                model.QueueBind(queueName, exchange, routingKey, null);  //绑定

22

23                IBasicProperties basicProperties = model.CreateBasicProperties();

24                basicProperties.Persistent = true;    //消息持久化

25                basicProperties.DeliveryMode = 2;    //消息持久化, 默认为1 非持久化

26                byte[] bytes = Encoding.UTF8.GetBytes(message);

27                model.BasicPublish(exchange, routingKey, basicProperties, bytes);

28

29                Console.WriteLine(DateTime.Now + ":" + message);

30            }

复制代码

消费者1: 这里设置: queueName = TestMq

复制代码

        string queueName = "TestMq";  //队列名

        string exchange = "TestMq_Exchange";  //交换机

        string routingKey = "";  //路由键

        ....

        ....


        channel.ExchangeDeclare(exchange, ExchangeType.Fanout, true, false, null); //声明一个交换器

复制代码

消费者2: 这里设置: queueName = TestMq

复制代码

1        string queueName = "TestMq";  //队列名

2        string exchange = "TestMq_Exchange";  //交换机

3        string routingKey = "";  //路由键

4

5        ....

6        ....

7       

8        channel.ExchangeDeclare(exchange, ExchangeType.Fanout, true, false, null); //声明一个交换器

复制代码

上边示例当中使用相同的  queueName = "TestMq";  会出现如下图所示的情况,各消费一半的情况,因为系统认为是一个队列

下边我们修改一下队列名称:

1            //消费者1

2            string queueName = "TestMq1";  //队列名1

3            string exchange = "TestMq_Exchange";  //交换机

4            string routingKey = "";  //路由键

1            //消费者2

2            string queueName = "TestMq2";  //队列名2

3            string exchange = "TestMq_Exchange";  //交换机

4            string routingKey = "";  //路由键

亚马逊测评 www.yisuping.com

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • 本章我们重点学习一下Rabbit里面的exchange(交换器)的知识。 交换器分类 RabbitMQ的Excha...
    Java大生阅读 388评论 0 1
  • 摘要:RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingK...
    请叫wo小爷阅读 1,284评论 0 2
  • 什么叫消息队列? 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复...
    Agile_dev阅读 2,371评论 0 24
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,231评论 0 11
  • 去幼儿园的路上母子两个聊天,他他说:“老师让我们睡午觉的时候不要做小动作。” 妈妈问:“那你有做小动作吗? 他他:...
    黄小她阅读 386评论 0 0