rocketmq源码6-客户端-pull消费者

一 DefaultMQPullConsumer

  • 消费组
    String consumerGroup;
  • Long polling模式(即消费时没有消息时阻塞在broker端,等有消息可消费时再响应),消费端连接最长等待时间
    long brokerSuspendMaxTimeMillis = 1000 * 20;
  • Long polling模式,消费端连接超时时间
    long consumerTimeoutMillisWhenSuspend = 1000 * 30;
  • 非long polling模式,socket连接超时时间。
    long consumerPullTimeoutMillis = 1000 * 10;
  • 消费模式,默认集群模式,即一个消费组内的实例只有一个实例会消费一次。广播模式是所有实例各消费一次,无消费组概念。
    MessageModel messageModel = MessageModel.CLUSTERING;
  • 消息队列变更监听回调函数。
    rebalanceService重新按负载均衡策略分配消费队列时,若队列有变更则触发。
MessageQueueListener messageQueueListener;

public interface MessageQueueListener {
 messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
        final Set<MessageQueue> mqDivided);
}
  • 消费偏移读写服务
    OffsetStore offsetStore;
  • 注册PullTaskCallback回调的topic集合。
    Set<String> registerTopics = new HashSet<String>();
public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
    synchronized (this.registerTopics) {
        this.registerTopics.add(topic);
        if (listener != null) {
            this.messageQueueListener = listener;
        }
    }
}

public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
    this.callbackTable.put(topic, callback);
    this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
}
  • MessageQueue消费负载均衡策略,默认平均分配
    AllocateMessageQueueStrategy allocateMessageQueueStrategy
  • 单元模式boolean unitMode = false;
  • 重试队列最大重试消费次数,达到后仍未消费则放入死信队列
    int maxReconsumeTimes = 16;

1.1 OffsetStore

  • 初始化MessageQueue消费偏移读写服务
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPullConsumer.getMessageModel()) {
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
            break;
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
}
//启动时从持久化数据加载消费偏移到内存中
this.offsetStore.load();
  • 广播模式消费,则每个实例使用LocalFileOffsetStore本地文件持久化存储消费偏移
  • 集群模式消费,则在broker持久化每个消费组的消费偏移

1.1.1 LocalFileOffsetStore

  • 持久化存储文件路径
    String storePath;
  • 内存存储的各MessageQueue的消费偏移,ConcurrentMap和AtomicLong控制并发。
    ConcurrentMap<MessageQueue, AtomicLong> offsetTable

1.1.1.1 load()

  • 初始化时,本地文件加载消费偏移量
public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}

1.1.1.2 readOffset

  • 根据读取类型,从内存或从文件缓存中读取消费便宜量
  • 从文件读取后,更新内存中的消费偏移量
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            case READ_FROM_STORE: {
                OffsetSerializeWrapper offsetSerializeWrapper;
                try {
                    offsetSerializeWrapper = this.readLocalOffset();
                } catch (MQClientException e) {
                    return -1;
                }
                if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                    AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
                    if (offset != null) {
                        this.updateOffset(mq, offset.get(), false);
                        return offset.get();
                    }
                }
            }
            default:
                break;
        }
    }

    return -1;
}

1.1.1.3 updateOffset

  • 两种方式更新消费偏移量,一种是增量更改,一种是全量更改。
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}

1.1.1.4 persistAll

  • 持久化内存缓存的消费偏移到文件缓存中
  • 通过定时任务周期调用
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

1.1.2 RemoteBrokerOffsetStore

  • 功能和LocalFileOffsetStore相同
  • 持久化时,从本地文件缓存持久化改为broker持久化

1.1.2.1 updateConsumeOffsetToBroker

  • 查找MessageQueue所在broker地址信息,不存在则从namesrv更新topic路由信息后再重新获取
  • updateTopicRouteInfoFromNameServer向目标broker更新消费偏移。
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {

        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

1.1.2.2 fetchConsumeOffsetFromBroker

  • 获取mq所在broker地址
  • 从目标broker获取mq的消费偏移
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
    InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {

        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());

        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

1.2 DefaultMQPullConsumerImpl

  • 配置项
    DefaultMQPullConsumer defaultMQPullConsumer;
  • 启动时间
    long consumerStartTimestamp = System.currentTimeMillis();
  • 钩子函数
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
  • 客户端状态
    volatile ServiceState serviceState = ServiceState.CREATE_JUST;
  • 通信接口
    MQClientInstance mQClientFactory;
  • 消费偏移持久化服务
    OffsetStore offsetStore;
  • 消费组实例消费mq负载均衡服务
    RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
  • 消费端拉取消息封装接口
    PullAPIWrapper pullAPIWrapper;
  • 消费类型,拉模式
@Override
public ConsumeType consumeType() {
    return ConsumeType.CONSUME_ACTIVELY;
}
  • 消费偏移位置,首次从最后开始消费
@Override
public ConsumeFromWhere consumeFromWhere() {
    return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
}

public enum ConsumeFromWhere {
//从最后位置开始消费
    CONSUME_FROM_LAST_OFFSET,
//从起始位置开始消费
    CONSUME_FROM_FIRST_OFFSET,
//按时间戳消费
    CONSUME_FROM_TIMESTAMP,
}

1.2.1 pull消息拉取

1.2.1.1 同步处理pullSyncImpl

private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
    long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
/* 参数校验
* 客户端状态为ServiceState.RUNNING
* 指定目标MessageQueue
* 指定消费偏移
* 指定最大获取数量
*/
    this.makeSureStateOK();
    if (null == mq) {
        throw new MQClientException("mq is null", null);
    }
    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }
    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }
//纪录topic信息
    this.subscriptionAutomatically(mq.getTopic());
//构建拉取消息的标记
/*
* int FLAG_COMMIT_OFFSET = 0x1 << 0; broker同时持久化消费偏移
* int FLAG_SUSPEND = 0x1 << 1; 无消息时,long polling等待
* int FLAG_SUBSCRIPTION = 0x1 << 2;   //消费类型,tag方式消费
* int FLAG_CLASS_FILTER = 0x1 << 3;//过滤服务器处理
*/
    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
//构建消费类型信息
    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }
//消费超时时间
    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
//pullAPIWrapper接口调用
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        0L,
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );
//返回结果处理
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
// 钩子函数调用
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

1.2.1.2异步处理pullAsyncImpl

  • 处理流程与同步调用类似,只是返回结果在异步回调中处理。根据返回成功或异常调用PullCallback对应接口。
private void pullAsyncImpl(
    final MessageQueue mq,
    final String subExpression,
    final long offset,
    final int maxNums,
    final PullCallback pullCallback,
    final boolean block,
    final long timeout) throws MQClientException, RemotingException, InterruptedException {
   ...
        this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            0L,
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.ASYNC,
            new PullCallback() {

                @Override
                public void onSuccess(PullResult pullResult) {
                    pullCallback
                        .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                }

                @Override
                public void onException(Throwable e) {
                    pullCallback.onException(e);
                }
            });
    } catch (MQBrokerException e) {
        throw new MQClientException("pullAsync unknow exception", e);
    }
}

1.3 RebalancePullImpl

  • 负载均衡策略功能在父类RebalanceImpl实现
  • 实现了负载均衡消费队列变更监听回调
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

1.3.1 RebalanceImpl

  • 主要push方式使用,见后面

1.4 PullAPIWrapper

1.4.1 消息拉取接口代理

  • 获取消息队列所在broker地址
  • 根据broker地址,拉取消息

1.4.2 获取消息结果处理

  • 解码消息体
  • 按tag过滤需要消费的消息
  • 钩子函数处理
  • 返回消息
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容

  • metaq是阿里团队的消息中间件,之前也有用过和了解过kafka,据说metaq是基于kafka的源码改过来的,他...
    菜鸟小玄阅读 32,915评论 0 14
  • MQ在我们日常开发过程中有着不可替代的作用,不仅可以帮助我们做到信息在系统间的传递,还能进行系统间的解耦合,也就是...
    数齐阅读 3,443评论 2 7
  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 7,589评论 0 10
  • RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点: 能够保证严格的消息顺序 提供丰富的消息拉取模式...
    AI乔治阅读 2,074评论 2 5
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,471评论 0 34