springboot整合rabbitmq(rabbitadmin)

http://blog.csdn.net/u011493599/article/details/62892490

引入jar包

[java] view plain copy

    

org.springframework.boot    

spring-boot-starter-amqp    


1在resource下创建rabbitmq.properties


[java] view plain copy

#是访问port不是15672,15672是api和管理界面的port  

spring.rabbitmq.addresses=localhost:5672  

spring.rabbitmq.username=admin  

spring.rabbitmq.password=123456  

#如果要进行消息回调,则这里必须要设置为true  

spring.rabbitmq.publisherconfirms=true  

2创建rabbitmq对象RabbitMq

[java] view plain copy

package com.demo.model;  


import lombok.Getter;  

import lombok.Setter;  

import org.springframework.boot.context.properties.ConfigurationProperties;  

import org.springframework.context.annotation.Configuration;  


/**

 * Created by huguoju on 2017/3/2.

 * rabbitmq配置文件

 */  

@Configuration  

@Getter  

@Setter  

@ConfigurationProperties(locations = "classpath:rabbitmq/rabbitmq.properties",prefix = "spring.rabbitmq")  

public class RabbitMq{  


private String addresses;  

private String username;  

private String password;  

private Boolean publisherconfirms;  

}  

3生产者配置

   3.1通用性基础配置


[java] view plain copy

package com.demo.rabbitmq.sender;  



import com.demo.model.RabbitMq;  

import lombok.extern.slf4j.Slf4j;  

import org.springframework.amqp.core.Message;  

import org.springframework.amqp.core.MessageListener;  

import org.springframework.amqp.core.Queue;  

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  

import org.springframework.amqp.rabbit.connection.ConnectionFactory;  

import org.springframework.amqp.rabbit.core.RabbitAdmin;  

import org.springframework.amqp.rabbit.core.RabbitTemplate;  

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  

import org.springframework.amqp.rabbit.support.CorrelationData;  

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.beans.factory.config.ConfigurableBeanFactory;  

import org.springframework.context.annotation.Bean;  

import org.springframework.context.annotation.Configuration;  

import org.springframework.context.annotation.Scope;  

import org.springframework.messaging.converter.MappingJackson2MessageConverter;  


/**

 * Created by huguoju on 2017/3/2.

 * 创建消息生产者

 */  

@Configuration  

@Slf4j  

public class AmqpConfig {  

@Autowired  

private RabbitMq rabbitMq;  


/**

     * 连接rabbitmq

     * @return

     */  

@Bean  

public ConnectionFactory connectionFactory(){  

CachingConnectionFactory connectionFactory=new CachingConnectionFactory();  

        connectionFactory.setAddresses(rabbitMq.getAddresses());  

        connectionFactory.setUsername(rabbitMq.getUsername());  

        connectionFactory.setPassword(rabbitMq.getPassword());  

/**

         * 对于每一个RabbitTemplate只支持一个ReturnCallback。

         * 对于返回消息,模板的mandatory属性必须被设定为true,

         * 它同样要求CachingConnectionFactory的publisherReturns属性被设定为true。

         * 如果客户端通过调用setReturnCallback(ReturnCallback callback)注册了RabbitTemplate.ReturnCallback,那么返回将被发送到客户端。

         * 这个回调函数必须实现下列方法:

         *void returnedMessage(Message message, intreplyCode, String replyText,String exchange, String routingKey);

         */  

// connectionFactory.setPublisherReturns(true);  

/**

         * 同样一个RabbitTemplate只支持一个ConfirmCallback。

         * 对于发布确认,template要求CachingConnectionFactory的publisherConfirms属性设置为true。

         * 如果客户端通过setConfirmCallback(ConfirmCallback callback)注册了RabbitTemplate.ConfirmCallback,那么确认消息将被发送到客户端。

         * 这个回调函数必须实现以下方法:

         * void confirm(CorrelationData correlationData, booleanack);

         */  

        connectionFactory.setPublisherConfirms(rabbitMq.getPublisherconfirms());  

return connectionFactory;  

    }  


/**

     * rabbitAdmin代理类

     * @return

     */  

@Bean  

public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){  

return new RabbitAdmin(connectionFactory);  

    }  


/**

     * 创建rabbitTemplate 消息模板类

     * prototype原型模式:每次获取Bean的时候会有一个新的实例

     *  因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置

     * @return

     */  

@Bean  

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  

public RabbitTemplate rabbitTemplate(){  

RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory());  

// rabbitTemplate.setMandatory(true);//返回消息必须设置为true  

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//数据转换为json存入消息队列  

//  rabbitTemplate.setReplyAddress(replyQueue().getName());  

//  rabbitTemplate.setReplyTimeout(100000000);  

//发布确认  

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  

//消息发送到queue时就执行  

@Override  

public void confirm(CorrelationData correlationData, boolean b, String s) {  

log.debug(correlationData+"//////");  

if (!b){  

log.debug("发送到queue失败");  

throw new RuntimeException("send error " + s);  

                }  

            }  

        });  

return rabbitTemplate;  

    }  

}  

 3.2创建exchange


[java] view plain copy

package com.demo.rabbitmq.sender;  


/**

 * Created by huguoju on 2017/3/2.

 * exchange交换机配置

 */  

public interface RabbitMqExchange {  

final String CONTRACT_FANOUT = "CONTRACT_FANOUT";  

final String CONTRACT_TOPIC = "CONTRACT_TOPIC";  

final String CONTRACT_DIRECT = "CONTRACT_DIRECT";  

}  

3.3创建queue


[java] view plain copy

package com.demo.rabbitmq.sender;  


/**

 * Created by huguoju on 2017/3/2.

 * 消息队列配置

 */  

public interface RabbitMqQueue {  

final String CONTRACE_SELF ="CONTRACT_SELF";  

final String CONTRACE_TENANT ="CONTRACT_TENANT";  

}  

3.4针对rabbitmq服务性的配置,配置queue和交换机并绑定

[java] view plain copy

package com.demo.rabbitmq.sender;  


import org.springframework.amqp.core.*;  

import org.springframework.amqp.rabbit.core.RabbitAdmin;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.beans.factory.annotation.Qualifier;  

import org.springframework.context.annotation.Bean;  

import org.springframework.context.annotation.Configuration;  


/**

 * Created by huguoju on 2017/3/2.

 * 交换机配置并绑定queue

 */  

@Configuration  

public class ContractExchangeConfig {  

@Autowired  

private RabbitAdmin rabbitAdmin;  


/**

     * 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

     * @return

     */  

//    @Bean  

//    FanoutExchange contractFanoutExchange(){  

//        FanoutExchange fanoutExchange=new FanoutExchange(RabbitMqExchange.CONTRACT_FANOUT);  

//        rabbitAdmin.declareExchange(fanoutExchange);  

//        return fanoutExchange;  

//    }  


/**

     *  将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”

     *  默认:, durable = true, autoDelete = false

     * @return

     */  

@Bean  

    TopicExchange contractTopicExchangeDurable(){  

TopicExchange contractTopicExchange=new TopicExchange(RabbitMqExchange.CONTRACT_TOPIC);  

        rabbitAdmin.declareExchange(contractTopicExchange);  

return contractTopicExchange;  

    }  


/**

     *  处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog

     * @return

     */  

@Bean  

    DirectExchange contractDirectExchange(){  

DirectExchange contractDirectExchange=new DirectExchange(RabbitMqExchange.CONTRACT_DIRECT);  

        rabbitAdmin.declareExchange(contractDirectExchange);  

return contractDirectExchange;  

    }  

@Bean  

    Queue queueContract(){  

Queue queue=new Queue(RabbitMqQueue.CONTRACE_SELF,true);  

        rabbitAdmin.declareQueue(queue);  

return queue;  

    }  

@Bean  

    Queue queueTenant(){  

Queue queue=new Queue(RabbitMqQueue.CONTRACE_TENANT,true);  

        rabbitAdmin.declareQueue(queue);  

return queue;  

    }  


//    @Bean  

//    Binding bindingExchangeContract(Queue queueContract,FanoutExchange exchange){  

//        Binding binding=BindingBuilder.bind(queueContract).to(exchange);  

//        rabbitAdmin.declareBinding(binding);  

//        return binding;  

//    }  

@Bean  

    Binding bindingExchangeContract(Queue queueContract,TopicExchange exchange){  

        Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);  

        rabbitAdmin.declareBinding(binding);  

return binding;  

    }  

//    @Bean  

//    Binding bindingExchangeContract(Queue queueContract,DirectExchange exchange){  

//        Binding binding=BindingBuilder.bind(queueContract).to(exchange).with(RabbitMqQueue.CONTRACE_SELF);  

//        rabbitAdmin.declareBinding(binding);  

//        return binding;  

//    }  

@Bean  

    Binding bindingExchangeTenant(Queue queueTenant, TopicExchange exchange) {  

        Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);  

        rabbitAdmin.declareBinding(binding);  

return binding;  

    }  


//    @Bean  

//    Binding bindingExchangeTenant(Queue queueTenant, DirectExchange exchange) {  

//        Binding binding = BindingBuilder.bind(queueTenant).to(exchange).with(RabbitMqQueue.CONTRACE_TENANT);  

//        rabbitAdmin.declareBinding(binding);  

//        return binding;  

//    }  


}  

3.5创建消息体

[java] view plain copy

package com.demo.rabbitmq.sender;  



import lombok.Builder;  

import lombok.Data;  

import lombok.Getter;  


import java.util.Date;  

import java.util.List;  


/**  

* Created by huguoju on2017/3/3.  

[java] view plain copy

 *不能用@Builder,因为json反编译的时候需要set方法,builder没有set方法  

 * 合同消息载体  

 */  

//@Builder  

//@Getter  

@Data  

public class ContractRabbitMq {  

private String id;  

private String name;  

private List testList;  

private Date createDate;  

}  

[java] view plain copy

package com.demo.rabbitmq.sender;  



import lombok.Builder;  

import lombok.Getter;  


/**

 * Created by huguoju on 2017/3/3.

 * tenant消息载体

 */  

@Builder  

@Getter  

public class TenantRabbitMq {  

private String id;  

private String name;  

}  

4消费者配置,实际使用时应该和生产者不在一个项目里,这里只是演示,所有放在了一个项目里,很多公用的文件在实际开发中可以打jar用

4.1消息的监听的代理类


[java] view plain copy

package com.demo.rabbitmq.consumer;  


import com.demo.model.RabbitMq;  

import com.demo.rabbitmq.sender.RabbitMqExchange;  

import com.demo.rabbitmq.sender.RabbitMqQueue;  

import org.springframework.amqp.core.*;  

import org.springframework.amqp.rabbit.annotation.EnableRabbit;  

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;  

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;  

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  

import org.springframework.amqp.rabbit.connection.ConnectionFactory;  

import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.context.annotation.Bean;  

import org.springframework.context.annotation.Configuration;  

import org.springframework.messaging.converter.MappingJackson2MessageConverter;  

import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;  


/**

 * Created by huguoju on 2017/3/3.

 * 接收方配置

 * 消息的监听的代理类

 */  

@Configuration  

@EnableRabbit  

public class ConsumerConfig implements RabbitListenerConfigurer {  

@Autowired  

    ReceiverService receiverService;  

@Autowired  

private RabbitMq rabbitMq;  

@Autowired  

private ConnectionFactory connectionFactory;  

@Bean  

public DefaultMessageHandlerMethodFactory handlerMethodFactory(){  

DefaultMessageHandlerMethodFactory factory=new DefaultMessageHandlerMethodFactory();  

factory.setMessageConverter(new MappingJackson2MessageConverter());  

return factory;  

    }  

//    @Bean  

//    public SimpleMessageListenerContainer messageContainer() {  

//        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  

//        container.setQueues(queueContract());  

//      //  container.setExposeListenerChannel(true);  

//        container.setMaxConcurrentConsumers(1);  

//        container.setConcurrentConsumers(1);  

//        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  

//        container.setMessageListener(new MessageListener() {  

//  

//            @Override  

//            public void onMessage(Message message) {  

//                byte[] body = message.getBody();  

//                System.out.println("receive msg : " + new String(body));  

//               // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  

//            }  

//        });  

//        return container;  

//    }  

@Bean  

public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(){  

SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();  

        factory.setConnectionFactory(connectionFactory);  

        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);  

return factory;  

    }  


@Override  

public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {  

        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());  

    }  

}  

4.2消费者监听

[java] view plain copy

package com.demo.rabbitmq.consumer;  


import com.demo.rabbitmq.sender.ContractRabbitMq;  

import com.demo.rabbitmq.sender.RabbitMqQueue;  

import com.demo.rabbitmq.sender.TenantRabbitMq;  

import com.fasterxml.jackson.databind.ObjectMapper;  

import org.springframework.amqp.rabbit.annotation.RabbitHandler;  

import org.springframework.amqp.rabbit.annotation.RabbitListener;  

import org.springframework.stereotype.Component;  


import java.io.IOException;  


/**

 * Created by huguoju on 2017/3/3.

 */  

@Component  

public class ReceiverService {  

@RabbitListener(queues = RabbitMqQueue.CONTRACE_SELF)  

@RabbitHandler  

public void receiveContractQueue(ContractRabbitMq contract) {  

ObjectMapper objectMapper=new ObjectMapper();  

try {  

System.out.println("Received contract<" + objectMapper.writeValueAsString(contract) + ">");  

}catch (IOException e) {  

            e.printStackTrace();  

        }  

    }  


@RabbitListener(queues = RabbitMqQueue.CONTRACE_TENANT)  

public void receiveTenantQueue(TenantRabbitMq tenant) {  

ObjectMapper objectMapper=new ObjectMapper();  

try {  

System.out.println("Received contract<" + objectMapper.writeValueAsString(tenant) + ">");  

}catch (IOException e) {  

            e.printStackTrace();  

        }  

    }  

}  

以上就完成了。

创建测试controller

[java] view plain copy

package com.demo.controller;  


import com.demo.rabbitmq.sender.ContractRabbitMq;  

import com.demo.service.rabbitMq.ContractRabbitmqService;  

import com.google.common.collect.Lists;  

import io.swagger.annotations.Api;  

import org.springframework.beans.factory.annotation.Autowired;  

import org.springframework.web.bind.annotation.RequestMapping;  

import org.springframework.web.bind.annotation.RequestMethod;  

import org.springframework.web.bind.annotation.RestController;  


import java.util.Date;  


/**

 * Created by huguoju on 2017/3/6.

 */  

@RestController  

@RequestMapping("rabbitmq")  

@Api(value = "测试rabbitmq",tags = "测试rabbitmq")  

public class RabbitMqController {  


@Autowired  

public ContractRabbitmqService contractRabbitmqService;  

@RequestMapping(value = "contract/topic",method = {RequestMethod.POST,RequestMethod.GET})  

public void contractTopic(){  

ContractRabbitMq mq=new ContractRabbitMq();  

mq.setId("15");  

mq.setName("测试");  

mq.setTestList(Lists.newArrayList("111","222"));  

mq.setCreateDate(new Date());  

        contractRabbitmqService.sendContractRabbitmqTopic(mq);  

    }  

}  

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容