Rabbitmq打怪升级之路(十二)Headers-头交换机模式

简书:亚武de小文 【原创:转载请注明出处】

头交换机模式(Headers)

LengToo上学.png
RabbitMQ有以下几种工作模式 :
  • Work queues
  • Publish/Subscribe
  • Routing
  • Topic
  • Headers
  • RPC

Header
模型图
[亚武de小文]Headers模型图.png

消息header数据里有一个特殊的键x-match,它有两个值:

  • all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
  • any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

头交换机和主题交换机类似,区别在于:Topic路由值是基于路由键,Headers的路由值基于消息的header数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。

参考代码
生产者
  • Producer.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    /**
     * Header交换机
     * 生产者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Producer {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> headerMap) {
            try {
                // RabbitMQ建立连接的管理器
                ConnectionFactory factory = new ConnectionFactory();
                // 设置服务器地址
                factory.setHost("127.0.0.1");
                factory.setUsername("guest");
                factory.setPassword("guest");
    
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel();
    
                String message = "发送信息-headers交换机";
    
                //声明一个Header类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 生成发送消息的属性
                AMQP.BasicProperties props = new AMQP.BasicProperties
                        .Builder()
                        .headers(headerMap)
                        .build();
    
                // 向交换机发送消息
                channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8"));
    
                LOGGER.info("消息发送成功:{}", message);
                channel.close();
                connection.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
消费者
  • Consumer01.java
    package com.yawu.xiaowen.header;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    
    /**
     * Header交换机
     * 消费者
     *
     * @author yawu
     * @date 2019.07.01
     */
    public class Consumer01 {
        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
        private static final String EXCHANGE_NAME = "mq_header_exchange";
    
        public static void execute(Map<String, Object> myHeaderMap) {
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("127.0.0.1");
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
    
                //声明一个Headers类型的交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
                // 声明一个临时队列
                String queue_name = channel.queueDeclare().getQueue();
    
                // 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串""
                channel.queueBind(queue_name, EXCHANGE_NAME, "", myHeaderMap);
    
                LOGGER.info("【Consumer01:" + myHeaderMap + "】 等待消息...");
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        final String message = new String(body, "UTF-8");
                        LOGGER.info("【Consumer01:" + myHeaderMap + "】接收到的消息 '" + properties.getHeaders() + "':'" + message + "'");
                    }
                };
    
                // 队列一确认消息
                channel.basicConsume(queue_name, true, consumer);
    
            } catch (Exception e) {
                LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
            }
        }
    }
    
    
测试工具
  • HearTest.java

    package com.yawu.xiaowen.header;
    
    
    import org.junit.Test;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class HeaderTest {
    
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
        @Test
        public void header() throws InterruptedException {
    
            // 消费者1:绑定 health=Nice,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
                Producer.execute(headerMap);
            });
    
            // 消费者2:绑定  health=Nice,mentality=Bad
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Bad");
                headerMap.put("x-match", "any");
                Producer.execute(headerMap);
            });
    
            // 消费者3:绑定  health=Terrible,mentality=Good
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Good");
                headerMap.put("x-match", "all");
    //            headerMap.put("x-match","any");
                Producer.execute(headerMap);
            });
    
            Thread.sleep(2 * 1000);
            System.out.println("=============消息01===================");
            // 生产者1 : health=Nice,mentality=Good,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
                headerMap.put("mentality", "Good");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息02===================");
            // 生产者2 : health=Nice,x-match=any
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Nice");
    //            headerMap.put("x-match","any");
                Consumer01.execute(headerMap);
            });
    
            Thread.sleep(5 * 100);
            System.out.println("=============消息03===================");
            // 生产者1 : health=Terrible,mentality=Bad,x-match=all
            executorService.submit(() -> {
                Map<String, Object> headerMap = new HashMap<>();
                headerMap.put("health", "Terrible");
                headerMap.put("mentality", "Bad");
    //            headerMap.put("x-match","all");
                Consumer01.execute(headerMap);
            });
    
            // sleep 10s
            Thread.sleep(10 * 1000);
        }
    }
    
  • 运行HeaderTest测试工具,结果如图:


    Headers模式测试结果.png
  • 至此,Headers交换机模式学习完毕

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352