RabbitMQ几种消费者实现方式

在Java Spring Boot中,对于RabbitMQ消费者/订阅者,大概有以下实现方式:

  1. 采用SimpleMessageListenerContainer
  2. 采用RabbitListener

第一种:采用SimpleMessageListenerContainer

@Configuration
public class RabbitMQConfig {

    @Bean
    public CachingConnectionFactory connectionFactory1() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("host1");
        connectionFactory.setPort(port1);
        connectionFactory.setUsername("user1");
        connectionFactory.setPassword("password1");
        return connectionFactory;
    }

    @Bean
    public SimpleMessageListenerContainer consumeMessages(ConnectionFactory connectionFactory1) {
        SimpleMessageListenerContainer container1 = new SimpleMessageListenerContainer(connectionFactory1);
        container1.setQueueNames("queue_name_1");
        container1.setMessageListener((Message message) -> {
            // 处理来自第一个队列的消息
            System.out.println("Received message from first queue: " + new String(message.getBody()));
        });
        // container1.start(); //这里不用启动,Spring会自动启动
        return container1;
    }
}

实现原理

  1. 首先要定义连接,我们假设为connectionFactory1,当然也可以定义多个连接,并且设置Bean,方便注入使用
  2. 构造Container,这里有个参数ConnectionFactory,对于Bean,Spring会自动调用,默认是上面的连接,如果有多个,就要给参数增加指定了:@Qualifier("connectionFactory1")
    对于SimpleMessageListenerContainer,可以参考:https://cloud.tencent.com/developer/article/2244239

第二种:RabbitListener
这个就非常简单了

@Component
public class RabbitMQListenerDemo {

    @RabbitListener(queues = "queue_name_1")
    public void onMessage1(String message) {
        String messageFormat = String.format("RabbitMQListenerDemo:Listener1:%s", message);
        System.out.println(messageFormat);
    }

    @RabbitListener(queues = "queue_name_1")
    public void onMessage2(Message message) {
        String mqMsg = new String(message.getBody(), StandardCharsets.UTF_8);
        String messageId = "";
        if (message.getMessageProperties() != null && !StringUtils.isBlank(message.getMessageProperties().getMessageId())) {
            messageId = message.getMessageProperties().getMessageId();
        }

        String messageFormat = String.format("RabbitMQListenerDemo:Listener2:%s:%s", messageId, mqMsg);
        System.out.println(messageFormat);
    }
}

第一种接收字符串类型,第二种接收Message类型。

其他问题:
比如现在都是多容器部署,一个代码可能包含Api、Job等,如果都采用自动部署,可能每个容器都会启动,特别采用了配置中心,每个容器都一样的,虽然多个消费者都可以工作,提高了效率,但难以管理,如果处理呢

  1. 采用环境变量
  2. 采用其他方式触发,比如放到Job里面,可以采用Job来触发
    如果采用第二种方式,是否可以配合RabbitListener使用呢,答案是可以的
@RabbitListener(queues = "queue_name_1", autoStartup = "false", id = "RabbitMQListenerDemo1", containerFactory = "rabbitListenerContainerFactory")
public void onMessage1(String message) {
    String messageFormat = String.format("RabbitMQListenerDemo:Listener1:%s", message);
    System.out.println(messageFormat);
}

我们发现RabbitListener有个autoStartup 属性,这个可以设置为"false",这样应用在启动的时候,就不会自动启动了,那该如何启动呢

private RabbitListenerEndpointRegistry registry;
MessageListenerContainer container = registry.getListenerContainer("RabbitMQListenerDemo1");
if (!container.isRunning()) {
    container.start();
}

这里可以通过RabbitListenerEndpointRegistry的getListenerContainer(id)方法,获取Listener,然后调用start()方法启动Listener。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容