8.Kafka源码深入解析之发送消息02

上一章节我们在org.apache.kafka.clients.producer.internals.RecordAccumulator#append最开始简单分析了一下发送消息的准备,其中重要的工作就是通过topicPartition找相关的队列,如果没有找到就新建一个新队列。如图:

image.png

本节接着向下看:
步骤二:尝试向队列里面的批次添加消息

//这里加锁
            //假如有3个线程进来了,都是需要把数据写往同一个分区中相同的队列中,那么就要加锁,线程安全
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                /**
                 * 就是尝试将消息写入队列最近一个batch中,但实际上我们现在的Deque是空的
                 * 里面是没有batch的,在队列空的batcdh为空的情况下,源码是如何运行的,如果是存在的
                               
                //目前还没有分配到内存,所以添加的数据会失败
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

尝试向队列里面的批次添加消息tryAppend,这里说明一下,目前我们只是有了队列,数据是存在批次当中的,接着对象是要分配内存的,这里我们详细看一下tryAppend方法:

 private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        //获取到队列中最后一个批次
        ProducerBatch last = deque.peekLast();
        //第一次进入,last为空,直接返回null
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        //第一次进来,直接返回Null
        return null;
    }

可以想一下,当我们如果是第一次来这里,上节我们只是新建了一个队列Deque,里面并没有数据,所以当执行ProducerBatch last = deque.peekLast()一定是null,直接返回,那么继续看代码
步骤三:计算一个批次的大小

int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
  

这里大家是不是有一个疑问,为什么要计算批次的大小了,我们知道一个批次默认大小16k,万一我们请求的数据大小超过16k怎么办,这是因为只有知道了数据的大小,我们才可以去申请内存的大小呀
步骤四:根据批次大小分配内存

buffer = free.allocate(size, maxTimeToBlock);

申请分配内存allocate这个方法还是很复杂的,这里我们简单描述下,后面详细分析,如上面的图,我们知道消息累加器,就是我们常常说的RecordAccumulator,默认最大内存32M,那么会分两部分,一个是存放Deque的BufferPool,就是内存池,另一个是非内存池可用空间
接下来继续
步骤五:再次尝试 把数据写入到批次中

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

代码到这里,依然还是失败,目前只有内存,也有了队列,目前还没有创建批次了,还没有把批次放入内存里了
步骤六:根据内存大小封装新建了批次ProducerBatch

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                //在这里,尝试往这个批次写数据,到了这里,就会写入成功batch.tryAppend
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

步骤七:把这个批次放入一这个队列的尾部

 dq.addLast(batch);
 incomplete.add(batch);

  //释放内存
 free.deallocate(buffer);
appendsInProgress.decrementAndGet();

到这里,我们成功把消息写入到了缓冲区,那么前几节的流程我们可以用下图说明一下,图是大佬们绘图好的,我感觉非常好,这里使用一下:大家看一下,我们是不是已经走到第4步了


image.png

其实这一节,我们已经把主线程的工作分析完了,下一节我们开始Sender线程是如何拉取数据,转化成ClientRequest来发磅到Broker端的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352