PushConsumer核心参数详解
参数名 | 默认值 | 说明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel | CLUSTERING | 消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Subscription | {} | 订阅关系 |
messageListener | 消息监听器 | |
offsetStore | 消费进度存储 | |
consumeThreadMin | 10 | 消费线程池数量 |
consumeThreadMax | 20 | 消费线程池数量 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
Pullinterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
注意:ConsumeFromWhere
consumer可以设置消费的起始点,MQ提供了三种方式:
CONSUME_FROM_LAST_OFFSET
CONSUME_FROM_FIRST_OFFSET
CONSUME_FROM_TIMESTAMP
PushConsumer消费模式-集群模式
RocketMQ有两种消费模式:Broadcasting广播模式,Clustering集群模式,默认的是集群消费模式。
Clustering集群模式(默认):
- 1、通过consumer.setMessageModel(MessageModel.CLUSTERING);进行设置
- 2、GroupName用于把多个Consumer组织到一起
- 3、相同GroupName的Consumer只消费所订阅消息的一部分,即ConsumerGroup中的Consumer实例平均分摊消费topic的消息
- 4、目的:达到天然的负载均衡机制
- 5、消息的消费进度,即consumerOffset.json保存在broker上。
- 6、消息消费失败后,consumer会发回broker,broker根据消费失败次数设置不同的delayLevel进行重发。
- 7、相同topic不同的consumerGroup组成伪广播模式,可达到所有consumer都会收到消息。
PushConsumer消费模式-广播模式
Broadcasting广播模式:
1、通过consumer.setMessageModel(MessageModel.BROADCASTING);进行设置
2、消息的消费进度保存在consumer的机器上。
3、同一个ConsumerGroup里的Consumer都消费订阅Topic的全部信息
4、不同ConsumerGroup里的Consumer可以实现根据tags进行消费即:
consumer1.subscribe("test_model_topic","TagA");
consumer2.subscribe("test_model_topic","TagB");
5、消息消费失败后直接丢弃,不会发回broker进行重投递。
6、 由于所有consumer都需要收到消息,所以不存在负载均衡策略。
消息存储核心-偏移量Offset
Offset是消息消费进度的核心,指某个topic下的一条消息在某个MessageQueue里的位置,通过Offset可以进行消息的定位
Offset的存储实现分为远程文件类型和本地文件类型两种,集群模式下offset存在broker中; 广播模式下offset存在consumer中。
集群模式-RemoteBrokerOffsetStore解析
RocketMQ默认是集群消费模式Clustering,采用远程文件存储Offset,即存储在broker中
本质是因为多消费模式,每个Consumer只消费所订阅主题的一部分,这种情况下就需要由Broker去控制Offset的值,使用RemoteBrokerOffsetStore
广播模式-LocalFileOffsetStore解析
在广播模式下,由于每个Consumer都会收到消息且消费,那么各个Consumer之间没有任何干扰,都是独立线程消费,所以使用LocalFileOffsetStore,即把Offset存储到本地
PushConsumer消费者长轮询模式分析
DefaultPushConsumer是使用长轮询模式进行实现的
常见的数据同步方式有这几种:
push:producer发送消息后,broker马上把消息投递给consumer。这种方式好在实时性比较高,但是会增加broker的负载;而且消费端能力不同,如果push推送过快,消费端会出现很多问题。
pull:producer发送消息后,broker什么也不做,等着consumer自己来读取。它的优点在于主动权在消费者端,可控性好;但是间隔时间不好设置,间隔太短浪费资源,间隔太长又会消费不及时。
长轮询机制:当consumer过来请求时,broker会保持当前连接一段时间 默认15s,如果这段时间内有消息到达,则立刻返回给consumer;15s没消息的话则返回空然后重新请求。这种方式的缺点就是服务端要保存consumer状态,客户端过多会一直占用资源。
consumer是长轮询拉消息,当consumer拉消息时,broker端如果没有新消息,broker会通过PullRequestHoldService服务hold住这个请求:
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
Broker通过ReputMessageService异步构建ConsumeQueue并通过注册的MessageArrivingListener通知PullRequestHoldService#notifyMessageArriving达到有消息,立即推送给consumer。具体实现在ReputMessageService的实现类DefaultMessageStore#doReput()中
详细的长轮询和Offset分析:RocketMQ原理解析-Consumer
RocketMQ消费者-PullConsumer使用
pull方式主要做了三件事:
- 1、获取MessageQueue并遍历
- 2、维护OffsetStore
- 3、根据不同的消息状态做不同的处理
1、DefaultMQPullConsumer,Pull模式简单样例
public class PullConsumer {
// Map<key,value> key为指定队列,value为这个队列拉取数据的最后位置
private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
public static void main(String[] args) {
try {
String group_name = "test_pull_producer_name";
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESER_ADDR);
consumer.start();
System.out.println("consumer start......");
//从topicTest这个主题去获取所有队列(默认会有4个队列)
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues("test_pull_topic");
//遍历每一个队列进行数据拉取
for (MessageQueue messageQueue : messageQueues) {
System.out.println("consumer from the queue:" + messageQueue);
SINGLE_MQ:
while (true) {
try {
//从queue中获取数据,从什么位置开始拉取数据,单次最多拉取32条数据
PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, null, getMessageQueueOffset(messageQueue), 32);
System.out.println(pullResult);
System.out.println(pullResult.getPullStatus());
putMessageQueueOffset(messageQueue, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
System.out.println("没有新的数据啦......");
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (MQClientException e) {
e.printStackTrace();
}
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offsetTable.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offsetTable.put(mq, offset);
}
}
2.RocketMQ Pull模式下提供的负载均衡样例(基于MQPullConsumerScheduleService)
public class PullScheduleService {
public static void main(String[] args) throws MQClientException {
String group_name = "test_pull_consumer_name";
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESER_ADDR);
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
try {
// 获取从哪里拉取
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> list = pullResult.getMsgFoundList();
for(MessageExt msg : list){
//消费数据...
System.out.println(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
// 设置再过3000ms后重新拉取
context.setPullNextDelayTimeMillis(3000);
}
catch (Exception e) {
e.printStackTrace();
}
}
});
scheduleService.start();
}
}
https://www.cnblogs.com/wlwl/p/10889850.html
https://www.jianshu.com/p/fac642f3c1af?utm_source=oschina-app