1. PushConsumer核心参数详解
- consumeFromWhere
CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
- allocateMessageQueueStrategy
平均分配的实现算法
如果队列的个数可以被消费者的个数整除,那么就完全平均分。
如果不能整除,那么靠前的消费者多消费一个队列,靠后的消费者消费平均数个队列。
如果消费者的个数大于队列的个数,那么靠前的消费者消费一个队列,后面的不消费。
- subscription
订阅关系
- offsetStore
消费进度相关类
- consumeThreadMin/consumeThreadMax
最小消费线程数/最大消费线程数
- consumeConcurrentlyMaxSpan
并发消费时允许的最大偏移跨度,对顺序消费没有影响
- pullThresholdForQueue
队列级别的流控阈值,默认情况下每个消息队列最多缓存1000条消息;结合pullBatchSize来看,瞬时值可能会超过该限制
- pullInterval/pullBatchSize
消息拉取间隔/单次拉取的消息条数
- consumeMessageBatchMaxSize
批量消费消息大小
2. PushConsumer消费模式-集群模式
- Clustering模式(默认)
GroupName用于把多个Consumer组织到一起
相同GroupName的Consumer只消费所订阅消息的一部分
目的:达到天然的负载均衡机制
/**
 * Demo push consumer that joins consumer group {@code test_model_consumer_name1}
 * and subscribes to tag {@code TagA} of topic {@code test_model_topic} using the
 * clustering message model.
 */
public class Consumer1 {

    /**
     * Builds, configures and starts the push consumer. Any exception raised
     * while doing so is printed and then dropped.
     */
    public Consumer1() {
        try {
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test_model_consumer_name1");
            pushConsumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            pushConsumer.subscribe("test_model_topic", "TagA");
            // Swap in MessageModel.BROADCASTING here to run the broadcasting variant of the demo.
            pushConsumer.setMessageModel(MessageModel.CLUSTERING);
            pushConsumer.registerMessageListener(new Listener());
            pushConsumer.start();
        } catch (Exception setupFailure) {
            setupFailure.printStackTrace();
        }
    }

    /** Entry point: creates the consumer, then reports that it has been created. */
    public static void main(String[] args) {
        new Consumer1();
        System.out.println("c1 start..");
    }

    /** Concurrent listener that prints topic, tags and body of every delivered message. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints each message of the batch.
         *
         * @return {@code CONSUME_SUCCESS} when the whole batch was printed,
         *         {@code RECONSUME_LATER} when any exception occurred
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            ConsumeConcurrentlyStatus outcome = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            try {
                for (MessageExt received : msgs) {
                    printMessage(received);
                }
            } catch (Exception failure) {
                failure.printStackTrace();
                outcome = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return outcome;
        }

        /** Writes one line describing the given message to standard output. */
        private void printMessage(MessageExt received) {
            String topicName = received.getTopic();
            String bodyText = new String(received.getBody(), StandardCharsets.UTF_8);
            String tagText = received.getTags();
            StringBuilder line = new StringBuilder();
            line.append("收到消息:").append(" topic :").append(topicName);
            line.append(" ,tags : ").append(tagText);
            line.append(" ,msg : ").append(bodyText);
            System.out.println(line.toString());
        }
    }
}
/**
 * Demo push consumer that joins consumer group {@code test_model_consumer_name2}
 * and subscribes to tag {@code TagB} of topic {@code test_model_topic} using the
 * clustering message model.
 */
public class Consumer2 {

    /**
     * Builds, configures and starts the push consumer. Any exception raised
     * while doing so is printed and then dropped.
     */
    public Consumer2() {
        try {
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test_model_consumer_name2");
            pushConsumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            pushConsumer.subscribe("test_model_topic", "TagB");
            // Swap in MessageModel.BROADCASTING here to run the broadcasting variant of the demo.
            pushConsumer.setMessageModel(MessageModel.CLUSTERING);
            pushConsumer.registerMessageListener(new Listener());
            pushConsumer.start();
        } catch (Exception setupFailure) {
            setupFailure.printStackTrace();
        }
    }

    /** Entry point: creates the consumer, then reports that it has been created. */
    public static void main(String[] args) {
        new Consumer2();
        System.out.println("c2 start..");
    }

    /** Concurrent listener that prints topic, tags and body of every delivered message. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints each message of the batch.
         *
         * @return {@code CONSUME_SUCCESS} when the whole batch was printed,
         *         {@code RECONSUME_LATER} when any exception occurred
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            ConsumeConcurrentlyStatus outcome = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            try {
                for (MessageExt received : msgs) {
                    printMessage(received);
                }
            } catch (Exception failure) {
                failure.printStackTrace();
                outcome = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return outcome;
        }

        /** Writes one line describing the given message to standard output. */
        private void printMessage(MessageExt received) {
            String topicName = received.getTopic();
            String bodyText = new String(received.getBody(), StandardCharsets.UTF_8);
            String tagText = received.getTags();
            StringBuilder line = new StringBuilder();
            line.append("收到消息:").append(" topic :").append(topicName);
            line.append(" ,tags : ").append(tagText);
            line.append(" ,msg : ").append(bodyText);
            System.out.println(line.toString());
        }
    }
}
/**
 * Demo push consumer that joins consumer group {@code test_model_consumer_name2}
 * (the same group as {@code Consumer2}) and subscribes to tag {@code TagB} of
 * topic {@code test_model_topic} using the clustering message model.
 */
public class Consumer3 {

    /**
     * Builds, configures and starts the push consumer. Any exception raised
     * while doing so is printed and then dropped.
     */
    public Consumer3() {
        try {
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("test_model_consumer_name2");
            pushConsumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            pushConsumer.subscribe("test_model_topic", "TagB");
            // Swap in MessageModel.BROADCASTING here to run the broadcasting variant of the demo.
            pushConsumer.setMessageModel(MessageModel.CLUSTERING);
            pushConsumer.registerMessageListener(new Listener());
            pushConsumer.start();
        } catch (Exception setupFailure) {
            setupFailure.printStackTrace();
        }
    }

    /** Entry point: creates the consumer, then reports that it has been created. */
    public static void main(String[] args) {
        new Consumer3();
        System.out.println("c3 start..");
    }

    /** Concurrent listener that prints topic, tags and body of every delivered message. */
    class Listener implements MessageListenerConcurrently {

        /**
         * Prints each message of the batch.
         *
         * @return {@code CONSUME_SUCCESS} when the whole batch was printed,
         *         {@code RECONSUME_LATER} when any exception occurred
         */
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            ConsumeConcurrentlyStatus outcome = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            try {
                for (MessageExt received : msgs) {
                    printMessage(received);
                }
            } catch (Exception failure) {
                failure.printStackTrace();
                outcome = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return outcome;
        }

        /** Writes one line describing the given message to standard output. */
        private void printMessage(MessageExt received) {
            String topicName = received.getTopic();
            String bodyText = new String(received.getBody(), StandardCharsets.UTF_8);
            String tagText = received.getTags();
            StringBuilder line = new StringBuilder();
            line.append("收到消息:").append(" topic :").append(topicName);
            line.append(" ,tags : ").append(tagText);
            line.append(" ,msg : ").append(bodyText);
            System.out.println(line.toString());
        }
    }
}
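其中 Consumer1 订阅的 Tag 为 TagA,Consumer2、Consumer3 订阅的 Tag 为 TagB,三者订阅的主题都是 test_model_topic;Consumer2 与 Consumer3 属于同一个消费组 test_model_consumer_name2,Consumer1 属于消费组 test_model_consumer_name1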
/**
 * Demo producer for the message-model examples: sends eight messages to
 * {@code test_model_topic}, tagging even-numbered ones {@code TagA} and
 * odd-numbered ones {@code TagB}.
 */
public class Producer {

    /**
     * Starts the producer, sends the eight messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException    propagated from {@code producer.start()}
     * @throws InterruptedException if the thread is interrupted during the
     *                              one-second pause that follows a failed send
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "test_model_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();
        try {
            for (int i = 0; i < 8; i++) {
                try {
                    String tag = (i % 2 == 0) ? "TagA" : "TagB";
                    // Encode explicitly as UTF-8: the demo consumers decode the body as UTF-8,
                    // and the platform default charset would corrupt the non-ASCII text elsewhere.
                    byte[] body = ("信息内容" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    Message msg = new Message("test_model_topic", tag, body);
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Short pause before moving on to the next message.
                    Thread.sleep(1000);
                }
            }
        } finally {
            // Always release the producer, even when the pause above is interrupted.
            producer.shutdown();
        }
    }
}
consumer1,2,3 先于Producer启动
Producer 运行输出:
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992CE10000, offsetMsgId=276A70F600002A9F0000000000004F6C, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D040001, offsetMsgId=276A70F600002A9F0000000000005022, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D0F0002, offsetMsgId=276A70F600002A9F00000000000050D8, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D180003, offsetMsgId=276A70F600002A9F000000000000518E, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D230004, offsetMsgId=276A70F600002A9F0000000000005244, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D2D0005, offsetMsgId=276A70F600002A9F00000000000052FA, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D3D0006, offsetMsgId=276A70F600002A9F00000000000053B0, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D470007, offsetMsgId=276A70F600002A9F0000000000005466, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=3]
12:34:46.783 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true
Process finished with exit code 0
Consumer1,2,3示例运行输出
c1 start..
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容0
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容2
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容4
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容1
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容3
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容7
最佳实践:
假如生产队列是8个(queueId个数),那么消费者最好是2个、4个、8个(约数)
3. PushConsumer消费模式-广播模式
- BROADCASTING模式(广播模式)
同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息
也就是一条消息会被每一个Consumer消费
setMessageModel方法
将 PushConsumer消费模式-集群模式的 Consumer1,2,3示例改为BROADCASTING模式(广播模式),同时按上述示例步骤运行
// consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setMessageModel(MessageModel.BROADCASTING);
Consumer1,2,3示例运行输出
c1 start..
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容0
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容2
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容4
收到消息: topic :test_model_topic ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容1
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容3
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容7
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容7
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容3
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容1
收到消息: topic :test_model_topic ,tags : TagB ,msg : 信息内容5
结论 :
通过c2、c3输出得出同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息
4.消息存储核心-偏移量Offset
Offset是消息消费进度的核心
Offset指某个topic下的一条消息在某个MessageQueue里的位置
通过Offset可以进行定位到这条消息
Offset的存储实现分为远程文件类型和本地文件类型两种
5.集群模式-RemoteBrokerOffsetStore解析
默认集群模式Clustering,采用远程文件存储Offset
本质上是因为集群模式下同一消费组内有多个Consumer,每个Consumer只消费所订阅主题的一部分消息,所以消费进度需要统一保存在Broker端
6.广播模式-LocalFileOffsetStore解析
广播模式下,由于每个Consumer都会收到消息且消费
各个Consumer之间没有任何干扰,独立线程消费
所以使用LocalFileOffsetStore,也就是把Offset存储到本地
7.消费者长轮询模式分析
DefaultMQPushConsumer是使用长轮询模式进行实现的
通常主流消息获取模式:Push消息推送模式&Pull消息拉取模式
长轮询机制
consumer 拉取消息时,如果对应的 queue 没有数据,broker 不会立即返回,而是以长轮询的方式处理:先把 PullRequest 保存起来,等 queue 有了新消息或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。
rocketMQ 长轮询
8.RocketMQ消费者-PullConsumer使用
- 消息拉取方式:DefaultMQPullConsumer
- Pull方式主要做了三件事:
获取Message Queue并遍历
维护OffsetStore
根据不同的消息状态做不同的处理
/**
 * Demo producer for the pull-consumer examples: sends ten {@code TagA} messages
 * to {@code test_pull_topic}, one per second.
 */
public class Producer {

    /**
     * Starts the producer, sends the ten messages and shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException    propagated from {@code producer.start()}
     * @throws InterruptedException if the thread is interrupted during the
     *                              three-second pause that follows a failed send
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "test_pull_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                try {
                    // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                    byte[] body = ("Hello RocketMQ " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                    Message msg = new Message("test_pull_topic", "TagA", body);
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Longer pause after a failure before trying the next message.
                    Thread.sleep(3000);
                }
            }
        } finally {
            // Always release the producer, even when the pause above is interrupted.
            producer.shutdown();
        }
    }
}
/**
 * Demo pull consumer: walks every message queue of {@code test_pull_topic} and
 * pulls messages from it until the broker reports that there are no new ones.
 */
public class PullConsumer {

    /**
     * Next offset to pull from, per message queue.
     * <p>
     * The offsets live only in this in-memory map and are never persisted, so a
     * restarted process begins again at offset 0 for every queue.
     */
    private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

    /**
     * Starts the consumer, drains each queue of the topic in turn and shuts the
     * consumer down.
     *
     * @param args unused
     * @throws MQClientException propagated from starting the consumer or
     *                           fetching the topic's message queues
     */
    public static void main(String[] args) throws MQClientException {
        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        try {
            System.err.println("consumer start");
            // Fetch all message queues of the topic.
            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
            // Pull from each queue in turn.
            for (MessageQueue mq : mqs) {
                System.out.println("Consume from the queue: " + mq);
                drainQueue(consumer, mq);
            }
        } finally {
            // Always release the consumer, even when fetching the queues fails.
            consumer.shutdown();
        }
    }

    /**
     * Pulls from one queue repeatedly until the pull status is {@code NO_NEW_MSG}
     * or a pull fails with an exception.
     */
    private static void drainQueue(DefaultMQPullConsumer consumer, MessageQueue mq) {
        while (true) {
            try {
                // Pull from the remembered offset, requesting at most 32 messages per call.
                PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                System.out.println(pullResult);
                System.out.println(pullResult.getPullStatus());
                System.out.println();
                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> list = pullResult.getMsgFoundList();
                        for (MessageExt msg : list) {
                            // Decode explicitly as UTF-8 instead of the platform default charset.
                            System.out.println(new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                        break;
                    case NO_NEW_MSG:
                        System.out.println("没有新的数据啦...");
                        return;
                    case NO_MATCHED_MSG:
                    case OFFSET_ILLEGAL:
                    default:
                        // Nothing to print; keep pulling from the updated offset.
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
                // Give up on this queue: retrying immediately inside while(true)
                // would spin forever when the failure is persistent.
                return;
            }
        }
    }

    /** Remembers {@code offset} as the next position to pull from for {@code mq}. */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }

    /** Returns the remembered offset for {@code mq}, or 0 when none has been stored yet. */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offsetTable.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }
}
先运行Producer再运行PullConsumer
Producer运行结果
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B3F10000, offsetMsgId=276A70F600002A9F0000000000000E60, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B7F00001, offsetMsgId=276A70F600002A9F0000000000000F18, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BBE50002, offsetMsgId=276A70F600002A9F0000000000000FD0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BFDB0003, offsetMsgId=276A70F600002A9F0000000000001088, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C3DA0004, offsetMsgId=276A70F600002A9F0000000000001140, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C7D50005, offsetMsgId=276A70F600002A9F00000000000011F8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CBD90006, offsetMsgId=276A70F600002A9F00000000000012B0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CFDA0007, offsetMsgId=276A70F600002A9F0000000000001368, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D3D00008, offsetMsgId=276A70F600002A9F0000000000001420, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D7D00009, offsetMsgId=276A70F600002A9F00000000000014D8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=7]
17:22:47.425 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
17:22:47.430 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
17:22:47.431 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true
PullConsumer 运行结果
consumer start
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=0]
NO_NEW_MSG
没有新的数据啦...
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
PullResult [pullStatus=FOUND, nextBeginOffset=6, minOffset=0, maxOffset=6, msgFoundList=1]
FOUND
...省略
注意:offset只保存在内存的Map中,没有做持久化,因此程序重启后不会接着上次的进度,而是重新从0开始拉取
上述主动pull 消息问题:1. 没有同步远程更新消费offset 2.没有拉取消息的周期
优化
/**
 * Demo of scheduled pulling: registers a pull task for {@code test_pull_topic}
 * on an {@code MQPullConsumerScheduleService}, prints every pulled message and
 * stores the consume offset through the consumer after each pull.
 */
public class PullScheduleService {

    /**
     * Configures the schedule service in clustering mode, registers the pull
     * callback and starts the service.
     *
     * @param args unused
     * @throws MQClientException propagated from {@code scheduleService.start()}
     */
    public static void main(String[] args) throws MQClientException {
        String group_name = "test_pull_consumer_name";
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
                // Request the next pull in 3000 ms before doing any work, so that a
                // failed pull is retried with the same delay as a successful one.
                context.setPullNextDelayTimeMillis(3000);
                try {
                    // Work out where to pull from; a negative offset is treated as "start at 0".
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0) {
                        offset = 0;
                    }
                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for (MessageExt msg : list) {
                                // Consume the message; decode explicitly as UTF-8
                                // instead of the platform default charset.
                                System.out.println(new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                            }
                            break;
                        case NO_MATCHED_MSG:
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                        default:
                            // Nothing to consume for these statuses.
                            break;
                    }
                    // Record the consume progress for this queue.
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        scheduleService.start();
    }
}
-------------- queueId: 3-------------
-------------- queueId: 0-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 7
Hello RocketMQ 5
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 8
Hello RocketMQ 5
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
-------------- queueId: 0-------------
-------------- queueId: 3-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
-------------- queueId: 0-------------
...省略
特别感谢:
阿神