C# & RabbitMQ 之 Exchanges

Exchanges

在前面的学习中其实已经接触了exchanges,通常情况下,producer 不需要知道发送消息给哪一个queue,只需要发送messages给exchange就足以,exchange一端接受producers的messages,另一端push message到queues。为了准确的处理messages,定义了exchange type:direct, topic, headers ,fanout。

Fanout Exchange

fanout exchange将messages广播到到已经绑定了的queues中。每个queue都会收到producer发送的信息。


fanout exchange

fanout exchange代码:
producer:

static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************fanout producer***************");
            Console.WriteLine("please Input send message:");
            //连接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一种方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二种方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //产生一个连接对象
            using (var conncetion = factory.CreateConnection())
            {
                //通过conncetion产生一个连接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代码实现 exchanges和Queues 
                    //定义exchanges
                    string exchangeName = "Efanout_test";
                    //设置类型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    //定义Queues
                    string queueName1 = "qfanout_test1";
                    string queueName2 = "qfanout_test2";

                    bool durable = true;//设RabbitMQ置持久化

                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    //绑定 queue 与exchange
                    channel.QueueBind(queueName1, exchangeName, "", null);
                    channel.QueueBind(queueName2, exchangeName, "", null);

                    for (int i = 0; i < 20; i++)
                    {
                        string message = i.ToString();

                        var body = Encoding.UTF8.GetBytes(message);
                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
                        channel.BasicPublish(exchange: "Efanout_test", routingKey: "", mandatory: false, basicProperties: properties, body: body);
                        Console.WriteLine("[producer] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

接受端代码:

  static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************fanout c1***************");
            //连接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //产生连接对象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平调用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //订阅方式获取message
                    var consumer = new EventingBasicConsumer(channel);
                    //实现获取message处理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);


                        Console.WriteLine("[qfanout_test1] received : {0}", message);

                        //手动设置回复
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //设置手动回复认证 接受队列名称
                    channel.BasicConsume(queue: "qfanout_test1", autoAck: false, consumer: consumer);
                    //另一个的参数
                    //channel.BasicConsume(queue: "qfanout_test2", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }

web管理界面绑定关系:


绑定关系

运行结果:


fanout exchange

Direct exchange

前面的fanout 类型中,只要exchange与queue绑定,message 发送给所有与exchange有绑定关系的queue中,但是有时候不是我们只希望传递message到某些queue中时就需要用到direct exchange,在QueueBind时添加routing key来实现。


Direct exchange

代码实现:
Direct_P:

static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Direct P***************");
            Console.WriteLine("please Input send message:");
            //连接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一种方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二种方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //产生一个连接对象
            using (var conncetion = factory.CreateConnection())
            {
                //通过conncetion产生一个连接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代码实现 exchanges和Queues 
                    //定义exchanges
                    string exchangeName = "EDirect_test";
                    //设置类型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //定义Queues
                    string queueName1 = "qDirect_test1";
                    string queueName2 = "qDirect_test2";
                    bool durable = true;//设RabbitMQ置持久化


                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    //绑定 queue 与exchange
                    //routingkey info  waring error 
                    channel.QueueBind(queueName1, exchangeName, "info", null);
                    channel.QueueBind(queueName2, exchangeName, "error", null);
                    channel.QueueBind(queueName2, exchangeName, "waring", null);


                    for (int i = 0; i < 20; i++)
                    {
                        string message = null;

                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
                        string routingkey = null;

                        if (i%3 == 0)
                        {
                            message = "error";
                            routingkey = "error";
                        }
                        else if (i%3 == 1)
                        {
                            message = "waring";
                            routingkey = "waring";
                        }
                        else
                        {
                            message = "info";
                            routingkey = "info";
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish( "EDirect_test", routingkey,  false, properties,  body);
                        Console.WriteLine("[Direct P] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

Direct_C:

static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Direct_C2***************");
            //连接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //产生连接对象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平调用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //订阅方式获取message
                    var consumer = new EventingBasicConsumer(channel);
                    //实现获取message处理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[Direct_C2] received : {0}--routingkey {1}", message,routingKey);

                        //手动设置回复
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //设置手动回复认证
                    channel.BasicConsume(queue: "qDirect_test2", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }

代码实现的结构如图:



运行结果:


Direct exchange实现效果

Web后台绑定关系:
后台绑定关系

Topic exchange

简单的理解Topic exchange是在Direct exchange上的扩展,routing_key不局限于完全匹配,而是像一种正则的样子去匹配。不过只有两个特殊通配符:
* 号用来匹配一个单词,比如"quick.orange.rabbit" 就可以用*.orange. 匹配到*
#号用来匹配0到多个单词,比如“lazy.orange.male.rabbit”可以用lazy.# 匹配

Topic exchange

ETopic_P.cs

static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************ETopic P***************");
            Console.WriteLine("please Input send message:");
            //连接到RabbitMQ

            var factory = new RabbitMQ.Client.ConnectionFactory();
            //第一种方式
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //第二种方式
            //factory.Uri = new Uri("amqp://admin:admin@10.19.52.80:5672/");
            //产生一个连接对象
            using (var conncetion = factory.CreateConnection())
            {
                //通过conncetion产生一个连接通道
                using (var channel = conncetion.CreateModel())
                {
                    //用代码实现 exchanges和Queues 
                    //定义exchanges
                    string exchangeName = "ETopic_test";
                    //设置类型 Fanout
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //定义Queues
                    string queueName1 = "qTopic_test1";
                    string queueName2 = "qTopic_test2";
                    string queueName3 = "qTopic_test3";
                    bool durable = true;//设RabbitMQ置持久化


                    channel.QueueDeclare(queueName1, durable, false, false, null);
                    channel.QueueDeclare(queueName2, durable, false, false, null);
                    channel.QueueDeclare(queueName3, durable, false, false, null);
                    //绑定 queue 与exchange
                    //routingkey info  waring error 
                    channel.QueueBind(queueName1, exchangeName, "log.#", null);
                    channel.QueueBind(queueName2, exchangeName, "*.error", null);
                    channel.QueueBind(queueName3, exchangeName, "*.waring", null);

                    for (int i = 0; i < 20; i++)
                    {
                        string message = null;

                        //var properties = channel.CreateBasicProperties();
                        //properties.Persistent = true;
                        string routingkey = null;

                        if (i % 3 == 0)
                        {
                            message = "error";
                            routingkey = "log.error";
                        }
                        else if (i % 3 == 1)
                        {
                            message = "waring";
                            routingkey = "log.waring";
                        }
                        else
                        {
                            message = "info";
                            routingkey = "log.waring.error";
                        }

                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish("ETopic_test", routingkey, false, null, body);
                        Console.WriteLine("[ETopic P] send : {0}", message);
                        Thread.Sleep(1000);
                    }
                }
            }
            Console.ReadLine();
        }

ETopic_C.cs

static void Main(string[] args)
        {
            //参考:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

            Console.WriteLine("********************Topic_C3***************");
            //连接MQ
            var factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = "10.19.52.80";

            //产生连接对象
            using (var connection = factory.CreateConnection())
            {
                //通道
                using (var channel = connection.CreateModel())
                {
                    //公平调用
                    //channel.BasicQos(prefetchSize:0,prefetchCount:1,global:false);

                    //订阅方式获取message
                    var consumer = new EventingBasicConsumer(channel);
                    //实现获取message处理事件
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        var routingKey = ea.RoutingKey;
                        Console.WriteLine("[Topic_C3] received : {0}--routingkey {1}", message, routingKey);

                        //手动设置回复
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //设置手动回复认证
                    channel.BasicConsume(queue: "qTopic_test3", autoAck: false, consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
Topic exchange运行结果

Web后台绑定关系
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容

  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,096评论 3 51
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 3,003评论 3 41
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,672评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,365评论 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,925评论 2 11