跟我学Kafka源码Producer分析

我的原文博客地址是:http://flychao88.iteye.com/blog/2266611

本章主要讲解分析Kafka的Producer的业务逻辑,分发逻辑和负载逻辑都在Producer中维护。

一、Kafka的总体结构图

(图片转发)

二、Producer源码分析

class Producer[K,V](val config: ProducerConfig,

private val eventHandler: EventHandler[K,V])  // only for unit testing

extends Logging {

private val hasShutdown = new AtomicBoolean(false)

//异步发送队列

private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)

private var sync: Boolean = true

//异步处理线程

private var producerSendThread: ProducerSendThread[K,V] = null

private val lock = new Object()

//根据从配置文件中载入的信息封装成ProducerConfig类

//判断发送类型是同步,还是异步,如果是异步则启动一个异步处理线程

config.producerType match {

case "sync" =>

case "async" =>

sync = false

producerSendThread =

new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,

queue,

ventHandler,

config.queueBufferingMaxMs,

config.batchNumMessages,

config.clientId)

producerSendThread.start()

}

private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)

KafkaMetricsReporter.startReporters(config.props)

AppInfo.registerInfo()

def this(config: ProducerConfig) =

this(config,

new DefaultEventHandler[K,V](config,

Utils.createObject[Partitioner](config.partitionerClass, config.props),

Utils.createObject[Encoder[V]](config.serializerClass, config.props),

Utils.createObject[Encoder[K]](config.keySerializerClass, config.props),

new ProducerPool(config)))

/**

* Sends the data, partitioned by key to the topic using either the

* synchronous or the asynchronous producer

* @param messages the producer data object that encapsulates the topic, key and message data

*/

def send(messages: KeyedMessage[K,V]*) {

lock synchronized {

if (hasShutdown.get)

throw new ProducerClosedException

recordStats(messages)

sync match {

case true => eventHandler.handle(messages)

case false => asyncSend(messages)

}

}

}

private def recordStats(messages: Seq[KeyedMessage[K,V]]) {

for (message <- messages) {

producerTopicStats.getProducerTopicStats(message.topic).messageRate.mark()

producerTopicStats.getProducerAllTopicsStats.messageRate.mark()

}

}

//异步发送流程

//将messages异步放到queue里面,等待异步线程获取

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {

for (message <- messages) {

val added = config.queueEnqueueTimeoutMs match {

case 0  =>

queue.offer(message)

case _  =>

try {

config.queueEnqueueTimeoutMs < 0 match {

case true =>

queue.put(message)

true

case _ =>

queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)

}

}

catch {

case e: InterruptedException =>

false

}

}

if(!added) {

producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()

producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()

throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)

}else {

trace("Added to send queue an event: " + message.toString)

trace("Remaining queue size: " + queue.remainingCapacity)

}

}

}

/**

* Close API to close the producer pool connections to all Kafka brokers. Also closes

* the zookeeper client connection if one exists

*/

def close() = {

lock synchronized {

val canShutdown = hasShutdown.compareAndSet(false, true)

if(canShutdown) {

info("Shutting down producer")

val startTime = System.nanoTime()

KafkaMetricsGroup.removeAllProducerMetrics(config.clientId)

if (producerSendThread != null)

producerSendThread.shutdown

eventHandler.close

info("Producer shutdown completed in " + (System.nanoTime() - startTime) / 1000000 + " ms")

}

}

}

}

说明:

上面这段代码很多方法我加了中文注释,首先要初始化一系列参数,比如异步消息队列queue,是否是同步sync,异步同步数据线程ProducerSendThread,其实重点就是ProducerSendThread这个类,从队列中取出数据并让kafka.producer.EventHandler将消息发送到broker。这个代码量不多,但是包含了很多内容,通过config.producerType判断是同步发送还是异步发送,每一种发送方式都有相关类支持,下面我们将重点介绍这二种类型。

1、同步发送

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {

//分区并且整理方法

val partitionedDataOpt = partitionAndCollate(messages)

partitionedDataOpt match {

case Some(partitionedData) =>

val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]

try {

for ((brokerid, messagesPerBrokerMap) <- partitionedData) {

if (logger.isTraceEnabled)

messagesPerBrokerMap.foreach(partitionAndEvent =>

trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))

val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

val failedTopicPartitions = send(brokerid, messageSetPerBroker)

failedTopicPartitions.foreach(topicPartition => {

messagesPerBrokerMap.get(topicPartition) match {

case Some(data) => failedProduceRequests.appendAll(data)

case None => // nothing

}

})

}

} catch {

case t: Throwable => error("Failed to send messages", t)

}

failedProduceRequests

case None => // all produce requests failed

messages

}

}

说明:

这个方法主要说了二个重要信息,一个是partitionAndCollate,这个方法主要获取topic、partition和broker的,这个方法很重要,下面会进行分析。另一个重要的方法是groupMessageToSet是要对所发送数据进行压缩  设置。

在我们了解的partitionAndCollate方法之前先来了解一下如下类结构:

TopicMetadata -->PartitionMetadata

case class PartitionMetadata(partitionId: Int,

val leader: Option[Broker],

replicas: Seq[Broker],

isr: Seq[Broker] = Seq.empty,

errorCode: Short = ErrorMapping.NoError)

也就是说,Topic元数据包括了partition元数据,partition元数据中包括了partitionId,leader(leader partition在哪个broker中,备份partition在哪些broker中,以及isr有哪些等等。

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {

val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]

try {

for (message <- messages) {

//获取Topic的partition列表

val topicPartitionsList = getPartitionListForTopic(message)

//根据hash算法得到消息应该发往哪个分区(partition)

val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)

val brokerPartition = topicPartitionsList(partitionIndex)

// postpone the failure until the send operation, so that requests for other brokers are handled correctly

val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null

ret.get(leaderBrokerId) match {

case Some(element) =>

dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]

case None =>

dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]

ret.put(leaderBrokerId, dataPerBroker)

}

val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)

var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null

dataPerBroker.get(topicAndPartition) match {

case Some(element) =>

dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]

case None =>

dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]

dataPerBroker.put(topicAndPartition, dataPerTopicPartition)

}

dataPerTopicPartition.append(message)

}

Some(ret)

}catch {    // Swallow recoverable exceptions and return None so that they can be retried.

case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None

case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None

case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None

}

}

说明:

调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送消息数据,SyncProducer包装了nio网络操作信息。

partitionAndCollate这个方法的主要作用是:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上),创建一个HashMap>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送消息作准备工作。

我们进入getPartitionListForTopic这个方法看一下,这个方法主要是干什么的。

private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {

val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)

debug("Broker partitions registered for topic: %s are %s"

.format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))

val totalNumPartitions = topicPartitionsList.length

if(totalNumPartitions == 0)

throw new NoBrokersForPartitionException("Partition key = " + m.key)

topicPartitionsList

}

说明:这个方法看上去没什么,主要是getBrokerPartitionInfo这个方法,其中KeyedMessage这个就是我们要发送的消息,返回值是Seq[PartitionAndLeader]。

def getBrokerPartitionInfo(topic: String, correlationId: Int): Seq[PartitionAndLeader] = {

debug("Getting broker partition info for topic %s".format(topic))

// check if the cache has metadata for this topic

val topicMetadata = topicPartitionInfo.get(topic)

val metadata: TopicMetadata =

topicMetadata match {

case Some(m) => m

case None =>

// refresh the topic metadata cache

updateInfo(Set(topic), correlationId)

val topicMetadata = topicPartitionInfo.get(topic)

topicMetadata match {

case Some(m) => m

case None => throw new KafkaException("Failed to fetch topic metadata for topic: " + topic)

}

}

val partitionMetadata = metadata.partitionsMetadata

if(partitionMetadata.size == 0) {

if(metadata.errorCode != ErrorMapping.NoError) {

throw new KafkaException(ErrorMapping.exceptionFor(metadata.errorCode))

} else {

throw new KafkaException("Topic metadata %s has empty partition metadata and no error code".format(metadata))

}

}

partitionMetadata.map { m =>

m.leader match {

case Some(leader) =>

debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, leader.id))

new PartitionAndLeader(topic, m.partitionId, Some(leader.id))

case None =>

debug("Partition [%s,%d] does not have a leader yet".format(topic, m.partitionId))

new PartitionAndLeader(topic, m.partitionId, None)

}

}.sortWith((s, t) => s.partitionId < t.partitionId)

}

说明:

这个方法很重要,首先看一下topicPartitionInfo这个对象,这个一个HashMap结构:HashMap[String, TopicMetadata] key是topic名称,value是topic元数据。

通过这个hash结构获取topic元数据,做match匹配,如果有数据(Some(m))则赋值给metadata,如果没有,也就是None的时候,则通过nio远程连到服务端更新topic信息。

请看如下流程图:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容