RabbitMQ 整合spring AMQP相关组件

RabbitMQ 整合spring AMQP相关组件

  1. RabbitAdmin 是对rabbitMQ的操作工作工具.比如声明交换机,声明队列,删除之类的操作。

    1. 核心配置写法为

       @Bean
          public ConnectionFactory connectionFactory() {
              CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
              //connectionFactory.setAddresses("127.0.0.1:5672")
              connectionFactory.setAddresses("127.0.0.1");
              connectionFactory.setPort(5672);
              connectionFactory.setPassword("admin");
              connectionFactory.setUsername("admin");
              connectionFactory.setVirtualHost("/");
              return connectionFactory;
          }
      
          @Bean
          public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
              return new RabbitAdmin(connectionFactory);
          }
      

      然后就可以直接注入一个RabbitAdmin对象了。RabbitAdmin源码如下:

      @ManagedResource(description = "Admin Tasks")
      public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
            BeanNameAware, InitializingBean {
      .......
      }
      

      这个类实现了一些接口,我先看一下InitializingBean,这个接口表明这个类会初始化后会调用afterPropertiesSet方法。

      @Override
        public void afterPropertiesSet() {
      
            synchronized (this.lifecycleMonitor) {
      
                if (this.running || !this.autoStartup) {
                    return;
                }
      
                if (this.retryTemplate == null && !this.retryDisabled) {
                    this.retryTemplate = new RetryTemplate();
                    this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
                    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
                    backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
                    backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
                    backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
                    this.retryTemplate.setBackOffPolicy(backOffPolicy);
                }
                if (this.connectionFactory instanceof CachingConnectionFactory &&
                        ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
                    this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
                    return;
                }
      
                // Prevent stack overflow...
                final AtomicBoolean initializing = new AtomicBoolean(false);
      
                this.connectionFactory.addConnectionListener(connection -> {
      
                    if (!initializing.compareAndSet(false, true)) {
                        // If we are already initializing, we don't need to do it again...
                        return;
                    }
                    try {
                        /*
                         * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                         * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                         * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                         * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                         */
                        if (this.retryTemplate != null) {
                            this.retryTemplate.execute(c -> {
                                initialize();
                                return null;
                            });
                        }
                        else {
                            initialize();
                        }
                    }
                    finally {
                        initializing.compareAndSet(true, false);
                    }
      
                });
      
                this.running = true;
      
            }
        }
      

      核心内容就是

      if (this.retryTemplate != null) {
                            this.retryTemplate.execute(c -> {
                                initialize();
                                return null;
                            });
                        }
                        else {
                            initialize();
                        }
      

      我们再看看initialize()方法中的实现

      @Override // NOSONAR complexity
        public void initialize() {
      
            if (this.applicationContext == null) {
                this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
                return;
            }
      
            this.logger.debug("Initializing declarations");
            Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                    this.applicationContext.getBeansOfType(Exchange.class).values());
            Collection<Queue> contextQueues = new LinkedList<Queue>(
                    this.applicationContext.getBeansOfType(Queue.class).values());
            Collection<Binding> contextBindings = new LinkedList<Binding>(
                    this.applicationContext.getBeansOfType(Binding.class).values());
            Collection<DeclarableCustomizer> customizers =
                    this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();
      
            processDeclarables(contextExchanges, contextQueues, contextBindings);
      
            final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
            final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
            final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);
      
            for (Exchange exchange : exchanges) {
                if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
                    this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                            + exchange.getName()
                            + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                            + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                            + "reopening the connection.");
                }
            }
      
            for (Queue queue : queues) {
                if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                    this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                            + queue.getName()
                            + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                            + queue.isExclusive() + ". "
                            + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                            + "alive, but all messages will be lost.");
                }
            }
      
            if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
                this.logger.debug("Nothing to declare");
                return;
            }
            this.rabbitTemplate.execute(channel -> {
                declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
                declareQueues(channel, queues.toArray(new Queue[queues.size()]));
                declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
                return null;
            });
            this.logger.debug("Declarations finished");
      
        }
      
        private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
                Collection<Binding> contextBindings) {
      
            Collection<Declarables> declarables = this.applicationContext.getBeansOfType(Declarables.class, false, true)
                    .values();
            declarables.forEach(d -> {
                d.getDeclarables().forEach(declarable -> {
                    if (declarable instanceof Exchange) {
                        contextExchanges.add((Exchange) declarable);
                    }
                    else if (declarable instanceof Queue) {
                        contextQueues.add((Queue) declarable);
                    }
                    else if (declarable instanceof Binding) {
                        contextBindings.add((Binding) declarable);
                    }
                });
            });
        }
      

      声明了一些链表存放Exchange、Queue、Binding等对象。然后判断一下各种条件,讲这些对象加入进去,转换RabbitMQ client对象,进行交互。使用InitializingBean是在bean初始化后进行的。

  2. 在RabbitMQ中API中声明一个Exchange、绑定和队列。

    在原生RabbitMQ中使用的channel进行操作的。在使用AMQP之后可以注入一个Exchange的bean的方式来声明。

         @Bean  
        public TopicExchange exchange001() {  
            return new TopicExchange("topic001", true, false);  
        }  
    
        @Bean  
        public Queue queue001() {  
            return new Queue("queue001", true); //队列持久  
        }  
    
  3. RabbitTemplate:整合spring AMQP的发送消息的模板类。消息可靠性投递模板,ConfirmCalllBack和ReturnCallBack等接口。

      @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
         return rabbitTemplate;
        }
    
  4. SimpleMessageListenerContainer简单消息监听容器,监听多个队列、自动启动自动声明。设置事务、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等。设置消息的签收模式捕获消息异常的handle。设置消费者标签生成策略、是否独占模式、消费者属性。设置具体的监听器转换器等操作。可以动态的设置

         @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setQueues(queue001());
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(5);
            container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setExposeListenerChannel(true);
            container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
            return container;
        }
    
  5. MessageListenerAdapter 消息监听适配器,主要是做消息监听器的适配工作。比如指定转换器之类的操作。

    Message listener adapter that delegates the handling of messages to target listener methods via reflection, with flexible message type conversion. Allows listener methods to operate on message content types, completely independent from the Rabbit API.
    消息监听适配器(adapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。允许监听器方法对消息内容类型进行操作,完全独立于Rabbit API。

    • defaultListenerMethod默认监听方法,设置监听方法名。
    • deletage对象:真实委托对象,用于处理消息的对象。
    • queueOrTagToMethodName队列标识与方法名组成的集合。可以将队列与方法名进行匹配。队列名称与方法名称绑定,即指定的队列的消息会被绑定的方法接受处理。
             @Bean
        public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
         
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
          adapter.setDefaultListenerMethod("consumeMessage");
    
          //全局的转换器:
          ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    
          TextMessageConverter textConvert = new TextMessageConverter();
          convert.addDelegate("text", textConvert);
          convert.addDelegate("html/text", textConvert);
          convert.addDelegate("xml/text", textConvert);
          convert.addDelegate("text/plain", textConvert);
          adapter.setMessageConverter(convert);
          container.setMessageListener(adapter);
         
         return container;
         }
    
  6. MessageConverter消息转换器接口,自定义消息转换器实现这个接口重写toMessage和fromMessage方法。

    public class TextMessageConverter implements MessageConverter {
        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return new Message(object.toString().getBytes(), messageProperties);
    }
       
           @Override
           public Object fromMessage(Message message) throws MessageConversionException {
               String contentType = message.getMessageProperties().getContentType();
               if (null != contentType && contentType.contains("text")) {
                   return new String(message.getBody());
               }
               return message.getBody();
           }
       }
       ```
       
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355