RabbitMQ基础教程之Spring&JavaConfig使用篇

RabbitMQ基础教程之Spring使用篇

相关博文,推荐查看:

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之基本使用篇
  4. RabbitMQ基础教程之使用进阶篇

在实际的应用场景中,将RabbitMQ和Spring结合起来使用的时候可能更加频繁,网上关于Spring结合的博文中,大多都是xml的方式,这篇博文,则主要介绍下利用JavaConfig的结合,又会是怎样的

I. Spring中RabbitMQ的基本使用姿势

1. 准备

开始之前,首先添加上必要的依赖,主要利用 spring-rabbit 来实现,这个依赖中,内部又依赖的Spring相关的模块,下面统一改成5.0.4版本

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.7.3.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.0.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

流程分析

实现主要分为两块,一个是投递服务,一个是消费服务,结合前面RabbitMQ的基本使用姿势中的流程,即便是使用Spring,我们也避免不了下面几步

  • 建立连接
  • 声明Exchange ,声明Queue
  • 建立Queue和Exchange之间的绑定关系
  • 发送消息
  • 消费消息(ack/nak)

2. 基本case

首先借助Spring,来实现一个最基本的最简单的实现方式

/**
 * Created by yihui in 19:53 18/5/30.
 */
public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        CachingConnectionFactory factory = new CachingConnectionFactory("127.0.0.1", 5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        RabbitAdmin admin = new RabbitAdmin(factory);

        // 创建队列
        Queue queue = new Queue("hello", true, false, false, null);
        admin.declareQueue(queue);

        //创建topic类型的交换机
        TopicExchange exchange = new TopicExchange("topic.exchange");
        admin.declareExchange(exchange);

        //交换机和队列绑定,路由规则为匹配"foo."开头的路由键
        admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));


        //设置监听
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        Object listener = new Object() {
            public void handleMessage(String foo) {
                System.out.println(" [x] Received '" + foo + "'");
            }
        };
        MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
        container.setMessageListener(adapter);
        container.setQueues(queue);
        container.start();

        //发送消息
        RabbitTemplate template = new RabbitTemplate(factory);
        template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
        Thread.sleep(1000);

        // 关闭
        container.stop();
    }
}

3. 逻辑分析

上面这一段代码中,包含了消息投递和消费两块,从实现而言,基本上逻辑和前面的基础使用没有什么太大的区别,步骤如下:

  1. 建立连接: new CachingConnectionFactory("127.0.0.1", 5672)
  2. 声明Queue: new Queue("hello", true, false, false, null)
  3. 声明Exchange: new TopicExchange("topic.exchange")
  4. 绑定Queue和Exchange: admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*"));
  5. 投递消息: template.convertAndSend("topic.exchange", "foo.bar", "Hello, world!");
  6. 消费消息: 设置MessageListenerAdapter

这里面有几个类需要额外注意:

  • RabbitTemplate: Spring实现的发送消息的模板,可以直接发送消息
  • SimpleMessageListenerContainer: 注册接收消息的容器

II. Spring结合JavaConfig使用RabbitMQ使用姿势

1. 公共配置

主要是将公共的ConnectionFactory 和 RabbitAdmin 抽取出来

@Configuration
@ComponentScan("com.git.hui.rabbit.spring")
public class SpringConfig {

    private Environment environment;

    @Autowired
    public void setEnvironment(Environment environment) {
        this.environment = environment;
        System.out.println("then env: " + environment);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

2. 消息投递

发送消息的组件就比较简单了,直接利用 AmqpTemplate 即可

@Component
public class AmqpProducer {

    private AmqpTemplate amqpTemplate;

    @Autowired
    public void amqpTemplate(ConnectionFactory connectionFactory) {
        amqpTemplate = new RabbitTemplate(connectionFactory);
    }

    /**
     * 将消息发送到指定的交换器上
     *
     * @param exchange
     * @param msg
     */
    public void publishMsg(String exchange, String routingKey, Object msg) {
        amqpTemplate.convertAndSend(exchange, routingKey, msg);
    }
}

3. DirectExchange消息消费

根据不同的Exchange类型,分别实现如下

DirectExchange方式

@Configuration
public class DirectConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public DirectExchange directExchange() {
        DirectExchange directExchange = new DirectExchange("direct.exchange");
        directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return directExchange;
    }

    @Bean
    public Queue directQueue() {
        Queue queue = new Queue("aaa");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding directQueueBinding() {
        Binding binding = BindingBuilder.bind(directQueue()).to(directExchange()).with("test1");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener directConsumer() {
        return new BasicConsumer("direct");
    }

    @Bean(name = "directMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(directQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(directConsumer());
        return container;
    }
}

从上面的实现,基本上都是重新定义了一个Queue, Exchange, Binding, MessageListenerContainer(用来监听消息),并将消息的消费抽出了一个公共类

@Slf4j
public class BasicConsumer implements ChannelAwareMessageListener {
    private String name;

    public BasicConsumer(String name) {
        this.name = name;
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            byte[] bytes = message.getBody();
            String data = new String(bytes, "utf-8");
            System.out.println(name + " data: " + data + " tagId: " + message.getMessageProperties().getDeliveryTag());
        } catch (Exception e) {
            log.error("local cache rabbit mq localQueue error! e: {}", e);
        }
    }
}

4. 测试

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfig.class)
public class SprintUnit {
    @Autowired
    private AmqpProducer amqpProducer;

    @Test
    public void testDirectConsumer() throws InterruptedException {
        String[] routingKey = new String[]{"hello.world", "world", "test1"};
        for (int i = 0; i < 10; i++) {
            amqpProducer
                    .publishMsg("direct.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
        }
        System.out.println("-------over---------");

        Thread.sleep(1000 * 60 * 10);
    }
}

这个测试类中,虽然主要是往MQ中投递消息,但在Spring容器启动之后,接收MQ消息并消费的实际任务,是通过前面的MessageListenerContainer托付给Spring容器了,上面测试执行之后,输出为

direct data: >>> hello test1>>> 2 tagId: 1
direct data: >>> hello test1>>> 5 tagId: 2
direct data: >>> hello test1>>> 8 tagId: 3

5. Topic & Fanout策略

上面的一个写出来之后,再看这两个就比较相似了

@Configuration
public class TopicConsumerConfig {
    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public TopicExchange topicExchange() {
        TopicExchange topicExchange = new TopicExchange("topic.exchange");
        topicExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return topicExchange;
    }

    @Bean
    public Queue topicQueue() {
        Queue queue = new Queue("bbb");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding topicQueueBinding() {
        Binding binding = BindingBuilder.bind(topicQueue()).to(topicExchange()).with("*.queue");
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener topicConsumer() {
        return new BasicConsumer("topic");
    }

    @Bean(name = "topicMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(topicQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(topicConsumer());
        return container;
    }
}

对应的测试case

@Test
public void testTopicConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue"};
    for (int i = 0; i < 20; i++) {
        amqpProducer.publishMsg("topic.exchange", routingKey[i % 3], ">>> hello " + routingKey[i % 3] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

广播方式

@Configuration
public class FanoutConsumerConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Bean
    public FanoutExchange fanoutExchange() {
        FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange");
        fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return fanoutExchange;
    }

    @Bean
    public Queue fanoutQueue() {
        Queue queue = new Queue("ccc");
        queue.setAdminsThatShouldDeclare(rabbitAdmin);
        return queue;
    }

    @Bean
    public Binding fanoutQueueBinding() {
        Binding binding = BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
        binding.setAdminsThatShouldDeclare(rabbitAdmin);
        return binding;
    }

    @Bean
    public ChannelAwareMessageListener fanoutConsumer() {
        return new BasicConsumer("fanout");
    }

    @Bean(name = "FanoutMessageListenerContainer")
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setRabbitAdmin(rabbitAdmin);
        container.setQueues(fanoutQueue());
        container.setPrefetchCount(20);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(fanoutConsumer());
        return container;
    }
}

对应的测试case

@Test
public void testFanoutConsumer() throws InterruptedException {
    String[] routingKey = new String[]{"d.queue", "a.queue", "cqueue", "hello.world", "world", "test1"};
    for (int i = 0; i < 20; i++) {
        amqpProducer
                .publishMsg("fanout.exchange", routingKey[i % 6], ">>> hello " + routingKey[i % 6] + ">>> " + i);
    }
    System.out.println("-------over---------");

    Thread.sleep(1000 * 60 * 10);
}

II. 其他

项目地址

一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

扫描关注

QrCode
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容