RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现

​---实践是检验真理的唯一标准---

yml参数配置

这次我使用的是RabbitTemplate

rabbitmq:
    host: 192.168.225.136
    port: 5672
    username: thinking
    password: 123
    virtual-host: host1
    publisher-returns: true
    # 事务模式下这行需要删除
    publisher-confirm-type: correlated
    template:
      # 找不到路由规则的消息 是否保留
      mandatory: true

为什么Template不需要定义configuration文件来接收yml文件的参数?

这是个常识问题,我这里做个记录。。。

我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。

springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration
这类Template模板的初始化有个Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties  指定了默认取得yml格式内容
    至于具体的属性可以找set方法

我们的demo都是基于RabbitTemplate来写。。。

初始化数据

通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
1.初始化交换机

@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
    // 遍历交换机枚举
    ExchangeEnum.toList().forEach(exchangeEnum -> {
        // 根据交换机模式 生成不同的交换机
        switch (exchangeEnum.getType()) {
            case fanout:
                rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case topic:
                rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case direct:
                rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
        }
    });
    return null;
}

2.初始化队列

@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
    // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
    QueueEnum.toList().forEach(queueEnum -> {
        rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
                queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
    });
    return null;
}

3.交换机和队列绑定

@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
    // 遍历队列枚举 将队列绑定到指定交换机
    BindingEnum.toList().forEach(bindingEnum -> {
        // 交换机
        ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
        // queue
        QueueEnum queueEnum = bindingEnum.getQueueEnum();
        // 绑定
        rabbitAdmin.declareBinding(new Binding(
                // queue名称
                queueEnum.getName(),
                Binding.DestinationType.QUEUE,
                // exchange名称
                exchangeEnum.getExchangeName(),
                // queue的routingKey
                queueEnum.getRoutingKey(),
                // 绑定的参数
                bindingEnum.getArguments()));
    });
    return null;
}

延迟队列

1.定义队列

/**
 * 超时队列---不需要定义RabbitListener方法
 */
deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
/**
 * 超时接收队列
 */
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
      // reply_to 队列
      Map<String,Object> map = new HashMap<>();
      //设置消息的过期时间 单位毫秒
      map.put("x-message-ttl",10000);
      //设置附带的死信交换机
      map.put("x-dead-letter-exchange","reply_exchange");
      //指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
      map.put("x-dead-letter-routing-key","reply.queue");
      return map;
  }

2.定义交换机

/**
 * 超时交换机
 */
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
/**
 * 超时接收交换机
 */
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),

3.交换机和队列绑定

deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)

4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener

@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
    System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));
    Long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
}

测试:

/**
 * 延迟队列测试
 */
public void deal_queue_test() {
    ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
    QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
    // 消息
    String message = "11111111111111111111111111111111111111";
    MessageProperties messageProperties = getMessageProperties();
    // 发送
    rabbitTemplate.convertSendAndReceive(
            exchangeEnum.getExchangeName(),
            queueEnum.getRoutingKey(),
            new Message(message.getBytes(), messageProperties));
}

异步队列

1.AsyncRabbitTemplate定义

/**
 * 异步队列
 * @param rabbitTemplate
 * @return
 */
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
    AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
    asyncRabbitTemplate.setReceiveTimeout(50000);
    return asyncRabbitTemplate;
}

2.测试

public void async() {
    System.err.println("---------------async--------------start---------");
    AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
    // 配置下面代码时 如果 队列监听中没有返回值时会报错
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
        }
        @Override
        public void onSuccess(Object result) {
            System.out.println("回调收到结果=> " + result);
        }
    });
    System.err.println("---------------async--------------end---------");
}

3.监听方法

@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
    System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));
    return "ok";
}

Java api

1.消息回退:

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。
boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)
boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。

2.拒绝消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息

3.确认ack

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

4.创建一个队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
​
durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列

5.启动一个消费者,并返回服务端生成的消费者标识

/**
 * queue:队列名
 * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
 * consumerTag:客户端生成的一个消费者标识
 * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
 * exclusive: 如果是单个消费者,则为true
 * arguments:消费的一组参数
 * deliverCallback: 当一个消息发送过来后的回调接口
 * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
 * shutdownSignalCallback: 当channel/connection 关闭后回调
 */
channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

6.取消消费者订阅

/**
* 取消消费者对队列的订阅关系
* consumerTag:服务器端生成的消费者标识
**/
void basicCancel(String consumerTag)

7.主动拉取队列中的一条消息

/**
 * 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
 */
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));

参数介绍

1.队列参数

x-dead-letter-exchange 死信交换机
x-dead-letter-routing-key 死信消息重定向路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为255的队列优先排序功能

2.消息参数

content-type 消息体的MIME类型,如application/json
content-encoding 消息的编码类型
message-id 消息的唯一性标识,由应用进行设置
correlation-id 一般用做关联消息的message-id,常用于消息的响应
timestamp 消息的创建时刻,整形,精确到秒

完整项目地址在微信公众中,谢谢大家支持

Java技术学习笔记
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,888评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,677评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,386评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,726评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,729评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,337评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,902评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,807评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,349评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,439评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,567评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,242评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,933评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,420评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,531评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,995评论 3 377
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,585评论 2 359

推荐阅读更多精彩内容