简书:亚武de小文 【原创:转载请注明出处】
头交换机模式(Headers)
RabbitMQ有以下几种工作模式 :
- Work queues
- Publish/Subscribe
- Routing
- Topic
- Headers
- RPC
Header
模型图
消息header数据里有一个特殊的键x-match
,它有两个值:
- all: 默认值。
一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
- any:
一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
头交换机和主题交换机类似,区别在于:Topic路由值是基于路由键,Headers的路由值基于消息的header数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。
参考代码
生产者
-
Producer.java
package com.yawu.xiaowen.header; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; /** * Header交换机 * 生产者 * * @author yawu * @date 2019.07.01 */ public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_header_exchange"; public static void execute(Map<String, Object> headerMap) { try { // RabbitMQ建立连接的管理器 ConnectionFactory factory = new ConnectionFactory(); // 设置服务器地址 factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); String message = "发送信息-headers交换机"; //声明一个Header类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); // 生成发送消息的属性 AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .headers(headerMap) .build(); // 向交换机发送消息 channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8")); LOGGER.info("消息发送成功:{}", message); channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费者
-
Consumer01.java
package com.yawu.xiaowen.header; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; /** * Header交换机 * 消费者 * * @author yawu * @date 2019.07.01 */ public class Consumer01 { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_header_exchange"; public static void execute(Map<String, Object> myHeaderMap) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个Headers类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); // 声明一个临时队列 String queue_name = channel.queueDeclare().getQueue(); // 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串"" channel.queueBind(queue_name, EXCHANGE_NAME, "", myHeaderMap); LOGGER.info("【Consumer01:" + myHeaderMap + "】 等待消息..."); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body, "UTF-8"); LOGGER.info("【Consumer01:" + myHeaderMap + "】接收到的消息 '" + properties.getHeaders() + "':'" + message + "'"); } }; // 队列一确认消息 channel.basicConsume(queue_name, true, consumer); } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } } }
测试工具
-
HearTest.java
package com.yawu.xiaowen.header; import org.junit.Test; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class HeaderTest { private ExecutorService executorService = Executors.newFixedThreadPool(10); @Test public void header() throws InterruptedException { // 消费者1:绑定 health=Nice,mentality=Good executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Nice"); headerMap.put("mentality", "Good"); headerMap.put("x-match", "all"); Producer.execute(headerMap); }); // 消费者2:绑定 health=Nice,mentality=Bad executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Nice"); headerMap.put("mentality", "Bad"); headerMap.put("x-match", "any"); Producer.execute(headerMap); }); // 消费者3:绑定 health=Terrible,mentality=Good executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Terrible"); headerMap.put("mentality", "Good"); headerMap.put("x-match", "all"); // headerMap.put("x-match","any"); Producer.execute(headerMap); }); Thread.sleep(2 * 1000); System.out.println("=============消息01==================="); // 生产者1 : health=Nice,mentality=Good,x-match=all executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Nice"); headerMap.put("mentality", "Good"); // headerMap.put("x-match","all"); Consumer01.execute(headerMap); }); Thread.sleep(5 * 100); System.out.println("=============消息02==================="); // 生产者2 : health=Nice,x-match=any executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Nice"); // headerMap.put("x-match","any"); Consumer01.execute(headerMap); }); Thread.sleep(5 * 100); System.out.println("=============消息03==================="); // 生产者1 : health=Terrible,mentality=Bad,x-match=all executorService.submit(() -> { Map<String, Object> headerMap = new HashMap<>(); headerMap.put("health", "Terrible"); headerMap.put("mentality", "Bad"); // headerMap.put("x-match","all"); Consumer01.execute(headerMap); }); // sleep 10s Thread.sleep(10 * 1000); } }
-
运行HeaderTest测试工具,结果如图:
至此,Headers交换机模式学习完毕