Pulsar consumer seek() 源码梳理

seek

目前 Pulsar 中 consumer 的 seek 接口主要支持了两种:

  • messageID
  • publishTime

除这两种之外,还支持了特殊的两种移动cousor的方式:

  • Earliest(将游标移动到最前面)
  • Latest(移动到最后面,默认方式)

Pulsar 的client端与broker端是使用 protobuf 协议进行交互的。seek 在 proto 中的定义如下:

message CommandSeek {
    required uint64 consumer_id = 1;
    required uint64 request_id  = 2;

    optional MessageIdData message_id = 3;
    optional uint64 message_publish_time = 4;
}

message_idmessage_publish_time 就是进行 seek 操作的方式,consumer_idrequest_id 这两个是大家公有的,实现每一种 seek 的操作都会用到这两个字段,如果后续还有需要支持的seek方式,需要在 proto 协议中添加。consumer_idrequest_id 这两个字段的作用也相对好理解,consumer_id是指当前seek的操作发生在哪一个consumer上,request_id是指哪一次请求中发生的这个seek操作,有了这两个字段,pulsar可以快速定位当前seek操作发生的情况。

其中 MessageIdData 的内容如下:

message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId  = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
}

MessageID总共有四个字段,这四个字段都是pulsar自己生成的,用户没办法set只能get。其中 ledgerIdentryId 这两个字段是booker需要使用的。后面两个字段标示的是当前message的行为,也就是其属于哪一个partition,是否开启了batch功能。这四个四段构成了一个messageID用来唯一标识一条message。

下面从源码角度来看一下,当一次 seek 操作发生时,pulsar是如何来处理的。

首先来看在 serverCnx中的 handlerSeek(CommandSeek seek) 的处理逻辑。需要说明一下,在 pulsar 中所有的请求都在 serverCnx 中封装了对应的 handler 函数来处理。

protected void handleSeek(CommandSeek seek) {
        // 1. 首先check当前连接的状态是否处于已经连接的状态
        checkArgument(state == State.Connected);
        // 2. 获取request_id
        final long requestId = seek.getRequestId();
        // 3. 通过 consumer_id 获取对应的consumer对象
        CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());

        // 4. 判断当前messageId和messagePublishId是否存在,如果都不存在,直接返回。
        if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
            ctx.writeAndFlush(
                    Commands.newError(requestId, ServerError.MetadataError, "Message id and message publish time were not present"));
            return;
        }

        // 5. 判断consumer的状态
        boolean consumerCreated = consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally();

        // 6-1. 如果consumer状态正常且是按照messageID进行seek操作,执行下面的分支。
        if (consumerCreated && seek.hasMessageId()) {
            // 获取当前consumer
            Consumer consumer = consumerFuture.getNow(null);
            // 获取当前consumer的订阅信息
            Subscription subscription = consumer.getSubscription();
            // 获取当前consumer的messageid
            MessageIdData msgIdData = seek.getMessageId();

            // 根据messageID中的ledgerID和EntryID获取当前message在bk中的具体位置
            Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());


            // 根据获取到的位置信息,将cousor重置到指定的位置
            subscription.resetCursor(position).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
                        subscription.getTopic().getName(), subscription.getName(), position);
                ctx.writeAndFlush(Commands.newSuccess(requestId));
            }).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
                        "Error when resetting subscription: " + ex.getCause().getMessage()));
                return null;
            });
        // 6-2. 如果是按照publishTime执行seek,执行下面的分支
        } else if (consumerCreated && seek.hasMessagePublishTime()){
            Consumer consumer = consumerFuture.getNow(null);
            Subscription subscription = consumer.getSubscription();
            long timestamp = seek.getMessagePublishTime();

            subscription.resetCursor(timestamp).thenRun(() -> {
                log.info("[{}] [{}][{}] Reset subscription to publish time {}", remoteAddress,
                        subscription.getTopic().getName(), subscription.getName(), timestamp);
                ctx.writeAndFlush(Commands.newSuccess(requestId));
            }).exceptionally(ex -> {
                log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
                        "Reset subscription to publish time error: " + ex.getCause().getMessage()));
                return null;
            });
        // 6-3. 其它的情况出错返回
        } else {
            ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found"));
        }
    }

resetCursor 实现:

private void resetCursor(Position finalPosition, CompletableFuture<Void> future) {
        // 1. 确保当前操作是原子的操作,如果进入resetCousor时为false的话,将该标记为置为true,表明当前有人在操作cousor。
        if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
            future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
            return;
        }

        // 2. broker需要断开与所有consumer的连接,这个也好理解,如果我们没有将与consumer的连接信息断开的话,在操作cousor时,consumer还可能receive到消息。这个行为是不可控制的。当broker断开连接之后,client会去触发重连的逻辑。具体代码实现在grabcnx()中。
        final CompletableFuture<Void> disconnectFuture;
        if (dispatcher != null && dispatcher.isConsumerConnected()) {
            disconnectFuture = dispatcher.disconnectAllConsumers();
        } else {
            disconnectFuture = CompletableFuture.completedFuture(null);
        }

        disconnectFuture.whenComplete((aVoid, throwable) -> {
            if (throwable != null) {
                log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
                IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                future.completeExceptionally(
                        new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                return;
            }
            log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
                    topicName, subName);

            try {
                // 3. 调用asyncResetCursor重置cousor的位置,到这一步代码已经进入到bk的client层面,在pulsar中,cousor的信息是存储到bk中的,还记得messageID中的ledgerID和entryID吗?这两个字段就是告诉bk当前messgae的位置信息。
                cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
                    // 4. 无论成功与否,都需要将IS_FENCED_UPDATER的值置为false,表明当前这一次的reset操作已经结束
                    @Override
                    public void resetComplete(Object ctx) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
                                    finalPosition);
                        }
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                        future.complete(null);
                    }

                    @Override
                    public void resetFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName,
                                finalPosition, exception);
                        IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                        // todo - retry on InvalidCursorPositionException
                        // or should we just ask user to retry one more time?
                        if (exception instanceof InvalidCursorPositionException) {
                            future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage()));
                        } else if (exception instanceof ConcurrentFindCursorPositionException) {
                            future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
                        } else {
                            future.completeExceptionally(new BrokerServiceException(exception));
                        }
                    }
                });
            } catch (Exception e) {
                log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
                IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                future.completeExceptionally(new BrokerServiceException(e));
            }
        });
    }

asyncResetCursor:

public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) {
        // 1. 判断传入进来的 newPos 参数是否正确
        checkArgument(newPos instanceof PositionImpl);
        // 2. 将 Position 的newPos强转为PositionImpl的newPosition
        final PositionImpl newPosition = (PositionImpl) newPos;

        // order trim and reset operations on a ledger
        ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
            // 3. 判断position是否有效
            // 3-1: newPosition是否有效
            // 3-2: 是否为earliest
            // 3-3: 是否为latest
            if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest)
                    || newPosition.equals(PositionImpl.latest)) {
                // 4. 如果有效,调用internalResetCursor
                internalResetCursor(newPosition, callback);
            } else {
                // caller (replay) should handle this error and retry cursor reset
                callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()),
                        newPosition);
            }
        }));
    }

internalResetCursor:

protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
        // 1. 如果为earliest或者latest的position,直接将position重置为指定值
        if (position.equals(PositionImpl.earliest)) {
            position = ledger.getFirstPosition();
        } else if (position.equals(PositionImpl.latest)) {
            position = ledger.getLastPosition().getNext();
        }

        log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);

        // 2. 确保此次resetCursor的操作是原子的
        synchronized (pendingMarkDeleteOps) {
            if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
                log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
                        ledger.getName(), position, name);
                resetCursorCallback.resetFailed(
                        new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
                        position);
            }
        }

        final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;

        // 3. 复制一个position的副本为newPosition
        final PositionImpl newPosition = position;

        VoidCallback finalCallback = new VoidCallback() {
            @Override
            public void operationComplete() {

                // modify mark delete and read position since we are able to persist new position for cursor
                lock.writeLock().lock();
                try {
                    // 4. 根据newPosition的位置获取前一个位置
                    PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);

                    // 5. 计数器
                    if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
                        messagesConsumedCounter -= getNumberOfEntries(
                                Range.closedOpen(newMarkDeletePosition, markDeletePosition));
                    } else {
                        messagesConsumedCounter += getNumberOfEntries(
                                Range.closedOpen(markDeletePosition, newMarkDeletePosition));
                    }
                    // 6. 将newMarkDeletePosition赋值给markDeletePosition,markDeletePosition表示当前需要删除的消息的位置。
                    markDeletePosition = newMarkDeletePosition;
                    
                    lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
                            null, null);
                    individualDeletedMessages.clear();

                    PositionImpl oldReadPosition = readPosition;
                    if (oldReadPosition.compareTo(newPosition) >= 0) {
                        log.info("[{}] reset position to {} before current read position {} on cursor {}",
                                ledger.getName(), newPosition, oldReadPosition, name);
                    } else {
                        log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
                                ledger.getName(), newPosition, oldReadPosition, name);
                    }
                    // 7. readPosition表示consumer接收数据时会从哪里开始读取数据,这个也是resetCursor的关键步骤,将readPosition到的位置重置为要被seek到的位置。
                    readPosition = newPosition;
                } finally {
                    lock.writeLock().unlock();
                }
    }

综上所述,一个简单的resetCursor的操作,牵扯到了这么多的逻辑。下面我们用一张图来说明其调用关系。

seek.png

以上部分为当broker接收到一个seek请求时会执行哪些操作,下面我们来看用户调用seek接口之后,client时如何将请求传到broker上的。

client

seek 接口是在 Consumer 这个接口中暴漏给用户的,有如下两种使用形式:

  • void seek(MessageId messageId) throws PulsarClientException;
    • seek by messageID
  • void seek(long timestamp) throws PulsarClientException;
    • seek by publishTime

client 的实现相对简单,就是构建好seek请求所需要的数据,将相应的数据发送给broker即可,具体如下:

public CompletableFuture<Void> seekAsync(MessageId messageId) {
        // 检查当前 client 的连接状态是否正常
        if (getState() == State.Closing || getState() == State.Closed) {
            return FutureUtil
                    .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
        }

        // 检查连接信息是否正常
        if (!isConnected()) {
            return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
        }

        final CompletableFuture<Void> seekFuture = new CompletableFuture<>();

        // 初始化当前的 requestID
        long requestId = client.newRequestId();
        // 将用户传入进来的 messageID 强转为MessageIdImpl 类型
        MessageIdImpl msgId = (MessageIdImpl) messageId;
        // newSeek 根据上述提供的信息,将要发送的seek请求的数据准备好
        ByteBuf seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
        // 获取client连接信息
        ClientCnx cnx = cnx();

        log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);

        // 将seek请求发送出去,并携带此次请求的requestID
        cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
            log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
            // acknowledgmentsGroupingTracker主要是用来解决单一ack的问题,之前每次接收一条消息都会进行一次ack操作,现在将多个ack请求放到一个group中,定时来进行ack操作,节省资源,提高效率。
            // 在 consumerBuilder 中提供了acknowledgmentGroupTime() 接口,允许用户自己设定group ack中触发的时间,默认情况是:100ms
            // 需要注意的是,如果将该时间设置为 0 ,将会立即发送确认。
            acknowledgmentsGroupingTracker.flushAndClean();
            lastDequeuedMessage = messageId;
            // 清空每一个consumer的message queue
            incomingMessages.clear();
            seekFuture.complete(null);
        }).exceptionally(e -> {
            log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
            seekFuture.completeExceptionally(e.getCause());
            return null;
        });
        return seekFuture;
    }

publishTime 与此同理,在此不做赘述。差不多以上就是 seek 操作的全部流程。

补充说明:当 client 端发送 seek 指令到达 broker 时,broker 会关闭与当前连接的所有 consumer,总之一条原则,不能让原先的 consumer 接着去消费数据,因为 seek 会改变 consumer cursor 的位置信息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353