rabbitmq:java编程(Spring Boot:Spring AMQP)

参考资料

https://spring.io/projects/spring-boot/ 在该链接下可找到 Spring AMQP链接, 可查看相关 Reference Doc 当前文档为: https://docs.spring.io/spring-amqp/docs/2.1.8.RELEASE/reference/html/

实战

  • 添加依赖
    使用Spring Boot创建项目,并添加如下依赖:
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

发送消息

  • 发送消息到队列 (Producer.java)

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class Producer {
    private String queueName = "hello.queue";
    private String exchangeName = "hello.exchange";

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.sendMsg2Queue();
    }
}

sendMsg2Queue() 向指定的队列,发送消息,具体代码如下:

   private void sendMsg2Queue() {
        //创建连接
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        //connectionFactory.setVirtualHost();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //connectionFactory.setAddresses("localhost1:5672,localhost2:5672"); //多个地址时使用
        //onnectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); //缓存类型
        //connectionFactory.setChannelCacheSize(50); //缓存数量
        //connectionFactory.setConnectionLimit(100); //最大连接数
        //connectionFactory.setConnectionTimeout(60000);


        //AmqpAdmin用于声明队列、交换器、绑定
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        //声明队列:若队列不存在会自动创建队列   (若不声明队列,若队列不存在,则会导致消息发送失败)
        amqpAdmin.declareQueue(new Queue(queueName));

        //AmqpTemplate 用于发送接收消息
        AmqpTemplate amqpTemplate = new RabbitTemplate(connectionFactory);  //AmqpTemplate目前只有RabbitTemplate实现
        amqpTemplate.convertAndSend(queueName, "helloworld");
    }

描述:

  • CachingConnectionFactory用于连接管理
  • AmqpAdmin 可以声明交换器、队列、绑定 (也可以通过rabbitmq管理WEB界面进行相关的操作)
  • AmqpTemplate 可以发送、接收消息。 (异步监听接收消息不是通过AmqpTemplate ,需要通过 容器 方式实现。)
  • 发送消息到扇出交换器(FanoutExchange)

FanoutExchange:发送到扇出交换器的消息,可以转发到所有绑定到交换器的队列

       String routingKey = "";

        //AmqpAdmin用于声明交换器、队列、绑定
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        amqpAdmin.declareExchange(new FanoutExchange(exchangeName));   //
        amqpAdmin.declareQueue(new Queue(queueName + ".1"));
        amqpAdmin.declareQueue(new Queue(queueName + ".2"));
        amqpAdmin.declareBinding(new Binding(queueName + ".1", Binding.DestinationType.QUEUE, exchangeName, routingKey, null));
        amqpAdmin.declareBinding(new Binding(queueName + ".2", Binding.DestinationType.QUEUE, exchangeName, routingKey, null));

        //AmqpTemplate 用于发送接收消息
        AmqpTemplate amqpTemplate = new RabbitTemplate(connectionFactory);  //AmqpTemplate目前只有RabbitTemplate实现
        amqpTemplate.convertAndSend(exchangeName, routingKey, "helloworld");
  • 路由交换器(DirectExchange) & 主题交换器(TopicExchange)

  • FanoutExchange交换器,routingKey设置为 "" 即可(实际上只要不为 null, 即使绑定的routingKey,与发送消息的routingKey不同也没有关系),所有与交换器绑定的队列均会收到消息。
  • DirectExchange交换器,绑定队列时,需要指定routingKey。在向交换器发达消息时也需要指定routingKey,该消息只会转发给对应routingKey的队列。
  • TopicExchange交换器,绑定队列时,需要指定routingKey,在routingKey中可以使用通配符(例如hello.# / hello.*)。在发送消息时,需要指定具体的routingKey ( 如 hello.world.q1 / hello.q1) , 这时只有匹配的队列才会收到消息
    说明:为了直观,routingKey通常指定为队列名,或队列名通配符。
    注意:如果交换器事先存在,请确保交换器的类型正确,否则会异常。

接收消息(同步方式)

RabbitTemplate包含发送,也包含接收API。(略)

接收消息(监听方式)

由于监听器需要长时间运行,因此采用 @Configuration @Bean注解的方式,具体如下:

  • 新建ConsumerConfiguration类:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfiguration {
    private String queueName = "hello.queue";

    @Bean //连接工厂
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

// 消息处理
    private MessageListener listener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            System.out.println("received: " + message);
        }
    };

//    @Bean  //消息监听器实现--用于处理收到的消息
//    public MessageListener listener() {
//        return new MessageListener() {
//            public void onMessage(Message message) {
//                System.out.println("received: " + message);
//            }
//        };
//    }

    @Bean //消息监听器容器
    public SimpleMessageListenerContainer messageListenerContainer() {
        //连接
        ConnectionFactory connectionFactory = rabbitConnectionFactory();

        //声明队列
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        amqpAdmin.declareQueue(new Queue(queueName));

        //监听器容器
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listener);     //设置消息监听器
        //container.setMessageListener(listener());     //设置消息监听器
        //container.start();  // 在非Bean方式下使用时,需要start才能监听到消息。
        return container;
    }
}
  • 运行监听器

使用Spring Boot创建项目时,会自动生成Application启动类(类名通常为 XxxApplication.java)。代码如下图所示:

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication .class, args);
    }
}

该类具有@SpringBootApplication注解。运行该类,即可自动启动监听器。

  • 监听多个队列

        container.setQueueNames("queue.1","queue.2");
        container.addQueueNames("queue.3");
  • 并行监听

        container.setConcurrentConsumers(5);
        container.setMaxConcurrentConsumers(10);

事务:发送及接收消息事务(AmqpTemplate方式): [TransactionTemplate ]

未使用事务的代码示例

        AmqpTemplate amqpTemplate = new RabbitTemplate(connectionFactory);  //AmqpTemplate目前只有RabbitTemplate实现
        amqpTemplate.convertAndSend(queueName, "helloworld");

使用事务,需要做几点调整:

  1. 通过TransactionTemplater的execute ()方法,调用事务接口doInTransaction
    () 方法
    将AmqpTemplate 发送及接收消息的方法,移到doInTransaction
    ()方法中
  2. 将AmqpTemplate 类,改为RabbitTemplate类
  3. 启用事务 rabbitTemplate.setChannelTransacted(true);
    启用事务后,doInTransaction方法执行完成后,会自动提交事务;
    若通过transactionStatus.setRollbackOnly(),设置了回滚标识 ,会自动回滚。

示例代码如下:

        TransactionTemplate transactionTemplate = new TransactionTemplate(new RabbitTransactionManager(connectionFactory));
        transactionTemplate.execute(new TransactionCallback<String>() {
            @Override
            public String doInTransaction(TransactionStatus transactionStatus) {
                RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);  //AmqpTemplate目前只有RabbitTemplate实现
                rabbitTemplate.setChannelTransacted(true);  //启用事务
                try {
                    rabbitTemplate.convertAndSend(queueName, "helloworld1");
                    rabbitTemplate.convertAndSend(queueName, "helloworld2");
                } catch (Exception e) {
                    transactionStatus.setRollbackOnly();    //设置回滚标识
                }
                return null;
            }
        });

事务:接收消息(监听方式)事务控制

  • 接收消息(监听方式)应答确认

监听方式接收消息的事务控制,需使用应答确认机制。 具体需要做如下调整:

  1. 启动手动确认模式:container.setAcknowledgeMode(AcknowledgeMode.MANUAL)
  2. MessageListener接口改为ChannelAwareMessageListener接口,该接口的 onMessage(Message message, Channel channel) throws Exception 若抛出异常,则消息会返回到代理。
    代码如下:
@Configuration
public class ConsumerConfiguration {
    private String queueName = "hello.queue";

    @Bean //连接工厂
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean //消息监听器容器
    public SimpleMessageListenerContainer messageListenerContainer() {
        //连接
        ConnectionFactory connectionFactory = rabbitConnectionFactory();

        //声明队列
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        amqpAdmin.declareQueue(new Queue(queueName));

        //监听器容器
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  //手动确认
        container.setQueueNames(queueName);
        container.setMessageListener(listener);     //设置消息监听器
        return container;
    }

    private RabbitTransactionManager getRabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    private ChannelAwareMessageListener listener = new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            System.out.println("received: " + message);
            //如果本方法抛出异常,在channel上执行的事务自动回滚,消息会返回代理
        }
    };
}

注意:异常时,消息虽然返回代理,但不会再次发送给消费者处理,直到消费者断开连接后重新连接。

如果需要再次处理,请参见事务管理器

  • 事务管理器

定义事务管理器

    private RabbitTransactionManager getRabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

设置事务管理器

//设置事务管理器
container.setTransactionManager(getRabbitTransactionManager(connectionFactory));  

再次运行消息监听:若在消息处理过程中,出现异常,则消息会返回代理。然后消息会再次发送给消费者处理。(若该消息处理始终有异常,则可能不断重复该过程。)

消息处理异常 & 延迟投递重试 解决方案

如前所述 ,如果使用接收消息确认模式,如果消息处理异常,有几种结果:

  • 抛出异常,消息返回代理,但不再投递(除非重启消费者)
  • 抛出异常,消息返回代理,立即重新投递(可能导致死循环)
  • 不抛出异常,即消息不返回代理(丢弃消息)

很多业务场景,这几种场景都不是我们需要的,我们希望有异常时,不是立即重试,而是采用延迟重试策略,并控制重试次数。 activemq支持消息延迟投递,但rabbitmq并不支持。但我们可以利用rabbitmq的两处特性,间接实现消息延迟投递。这两个特性是:

  1. Time To Live(TTL):生存时间:若某个队列没有消耗者,而消息也设置了生存时间,则消息在队列中超时后,消息要么丢弃,要么进入指定的死信队列
  2. Dead Letter Exchanges(DLX):死信交换器:进入死信队列的消息可以根据路由KEY,路由到指定的队列。

利用这两个属性,即可实现消息的延迟投递,流程如下图所示:


image.png
  • 消息延迟投递

我们先来实现消息的延迟投递:

  1. 定义消费队列:hello.queue
  2. 定义延迟缓冲队列:hello.queue.delay (消息过期后,转发至DLX)
  3. 定义DLX死信交换器:hello.exchange.DLX
  4. 绑定:将DLX绑定到消费队列

下面是具体的代码:

@Configuration
public class ConsumerConfiguration {
    private String rabbmitmq_host = "localhost";
    private int rabbmitmq_prot = 5672;
    private String rabbmitmq_user = "guest";
    private String rabbmitmq_password = "guest";

    private String queueName = "hello.queue";
    private String delayQueueName = "hello.queue.delay";
    private String dlxExchangeName = "hello.exchange.DLX";
    private String dlxRoutingKey = "Hello";

    //第1步:定义消费队列
    @Bean
    public Queue helloQueue() {
        return new Queue(queueName, true, false, false);
    }

    //第2步:定义延迟缓冲队列 (消息过期后,转发至DLX)
    @Bean
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", dlxExchangeName);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", dlxRoutingKey);
        //params.put("x-message-ttl", 24 * 60 * 60 * 1000);            //队列TTL:1天(定义消息成为死信的最长时间,一般建议在消息中设置TTL)
        return new Queue(delayQueueName, true, false, false, params);
    }

    //第3步:定义DLX死信交换器
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchangeName);
    }

    //第4步:定义绑定:将DLX消息,路由到消费队列
    @Bean
    public Binding dlxBinding() {
        //DLX收到消息后,会向helloQueue转发;(也可向另一个Exchange转发)
        return BindingBuilder.bind(helloQueue()).to(dlxExchange()).with(dlxRoutingKey);
    }

    @Bean //连接工厂
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbmitmq_host);
        connectionFactory.setPort(rabbmitmq_prot);
        connectionFactory.setUsername(rabbmitmq_user);
        connectionFactory.setPassword(rabbmitmq_password);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        ConnectionFactory connectionFactory = rabbitConnectionFactory();
        //声明队列、交换器、绑定
        RabbitAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        amqpAdmin.declareQueue(helloQueue());
        amqpAdmin.declareQueue(delayQueue());
        amqpAdmin.declareExchange(dlxExchange());
        amqpAdmin.declareBinding(dlxBinding());
        return amqpAdmin;
    }
}

测试:如果向延迟缓冲队列发送TTL消息(即还有过期时间的消息),该消息过期后,会转发到DLX,DLX将根据路由KEY,将消息路由到指定的消费队列,从而实现消息延迟投递。

  • 消息处理异常 & 延迟投递重试

前面演示了消息延迟投递。下面将消息处理异常 与 消息延迟投递进行结合,实现异常时延迟投递重试。
逻辑描述:消息处理过程中

  • 如果正常处理,则对消息进行确认。
  • 如果有异常,则对消息设置过期时间(以及重试计数),然后发到延迟缓冲队列,再对消息进行确认。

下面添加消息处理代码:

    @Bean //消息监听器容器
    public SimpleMessageListenerContainer messageListenerContainer() {
        //监听器容器
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);  //手动确认
        container.setQueueNames(queueName);
        container.setMessageListener(listener);     //设置消息监听器
        return container;
    }

    //定义消息监听器
    private ChannelAwareMessageListener listener = new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            boolean reQueue = false; //是否重入队列
            try {
                //处理消息
                System.out.println("received: " + message);

                //这里模拟异常
                //throw new Exception("");

                //正常情况下,处理完消息,执行如下手动确认语句
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), reQueue);
            } catch (Exception e) {
                try {
                    System.out.println("消息延迟投递:准备中");
                    sendDelayMsg(channel, message);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), reQueue); //确认消息
                    System.out.println("消息延迟投递-已完成!");
                } catch (Exception e2) {
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), !reQueue);
                    System.out.println("消息拒绝!");
                }
            }
        }
    };

    //发送延迟投递消息
    private void sendDelayMsg(Channel channel, Message message) throws IOException {
        //向延迟缓冲队列发送TTL消息  (如果异常,则会回滚)
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                .contentEncoding("UTF-8")
                .expiration("60000")
                .build();
        //如果要控制重试次数,可以在消息头中加上自定义属性
        String exchange = "";
        String routingKey = delayQueueName;
        channel.basicPublish(exchange, routingKey, basicProperties, message.getBody());  //发送延迟投递消息
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容