7.Kafka源码深入解析之发送消息01

上一章节我们重点分析了获取消息的分区号,这样我们才知道消息到底要发送到哪个broker上,对吧,同时我们看到后面把消息封装到一个TopicPartition对象

 int partition = partition(record, serializedKey, serializedValue, cluster);
            //把消息包装成一个topicPartition格式对象
 tp = new TopicPartition(record.topic(), partition);

接下来,我们想一下,已经把消息封装成了每一个topicPartition对象了,那么如果这条数据非常大怎么办,是不是要check一下,没错,接下来看源码:

setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            //计算消息的大小,后面要进行验证
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            /**
             * 检测请求request的大小是否超过了配置的最大请求大小
             * 以及缓存区最大的大小
             */
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback

上面主要是对序列化的key,value,计算它的大小,在 ensureValidRecordSize(serializedSize)里进行验证:

private void ensureValidRecordSize(int size) {
        //默认maxRequestSize 为 max.request.size = 1M
        //这里在生产上一般要调整,比如10M
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                    " configuration.");
        //默认buffer 为buffer.memory = 32M
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG +
                    " configuration.");
    }
            /**
             * 设定好的拦截器,与callback,
             * 给每条消息绑定一个回调 函数,因为我们使用的是异步的方式发送消息的
             */
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

上面两个验证,不就是对大小的判断吗,注解里也分析把这两个参数名,默认值说明很清楚了。接下来才是我们本节的重点:

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

把消息append加入到缓存区里,这里我们看到这个accumulator,不就是前面我们在初始化KafkaProducer的时候同时初始化出来的吗,当时我们也做了详细的详解,点击append()方法进去看源码:

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        /**
         *  当前有多少个线程
         */
        appendsInProgress.incrementAndGet();
               ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
...
}

此方法实现稍复杂,代码量大,这里我们还是分片分析,上面我们看到appendsInProgress.incrementAndGet()这个其实就是计算了一下当前有多少个线程进入此方法,我们接下来看:

             //步骤一:先根据分区获取该消息所属的队列中,
            //       如果有已经存在的队列,那么我就使用存在的队列
            //       如果不存在,我们就新创建一个队列
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);

点击getOrCreateDeque(tp)进去,我们详细看一下实现:

 private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
        //直接从batches里面获取当前分区对应的存储队列
        Deque<ProducerBatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        //当第一次进来的时候,一定是不存在队列的
        d = new ArrayDeque<>();
        //把空的队列存入到batchs这个map里面去
        //putIfAbsent 这个方法,当添加键值对,如果Map里没有这个key对应的值,就直接添加,并返回null
        //如果已经存在对应的值,就依旧是原来的值,不做覆盖
        Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
        if (previous == null)
            return d;
        else
            //如果根据key去找value不为空
            return previous;
    }

这个方法注解我写的很清楚了,这里之所以要贴出来和大家分享有两个原因,原因一,这个方法代码写的太好了,我们做为程序员,编写代码一定要学习世界大佬们的写法。
原因二,是为了引出batches这个属性,大家看一下这个属性的定义

 //每个分区的数据结构,key:TopicPartition,value:Deque<ProductBatch>
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
//在RecordAccumulator构造类中:
this.batches = new CopyOnWriteMap<>();

我去,这是什么?CopyOnWriteMap大家看到了吧,点击这个类,发现这是kafka封装ConcurrentMap的一种类型:

public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
...
/**
     * 这里实现逻辑:
     *  1.先根据原map创建一个相同大小的副本copy
     *  2.把数据放在copy里
     *  3.把copy在赋值给map
     */
/* 1. 整个方法是使用了synchronized来修饰,它是线程安全的
     *    虽然加了锁,但性能依然很好,因为这里面全是纯内存计算
     * 2. 读数据与写数据的流程分离了,这里采用了读写分离的设计思想,读与写操作是互不影响的
     *    所以我们在读数据是线程安全的
     * 3. 最后把最新的数据值赋值给了map,map是使用了volatile去修饰的,
     *    说明这个map是有线程本地可见性的,如果map发生了变化,那么进行get操作的时候,是可以感知到的
     */
    @Override
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }
}

为什么要有这样的一种类型,它的作用是什么呢?这里我们着重分析一下,CopyOnWriteMap这类最适合是读多写少的场景,每次更新的时候,都是一个copy一个副本,在副本里来更新,接着副本同到更新原来的值
好处:就是在于说写和读的操作互相 之间不会有长时间的锁互斥,写的时候不会阻塞读
坏处在于对内存的占用很大,适合是读多写少的场景
这里,我们是不是有大量的操作在读取,是不是我们每条消息都要根据topicPartition去找它对应的队列,就是去大量的从map读取一个分区的Deque,但是写的话,是不是就是一个分区写一次。
最后高并发频繁更新的就是分区对应的Dqueue,这个修改基于快照,适合CopyOnWrite
看到这里,大家是不是有一种仰望长叹的感觉,大佬们的思想我们不懂。这也是我们之后在开发中可以学习的地方。
发消息这部分章节较长,本章节主要是从最开始发消息,消息验证,以及copyOnWrite这个类型进行讲解。下章节我们继续!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容