RabbitMQ 的自定义消费者使用

之前的文章里面,我都是在消费端的代码里面编写 while 循环,进行 consumer.nextDelivery 方法进行获取下一条消息,然后进行消费处理,这种方式太 low 了,耦合性太高,所以要使用自定义的 consumer 来解耦,这种方式更方便一些,也是在实际工作中最常用的使用方式

下面来看看具体的代码实现, 代码地址:

https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 项目下

如图所示,先来实现我们的自定义消费者

public class MyConsumer extends DefaultConsumer {

    private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
    
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag,  //消费者标签
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        log.info("------MyConsumer-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
    }
}

接着,重点来了,在声明消费者的代码里面使用刚才的自定义消费者

/**
 * 使用自定义消费者
 */
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    public static final String EXCHANGE_NAME = "test_consumer_exchange";
    public static final String EXCHANGE_TYPE = "topic";
    public static final String ROUTING_KEY_TYPE = "consumer.#";
    public static final String ROUTING_KEY = "consumer.save";
    public static final String QUEUE_NAME = "test_consumer_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 获取C onnection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
      //使用自定义消费者
        channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
        log.info("消费端启动成功");
    }
}

生产端代码基本不需要修改

public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String msg = "Hello RabbitMQ Consumer Message";
        for(int i = 0; i < 5; i ++){
            log.info("生产端发送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
    }
}

先启动消费端,再启动生产端,查看运行结果:注意看消费端的日志,打印出了我们自定义消费者里面的东西了。

image
image

至此,简单的使用自定义消费者demo就完成了。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 4. 设计思想 4.1 动机 我们设计的 Kafka 能够作为一个统一的平台来处理大公司可能拥有的所有实时数据馈送...
    疯狂的橙阅读 4,746评论 1 4
  • 发现 关注 消息 iOS 第三方库、插件、知名博客总结 作者大灰狼的小绵羊哥哥关注 2017.06.26 09:4...
    肇东周阅读 14,825评论 4 61
  • 那棵枯树上结着 最美丽的苹果 她借着雨水的倒影 等待着 躲在云后的 睡着了的太阳
    省略掉阅读 1,090评论 0 1
  • 笨熊之所以叫笨熊,是因为森林里的居民都认为,这只熊很傻。 笨熊笨到捉不到猎物,只好整日吃素。 笨熊下河摸鱼,鱼没捉...
    兹心非心阅读 3,012评论 2 3
  • 橡树代表的意思是:永恒。 橡树材质坚硬,粗壮宽大,树冠繁茂,有“森林之王”的美称。 在欧美文化中,橡树与人的生命相...
    哥舒阅读 3,582评论 0 0

友情链接更多精彩内容