springboot rabbitmq入门使用

RabbitMQ作为一款能实现高性能存储分发消息的分布式中间件,具有异步通信、服务解耦、接口限流、消息分发和业务延迟处理等功能,在实际生产环境中具有很广泛的应用。

为了能在项目中使用RabbitMQ,需要在本地安装RabbitMQ并能进行简单的使用。可参考改教程安装RabbitMQ:安装教程

一、总的代码目录结构如下:
rabbitmq使用代码目录结构.png
  • entity包实体类order,作为消息载体;
  • ackmodel包是写消息确认消费机制,自动确认消费和手工确认消费;
  • delay包是写延迟队列,在业务延迟处理时可用到,如订单30分钟内未付款自动取消订单相关业务;
  • exchangemodel包是写不同类型交换机下的消息模型,常见的三种消息模型是:direct-直接传输、fanout-广播、topic-主题消息模型;
  • springevent包是spring事件驱动模型的demo,要了解消息队列,先了解一下spring的事件驱动会更好;
  • 数据库是用mysql,项目启动需要连接本地数据库,新建一个exercisegroup数据库;
  • appilication.yaml配置rabbitmq地址,pom文件引入amqp starter即可。
二、spring事件驱动

(1)spring事件驱动模型由三部分构成,生产者、事件(消息)、消费者,即生成者采用异步的方式把事件(消息)发送给消费者,消费者监听到事件(消息)再进一步处理。


spring事件驱动.jpeg

(2)代码目录结构如下:


springevent.png

(3)示例代码
  • OrderEvent类,订单事件,继承ApplicationEvent,:
public class OrderEvent extends ApplicationEvent {
    public OrderEvent(Order source) {
        super(source);
    }
}
  • OrderPublisher类,生产者,异步发送事件:
@Component
public class OrderPublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    public void sendMsg() {
        Order order = new Order();
        order.setOrdernum("123456");
        OrderEvent orderEvent = new OrderEvent(order);
        //发送消息
        applicationEventPublisher.publishEvent(orderEvent);
    }
}
  • OrderConsumer类,消费者,监听生产者发送过来的事件
@Component//加入Spring的IOC容器
@EnableAsync//允许异步执行
@Slf4j
public class OrderConsumer implements ApplicationListener<OrderEvent> {
    @Override
    @Async
    public void onApplicationEvent(OrderEvent event) {
        log.info("监听到订单,订单号:{}", ((Order) event.getSource()).getOrdernum());
    }
}

(4)执行MessageQueueApplicationTests的test方法即可看结果,RabbitMQ本质也是异步通信,消息在生产者端进行发送,在消费者端进行监听,对监听到的消息进一步处理,其功能更加强大。


test-springevent.png
三、RabbitMQ一些专有名词

Producer/Publisher生产者,投递消息的一方。
Consumer消费者,接收消息的一方。
Message消息:实际的数据,如demo中的order订单消息载体。
Queue队列:是RabbitMQ的内部对象,用于存储消息,最终将消息传输到消费者。
Exchange交换机:在RabbitMQ中,生产者发送消息到交换机,由交换机将消息路由到一个或者多个队列中
RoutingKey路由键:生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。
Binding绑定:RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样RabbitMQ就知道如何正确地将消息路由到队列。

四、RabbitMQ使用

(1)简单的demo


消息队列流程图.jpeg
简单的demo.png

(2)示例代码

  • RabbitmqConfig配置类,SimpleRabbitListenerContainerFactory Bean是消息监听容器,服务于监听者;RabbitTemplate是RabbitMQ发送消息的操作组件RabbitTemplate,此外配置类还有三个Bean,一个是队列basicQueue用于存储消息最终消息会被消费者监听到,basicExchange是交换机,生产者发送消息到交换机根据路由规则发送到相应的队列basicQueue上,basicBinding是负责绑定交换机basicExchange和队列basicQueue,根据路由规则绑定起来。创建队列、交换机的名词以及路由规则我都放到常量类RabbitMqConstants里面。
@Slf4j
@Configuration
public class RabbitmqConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    /**
     * 下面为单一消费者实例的配置
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //设置并发消费者实例的初始数量。在这里为1个
        factory.setConcurrentConsumers(1);
        //设置并发消费者实例的最大数量。在这里为1个
        factory.setMaxConcurrentConsumers(1);
        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        return factory;
    }
    //自定义配置RabbitMQ发送消息的操作组件RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //设置“发送消息后进行确认”
        connectionFactory.setPublisherConfirms(true);
        //设置“发送消息后返回确认信息”
        connectionFactory.setPublisherReturns(true);
        //构造发送消息组件实例对象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //发送消息后,如果发送成功,则输出“消息发送成功”的反馈信息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
        //发送消息后,如果发送失败,则输出“消息发送失败-消息丢失”的反馈信息
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
        //定义消息传输的格式为JSON字符串格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //最终返回RabbitMQ的操作组件实例RabbitTemplate
        return rabbitTemplate;
    }
    //创建队列
    @Bean
    public Queue basicQueue(){
        return new Queue(RabbitMqConstants.BASIC_QUEUE,true);
    }
    //创建交换机:在这里以DirectExchange为例
    @Bean
    public DirectExchange basicExchange(){
        return new DirectExchange(RabbitMqConstants.BASIC_EXCHANGE,true,false);
    }
    //创建绑定
    @Bean
    public Binding basicBinding(){
        return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RabbitMqConstants.BASICE_ROUTING_KEY);
    }
}
  • RabbitMqConstants常量类,存放创建队列、交换机的名词以及路由规则。
@Data
public class RabbitMqConstants {
    //队列名词
    public static final String BASIC_QUEUE = "mq.basic.info.queue";
    //交换机名词
    public static final String BASIC_EXCHANGE = "mq.basic.info.exchange";
    //路由规则,实际为字符串
    public static final String BASICE_ROUTING_KEY = "mq.basic.info.routing.key";
}
  • BasicPublisher 类,生产者,异步发送消息
@Component
@Slf4j
public class BasicPublisher {
    //定义RabbitMQ消息操作组件RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     * @param message 待发送的消息
     */
    public void sendMsg(Order message){
        try {
            //指定消息模型中的交换机
            rabbitTemplate.setExchange(RabbitMqConstants.BASIC_EXCHANGE);
            //指定消息模型中的路由
            rabbitTemplate.setRoutingKey(RabbitMqConstants.BASICE_ROUTING_KEY);
            //转化并发送消息
            rabbitTemplate.convertAndSend(message);
            log.info("rabbitmq demo-生产者-发送消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-生产者-发送消息发生异常:{} ", message, e.fillInStackTrace());
        }
    }
}
  • BasicConsumer 类,消费者,监听到消息时对消息进行处理,需要为消费者设置监听的队列mq.basic.info.queue以及监听容器singleListenerContainer。
@Component
@Slf4j
public class BasicConsumer {
    /**
     * 监听并接收消费队列中的消息-在这里采用单一容器工厂实例即可
     */
    @RabbitListener(queues = RabbitMqConstants.BASIC_QUEUE, containerFactory = "singleListenerContainer")
    public void consumeMsg(Order message) {
        try {
            log.info("rabbitmq demo-消费者-监听消费到消息:{} ", JSONUtil.toJsonStr(message));
        } catch (Exception e) {
            log.error("rabbitmq demo-消费者-发生异常:", e.fillInStackTrace());
        }
    }
}

(3)安装好erlang语言以及rabbitmq之后,项目启动,访问http://127.0.0.1:15672,输入默认账号密码,可以看到:


消息队列.png
交换机.png
交换机绑定关系.png

(4)运行test方法:

    @Test
    public void testBasicPublish() {
        Order order = new Order();
        order.setOrdernum("123456");
        basicPublisher.sendMsg(order);
    }
生产者发送消息.png
消费者监听消息.png

下一篇:springboot rabbitmq不同交换机类型实战

下一篇:springboot rabbitmq高可用消息确认消费实战

参考资料:
《分布式中间件实战》
《rabbitmq实战指南》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容