2.Kafka源码深入解析之生产端初始化

上一节我们讲到了在KafkaProducer初始化的时候,初始化了三个组件:

  • 分区器Partitioner
  • 序列化器Serializer
  • 拦截器Interceptor
  1. 接下来我们要讲到第一个非常核心的组件:MetaData,我们想一下,当一条消息要写入broker里,是不是要先知道这条数据要写入哪个分区里,这个分区在哪个broker上,MetaData是用来从broker集群去拉取元数据的Topics(Topic -> Partitions(Leader+Followers,ISR))
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);

/**
* max.request.size:生产者往服务端发送消息的时候,规定一条消息最大是多大,默认是1m
*/
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
/**
             * recordAccumlator 缓存区大小:buffer.memory 默认值是32m
             */
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
/**
             * 默认不压缩
             * 可以用下面压缩:gzip,lax4,snappy
             * 提交吞吐量,但消耗cpu
             */
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

 /**
  * max.block.ms
  * 当buffer满了,或者metadata获取不到,或者化没完成分区函数完成
  * 等情况下的最大阻塞时间:默认是60s
  */
 this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
 this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 /**
  * 当设置了transactionId,必须是idempotenceEnabled是开启的
  */
 this.transactionManager = configureTransactionState(config, logContext, log);
 /**
  * public static final String RETRIES_CONFIG = "retries";
  * 这个是发送失败后重启几次,默认是不重启
  * 如果配置了,就按配置的值重启
  * 如果没有配置,但开启了幂等,那么重启次数是Integer.Max
  */
 int retries = configureRetries(config, transactionManager != null, log);

 /**
  * max.in.flight.requests.per.connection
  * 这个参数的意思是说生产者会把同一分区的每条消息打包成一个batch
  * 每个request请求是把多个分区对应的多个batch打包成一个request发送给broker
  * 这个时候,最多有几个request都没收到响应,
  * 每个连接broker都有一个connection,每个connection最多有几个request没收到request
  * 这个参数默认值:5,就是说最多有5个request没到响应的请教放在requests集合里
  */
 int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
 /**
  *  0:不会重试,直接发布就不管了
  *  1:leader发送成功就表示发送成功
  *  -1/all: 所有的isr的副本发送成功,才表示发送成功
  */
 short acks = configureAcks(config, transactionManager != null, log);

 this.apiVersions = new ApiVersions();

这上面配置了几个非常重要的参数值,以及参数说明,用在哪些地方,默认值大家要记住

  • max.request.size
  • buffer.memory
  • max.block.ms
  • retries
  • request.timeout.ms
  • max.in.flight.requests.per.connection
  • acks
  • linger.ms

接下来我们看这个Metadata组件的作用:其实就是拉元数据过来,保存在本地缓存起来

/**
             * 核心行为:初始化的时候,直接调用Metadata组件的方法,去broker上
             * 拉取一次集群的元数据过来,后面每隔5分钟刷新一次元数据,但是发送消息的时候
             * ,如果没有找到某个topic的元数据,也一定会拉取一次的
             *
             * 实际上我们看了一下update方法,在kafakPrducer初始化的时候并没有真正的
             * 去拉取topic的元数据,但是他肯定是对集群元数据做了一个初始化的,
             * 把你配置的那些broker地址转化为了Node,放在Cluster对象实例化
             *
             * 在发送消息的时候,如果发现你要写入的某个Topic对应的元数据不在本地,那么他是不是肯定会通过这个组件,
             * 发送请求到broker尝试拉取这个topic对应的元数据,
             * 如果你在集群里增加了一台broker,也会涉及到元数据的变化

             * 这里的意思其实仅仅把我们配置的那个broker的地址放了进去,在客户端缓存
             * 集群元数据的时候,采用了哪些数据结构:
             * 我们看代码实际化了一个Cluster对象,里面有List<Node> nodes;
             * 每个node包括了node_id实际上是我们kafka配置文件里的brokerid,
             *    private final int id;
             *     private final String idString;
             *     private final String host;
             *     private final int port;
             *     private final String rack;
             *
             *
             */
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);

2. 接下来我们来看另一个核心组件:RecordAccumulator,从名字也可以看出来,就是一个消费缓存器,我们知道Kafka其实是批量发送消息到Broker的,为啥要批量发,而不是一条一条的发呢?很明显,减少网络请求,提高吞吐呀

/**
             * 核心组件:RecordAccumulator,缓冲区,负责消息的复杂的缓冲机制,
             * 发送到每个分区的消息会被打包成batch,
             * 一个broker上的多个分区对应的多个batch会被打包成一个request,batch size(16kb)
             * 一个request请求其实是多个batch打包后发送出去给broker的,
             * 每个request默认最大长度 :max.request.size = 1M
             *
             * linger.ms,默认没有时间
             * 默认情况下,如果光光是考虑batch的机制的话,那么必须要等到足够多的消息打包成一个batch,
             * 才能通过request发送到broker上去;
             * 但是有一个问题,如果你发送了一条消息,但是等了很久都没有达到一个batch大小
             * 所以说要设置一个linger.ms,如果在指定时间范围内,
             * 都没凑出来一个batch把这条消息发送出去,
             * 那么到了这个linger.ms指定的时间,比如说5ms,
             * 如果5ms还没凑出来一个batch,那么就必须立即把这个消息发送出去
             */
            /**
             *"batch.size";
             * 这个batch的意思是发送给broker中topic中的同一个分区的数据
             * 会打包成一个batch,batch默认大小:16k
             *
             * retries:配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。
             * retry.backoff.ms:设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100
             *
             * linger.ms:batch不管大小达没达到16k,在这个时间内都要发出去,默认是0,表示不延迟
             * 这样实际工作中,要设置的,这样就是批量发送
             */
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time,
                    apiVersions,
                    transactionManager);

3. 最后一个核心组件:网络通信的组件,NetworkClient,这个一看就是网络连接用的呀,哈哈,必须的,是生产端与broker通信用的组件,其实关于Kafka这块自己封装的通信非常有必要学一下,这是世界级工业代码,比任何人都写的好吧,完全可以拿出来放在我们的系统里嘛,这也是我们学习的一种方法

/**
             * 核心组件:网络通信的组件,NetworkClient,
             * connections.max.idle.ms,一个网络连接最多空闲多长时间(9分钟),
             *  max.in.flight.requests.per.connection 每个连接最多有几个request没收到响应(5个),
             * reconnect.backoff.ms: 重试连接的时间间隔(50ms),
             * send.buffer.bytes:  Socket发送缓冲区大小(128kb),
             * receive.buffer.bytes:   Socket接收缓冲区大小(32kb)
             */
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                            this.metrics, time, "producer", channelBuilder, logContext),
                    this.metadata,
                    clientId,
                    maxInflightRequests,
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time,
                    true,
                    apiVersions,
                    throttleTimeSensor,
                    logContext);

4.我们知道Kafka发消息默认是异步的,就是说主线程在生产消息,放在我们上面说的缓冲器里,另一个线程去拉消息到Broker,那么下面不是初始化这个sender线程,接着,我们可以看到newKafkaThread一个线程,名称叫kafka-producer-network-thread,同时把上面我们新建的sender线程放进去,这里我们可以看出一种方法,业务逻辑写在我们的线程类里,用另一个线程包装起来,这是线程类与业务逻辑分开的编程技巧,值得大家学习,工业级代码就是不一样,最后可以看到,线程启动,Kafka producer started!

/**
             * 核心组件:Sender线程,负责从缓冲区里获取消息发送到broker上去,
             * request最大大小(1mb),
             * acks(1,只要leader写入成功就认为成功),默认是1
             * 重试次数(0,无重试),这个在生产上是一定要设置的,可以重启次数为3
             * ,请求超时的时间(30s),
             */
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);

String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            /**
             * 实际上这里是新建一个thread,把sender放里面,
             * start启动线程
             *
             * 这里sender是一个runable线程
             * 线程类叫做“KafkaThread”,
             * 线程名字叫做“kafka-producer-network-thread”,此处线程直接被启动
             * 这里定义了一个kafkaThread,直接启动!
             */
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            this.errors = this.metrics.sensor("errors");
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }

到这里,我们讨论的KafkaProducer初始化工作,全部完成了,用了两小节详细解析了他在初始化时做的工作,是不是感觉背后做了好多事,这是我们写一行API无法知道的,本次解析完全是按源码每行代码非常详细的分析出来的结果,希望能给大家带来不一样的收获

下一篇,我们就进入生产者发送消息的解析章节

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容