Apache Pulsar 技术系列-Batch子消息确认机制

在 Pulsar 中,Producer 生产的消息进入到队列后,由 Consumer 进行消费,Consumer 使用订阅来记录消费的位置,在消费完消息后,就需要对已消费的消息进行确认,以更新订阅的位置,接着再不断消费不断订阅。但是对于Batch的消息不能简单的像Individual message去确认,因为在一个Batch内部可能会存在多条子消息,对于一个子消息的确认,会导致对该batch消息的整体确认,但其他子消息可能还未进行消费。因此对于Batch中的子消息的确认,需要一些额外的功能进行处理。

MessageIdData

MessageIdData 的 proto 定义如下:

message MessageIdData {

    required uint64 ledgerId = 1;

    required uint64 entryId  = 2;

    optional int32 partition = 3 [default = -1];

    optional int32 batch_index = 4 [default = -1];

    repeated int64 ack_set = 5;

    optional int32 batch_size = 6;

}

ledgerId和entryId:唯一标识一条消息,对于batch消息,就是唯一确认整个batch消息;

partition:代表该消息对应的分区;

batch_index:代表该子消息在batch中的索引编号;

ack_set:代表该batch消息中的子消息的确认情况,可以表示该batch消息中,还有子消息被确认了,还有哪些未被确认;

batch_size:代表该batch中的子消息数;

当consumer确认一条消息时,需要向Broker发送一个MessageIdData数据,通过ack_set告诉服务端该batch消息中的哪些子消息已经被确认,在Consumer client内部是通过ConcurrentBitSetRecyclable类来记录一条batch消息中的子消息确认情况的,也就是对应ack_set数据;当服务端收到MessageIdData数据后,对于一个batch消息对应的子消息确认信息,会用BitSetRecyclable类来进行记录存储,对应所有待确认的batch消息的子消息确认情况会放到一个Map结构中,该结构的定义如下:

private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;

这里的BitSetRecyclable类和ConcurrentBitSetRecyclable类都是从Java的BitSet类演变而来,其中ConcurrentBitSetRecyclable是继承自BitSet,只是相对于BitSet来说是线程安全的, 因此在了解pulsar是如何记录一个batch的子消息确认之前,需要先了解下java的BitSet类原理;

BitSet

java.util.BitSet是BitMap的实现,BitMap直译是位图,是一种数据结构。在理解BitSet之前需要先了解下BitMap是什么。

什么是BitMap

计算机中1 byte = 8 bit,一个比特(bit,称为比特或者位)可以表示1或者0两种值,通过一个比特去标记某个元素的值,因为采用了Bit作为底层存储数据的单位,所以可以极大地节省存储空间,以下图为例,我们可以定义一个byte数组,对应每一个byte我们有8个bit位,从而可以标识8个元素的值,这里有2个byte元素,因此一共可以标识16个元素值:


BitMap如果使用byte数组存储,当新添加元素的逻辑下标超过了初始化的byte数组的最大逻辑下标就必须进行扩容。为了尽可能减少扩容的次数,除了需要按实际情况定义初始化的底层存储结构,还应该选用能够"承载"更多比特的数据类型数组,因此在BitSet中底层的存储结构选用了long数组,一个long整数占64比特,位长是一个byte整数的8倍,在需要处理的数据范围比较大的场景下可以有效减少扩容的次数。因此在BitSet中,定义了一个long[] words数组,当要表示的元素个数超过64时,就会对该words数组进行扩容;

BitSet的常用方法

在Pulsar中,主要利用到了set和clear方法,所以这里主要介绍这两个方法。

set方法

set方法根据不同的传参,定义了几种不同的使用方式:

1. java.util.BitSet#set(int)

2. java.util.BitSet#set(int, boolean)

3. java.util.BitSet#set(int, int)

这里主要介绍java.util.BitSet#set(int, int)方法,该方法有两个传参,分别对应两个比特位的索引编号,比如:bitSet.set(0, 3),表示将下标为0到2的比特位上的值设置为1,这里的两个起始位置索引是左闭右开的。当然你也可以使用java.util.BitSet#set(int, boolean)方法将指定位置的比特位设置为0或1,但默认set方法是将对应的比特位设置为1,而java.util.BitSet#set(int)方法是将指定的某一个比特位设置为1;

clear方法

该方法有两个接口定义:

1.java.util.BitSet#clear(int, int)

2.java.util.BitSet#clear(int)

该方法要求传入两个参数,分别是两个比特位的索引下标,表示将一个范围的bite位上的值设置为0,同样也是该范围也是左闭右开的,比如:bitSet.clear(0, 3),表示将比特索引为0到2的比特位设置为0;如果只需要将一个比特位设置为0,则可以使用java.util.BitSet#clear(int)接口;

Batch 消息确认流程

客户端处理流程

如果Consumer客户端收到一个batch消息,那么首先会为该消息创建一个BatchMessageAcker对象,用来记录该batch消息中的子消息的ack情况:

BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);

static BatchMessageAcker newAcker(int batchSize) {

    BitSet bitSet = new BitSet(batchSize);

    bitSet.set(0, batchSize);

    return new BatchMessageAcker(bitSet, batchSize);

}

可以看到在BatchMessageAcker对象创建时,会调用bitSet.set(0, batchSize)方法,将比特位索引范围从0到batchSize-1的比特位上的值全部设置为1(Pulsar用1表示未确认,0表示确认)。

Consumer client在收到服务端发送的一个batch消息后,会从中解析出一个个子消息,并放入incomingMessages队列中,当用户调用consumer.receive接口后,就可以获取到一条子消息,并在处理完成后,调用consumer.acknowledge对batch中的子消息进行确认。在consumer client内部首先会将该子消息对应的比特位设置为0,然后再判断该子消息对应的batch消息的所有子消息是否全部确认完毕,当全部确认完毕后,就会发送ack请求到broker端,当然如果客户端开启了配置batchIndexAckEnabled,那么即便没有全部ack完毕也会发送ack请求到服务端:

if (batchMessageId == null || batchMessageId.ackIndividual()) {

    consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1);

    consumer.getUnAckedMessageTracker().remove(msgId);

    if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {

        consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);

    }

    return individualAckFunction.apply(msgId);

} else if (batchIndexAckEnabled) {

    return batchAckFunction.apply(batchMessageId);

}

这里的BatchMessageIdImpl#ackIndividual方法会调用BatchMessageAcker#ackIndividual方法:

public synchronized boolean ackIndividual(int batchIndex) {

    bitSet.clear(batchIndex);

    return bitSet.isEmpty();

}

可以看到,在该方法中,会将入的子消息索引batchIndex对应的比特位设置为0,表示该子消息已被确认,然后通过bitSet.isEmpty()来判断该batch消息中的子消息是否全部被ack完毕:

public boolean isEmpty() {

    return wordsInUse == 0;

}

当所有的比特位设置为0时,wordsInUse就等于0,wordsInUse代表了比特位设置为1的个数。

服务端处理流程

当Pulsar broker服务端收到客户端发送的ack请求后,会将每个batch消息对应的子消息确认信息记录到一个map结构中,如下:

private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;

这里的BitSetRecyclable也是拷贝自JDK的BitSet,和BitSet的原理一样。在batchDeletedIndexes结构中,key是一个batch消息对应的position位置,也唯一标识了一个batch消息,而value对应了该batch消息中,所有子消息的确认情况,我们可以从中知道还有多少子消息未被确认,以及确认了多少子消息。

Broker在收到ack请求后,如果发现这个ack确认的消息是一个batch消息,并且服务端也开启了batch级别的子消息配置时,会更新batchDeletedIndexes中对应消息的bitSet信息,更新完成后,会通过bitSet.isEmpty()方法,检查该batch中的子消息是否都已经全部确认完毕,如果所有子消息都已经被ack了,那么会将该batch消息从batchDeletedIndexes中移除,并放入单条确认的消息结构中,从而就转换成非batch消息的单条确认逻辑了:

if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {

    BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);

    BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> givenBitSet);

    if (givenBitSet != bitSet) {

        bitSet.and(givenBitSet);

        givenBitSet.recycle();

    }

    if (bitSet.isEmpty()) {

        PositionImpl previousPosition = ledger.getPreviousPosition(position);

        individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(),

            previousPosition.getEntryId(),

            position.getLedgerId(), position.getEntryId());

        MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);

        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);

        if (bitSetRecyclable != null) {

            bitSetRecyclable.recycle();

        }

    }

}

相关配置

如果要开启batch中的子消息级别的ack确认机制,对应客户端配置是:batchIndexAckEnabled,比如:

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)

        .topic(topic)

        .subscriptionName("s1")

        .enableBatchIndexAcknowledgment(isEnableAckReceipt)

        .subscribe();

当然对于服务端也需要开启相应配置:

deletionAtBatchIndexLevelEnabled=true

结尾

关于Batch子消息级别的确认实现PIP可以参考:https://github.com/apache/pulsar/pull/6052

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容