RocketMQ消费模式

RocketMQ的消费模式有2种

查看一下源码,在默认情况下,就是集群消费(CLUSTERING)。另一种消费模式,是广播消费(BROADCASTING)。

其实,对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!

至于消息分发到C1/C2/C3,其实也是可以设置策略的:

默认的分配算法是AllocateMessageQueueAveragely

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:

广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。  由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

RocketMQ-广播消费模式设置

/**

* Consumer,订阅消息

*/

public classConsumer2{  

  public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");

        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");

        consumer.setConsumeMessageBatchMaxSize(10);

        // 设置为广播消费模式        

        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {

                try {

                    for (MessageExt msg : msgs) {

                        System.out.println(" Receive New Messages: " + msg);

                    }

                } catch (Exception e) {

                    e.printStackTrace();

                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // 重试          

                 }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      // 成功    

                }

        });

        consumer.start();

        System.out.println("Consumer Started.");

    }

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容