Kafka源码分析-序列5 -Producer -RecordAccumulator队列分析

在Kafka源码分析-序列2中,我们提到了整个Producer client的架构图,如下所示:


1.png

其它几个组件我们在前面都讲过了,今天讲述最后一个组件RecordAccumulator.

Batch发送

在以前的kafka client中,每条消息称为 “Message”,而在Java版client中,称之为”Record”,同时又因为有批量发送累积功能,所以称之为RecordAccumulator.

RecordAccumulator最大的一个特性就是batch消息,扔到队列中的多个消息,可能组成一个RecordBatch,然后由Sender一次性发送出去。

每个TopicPartition一个队列

下面是RecordAccumulator的内部结构,可以看到,每个TopicPartition对应一个消息队列,只有同一个TopicPartition的消息,才可能被batch。

public final class RecordAccumulator {  
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;  
  
   ...  
}

batch的策略

那什么时候,消息会被batch,什么时候不会呢?下面从KafkaProducer的send方法看起:

//KafkaProducer  
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {  
        try {  
            // first make sure the metadata for the topic is available  
            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);  
  
            ...  
  
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);   //核心函数:把消息放入队列  
  
            if (result.batchIsFull || result.newBatchCreated) {  
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);  
                this.sender.wakeup();  
            }  
            return result.future;

从上面代码可以看到,batch逻辑,都在accumulator.append函数里面:

public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, Callback callback, long maxTimeToBlock) throws InterruptedException {  
    appendsInProgress.incrementAndGet();  
    try {  
        if (closed)  
            throw new IllegalStateException("Cannot send after the producer is closed.");  
        Deque<RecordBatch> dq = dequeFor(tp);  //找到该topicPartiton对应的消息队列  
        synchronized (dq) {  
            RecordBatch last = dq.peekLast(); //拿出队列的最后1个元素  
            if (last != null) {    
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds()); //最后一个元素, 即RecordBatch不为空,把该Record加入该RecordBatch  
                if (future != null)  
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);  
            }  
        }  
  
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));  
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());  
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);  
        synchronized (dq) {  
            // Need to check if producer is closed again after grabbing the dequeue lock.  
            if (closed)  
                throw new IllegalStateException("Cannot send after the producer is closed.");  
            RecordBatch last = dq.peekLast();  
            if (last != null) {  
                FutureRecordMetadata future = last.tryAppend(key, value, callback, time.milliseconds());  
                if (future != null) {  
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...  
                    free.deallocate(buffer);  
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);  
                }  
            }  
  
            //队列里面没有RecordBatch,建一个新的,然后把Record放进去  
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);  
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());  
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, callback, time.milliseconds()));  
  
            dq.addLast(batch);  
            incomplete.add(batch);  
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);  
        }  
    } finally {  
        appendsInProgress.decrementAndGet();  
    }  
}  
  
private Deque<RecordBatch> dequeFor(TopicPartition tp) {  
    Deque<RecordBatch> d = this.batches.get(tp);  
    if (d != null)  
        return d;  
    d = new ArrayDeque<>();  
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);  
    if (previous == null)  
        return d;  
    else  
        return previous;  
}

从上面代码我们可以看出Batch的策略:

  1. 如果是同步发送,每次去队列取,RecordBatch都会为空。这个时候,消息就不会batch,一个Record形成一个RecordBatch
  2. Producer 入队速率 < Sender出队速率 && lingerMs = 0 ,消息也不会被batch
  3. Producer 入队速率 > Sender出对速率, 消息会被batch
  4. lingerMs > 0,这个时候Sender会等待,直到lingerMs > 0 或者 队列满了,或者超过了一个RecordBatch的最大值,就会发送。这个逻辑在RecordAccumulator的ready函数里面。
public ReadyCheckResult ready(Cluster cluster, long nowMs) {  
    Set<Node> readyNodes = new HashSet<Node>();  
    long nextReadyCheckDelayMs = Long.MAX_VALUE;  
    boolean unknownLeadersExist = false;  
  
    boolean exhausted = this.free.queued() > 0;  
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {  
        TopicPartition part = entry.getKey();  
        Deque<RecordBatch> deque = entry.getValue();  
  
        Node leader = cluster.leaderFor(part);  
        if (leader == null) {  
            unknownLeadersExist = true;  
        } else if (!readyNodes.contains(leader)) {  
            synchronized (deque) {  
                RecordBatch batch = deque.peekFirst();  
                if (batch != null) {  
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;  
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;  
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;  
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);  
                    boolean full = deque.size() > 1 || batch.records.isFull();  
                    boolean expired = waitedTimeMs >= timeToWaitMs;  
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();  //关键的一句话  
                    if (sendable && !backingOff) {  
                        readyNodes.add(leader);  
                    } else {  
  
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);  
                    }  
                }  
            }  
        }  
    }  
  
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);  
}

为什么是Deque?

在上面我们看到,消息队列用的是一个“双端队列“,而不是普通的队列。
一端生产,一端消费,用一个普通的队列不就可以吗,为什么要“双端“呢?

这其实是为了处理“发送失败,重试“的问题:当消息发送失败,要重发的时候,需要把消息优先放入队列头部重新发送,这就需要用到双端队列,在头部,而不是尾部加入。

当然,即使如此,该消息发出去的顺序,还是和Producer放进去的顺序不一致了。

欢迎加入QQ群:104286694

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,816评论 8 167
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,426评论 0 34
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,299评论 1 15
  • 转帖:原文地址http://www.infoq.com/cn/articles/depth-interpretat...
    端木轩阅读 2,315评论 0 19