ReplicaManager源码解析1-消息同步线程管理

  • 现在的Kafka增加了高可用的特性,即增加了复本的特性,同时必然会引入选主,同步等复杂性;
  • ReplicaManager负责消息的写入,消费在多复本间同步, 节点成为主或从的转换等等相关的操作;
  • 这篇我们先集中介绍下ReplicaManager里用到的各种基础类库

OffsetCheckPoint类
  • 文件:/core/src/main/scala/kafka/server/OffsetCheckPoint.scala
  • 在kafka的log dir目录下有一文件:replication-offset-checkpoint, 以Topic+Partition为key, 记录其高水位的值。那么何为高水位?简单说就是已经复制到所有replica的最后提交的offset, 即所有ISR中的logEndOffset的最小值与leader的目前的高水位值的之间的大者.
  • replication-offset-checkpoint文件结构很简单:
    第一行:版本号,当前是0
    第二行:当前写入的Topic+Partition的记录个数
    其他每行: topic 空格 partition 空格 offset
  • OffsetCheckPoint类实现了对这个文件的读写
    每次写入的时修会先写到 replication-offset-checkpoint.tmp 的临时文件,读入后再作rename操作;
  • recovery-point-offset-checkpoint文件格式与replication-offset-checkpointg一样,同样使用OffsetCheckPoint类来读写.
AbstractFetcherManager类
  • 文件:/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
  • 是个基类, 用于管理当前broker上的所有从leader的数据同步;
  • 主要成员变量:private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread], 实现的拉取消息由AbstractFetcherThread来负责, 每个brokerId+fetcherId对应一个AbstractFetcherThread;
  • addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]): 创建并开始消息同步线程;
    其中最主要的操作是调用AbstractFetcherThreadaddPartitions方法来告诉同步线程具体需要同步哪些partition;
  • def removeFetcherForPartitions(partitions: Set[TopicAndPartition]): 移出对某些partition的同步;
  • def shutdownIdleFetcherThreads(): 如果某些同步线程负责同步的partition数量为0,则停掉该线程;
  • def closeAllFetchers(): 停掉所有的同步线程
  • def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread: 抽象方法, 由子类实现, 用于创建具体的同步线程;
ReplicaFetcherManager类
  • 文件:/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
  • 继承自AbstractFetcherManager类
  • 仅实现了def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread: 创建了ReplicaFetcherThread
AbstractFetcherThread类
  • 文件: /core/src/main/scala/kafka/server/AbstractFetcherThread.scala
  • 本身是个抽象基类, 实现了从partition的leader来同步数据的具体操作;
  • def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]): 添加需要同步的partition信息, 包换topic, partition和初始开始同步的offset;
    如果提供的初始offset无效, 则通过handleOffsetOutOfRange(topicAndPartition)方法来获取有效的初始offset, 这个方法的说明参见下面ReplicaFetcherThread类的分析;
  • def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long): 延迟同步某些partition, 通过DelayItem来实现;
  • def removePartitions(topicAndPartitions: Set[TopicAndPartition]): 移除某些partition的同步;
  • 此线程的执行体:
override def doWork() {
     val fetchRequest = inLock(partitionMapLock) {
      val fetchRequest = buildFetchRequest(partitionMap)
      if (fetchRequest.isEmpty) {
        trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }
      fetchRequest
    }

    if (!fetchRequest.isEmpty)
      processFetchRequest(fetchRequest)
  }

基本上就是作三件事: 构造FetchRequest, 同步发送FetchRequest并接收FetchResponse, 处理FetchResponse, 这三件事的实现调用了下列方法:

def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)

  // handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long

  // deal with partitions with errors, potentially due to leadership changes
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])

  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ

  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]

它们都是在具体的子类中实现, 我们在下面的 ReplicaFetcherThread类 中作说明.

ReplicaFetcherThread类
  • 文件: /core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
  • handleOffsetOutOfRange:处理从leader同步数据时,当前replica的的初始offset为-1的情况
  1. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId) 先通过向Leader发送OffsetRequest来获取leader当前的LogEndOffset;
  2. 如果Leader的LogEndOffset小于当前replica的logEndOffset, 这原则上不可能啊,除非是出现了Unclean leader election:即ISR里的broker都挂了,然后ISR之外的一个replica作了主;
  3. 如果broker的配置不允许Unclean leader election, 则Runtime.getRuntime.halt(1);
  4. 如果broker的配置允许Unclean leader election, 则当前replica本地的log要作truncate, truncate到Leader的LogEndOffset;
  5. 如果Leader的LogEndOffset大于当前replica的logEndOffset, 说明Leader有有效的数据供当前的replica来同步,那么剩下的问题就是看从哪里开始同步了;
  6. earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId) 通过向Leader发送OffsetRequest来获取leader当前有效的最旧Offset: StartOffset;
  7. 作一次truncate, 从startOffset开始追加:replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
  • def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest: 构造FetchRequest
  val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData]

    partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) =>
      if (partitionFetchState.isActive)
        requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)
    }

    new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava))

这个没什么好说的,就是按照FetchRequest的协议来;

  • def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]: 发送FetchRequest请求,同步等待FetchResponse的返回
    val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying)
    new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) =>
      TopicAndPartition(key.topic, key.partition) -> new PartitionData(value)
    }

使用NetworkClient来实现到leader broker的连接,请求的发送和接收,
使用kafka.utils.NetworkClientBlockingOps._实现了这个网络操作的同步阻塞方式.
这个实现可参见KafkaController分析2-NetworkClient分析

  • def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData): 处理拉取过来的消息
  try {
      val TopicAndPartition(topic, partitionId) = topicAndPartition
      val replica = replicaMgr.getReplica(topic, partitionId).get
      val messageSet = partitionData.toByteBufferMessageSet
      warnIfMessageOversized(messageSet)

      if (fetchOffset != replica.logEndOffset.messageOffset)
        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
      replica.log.get.append(messageSet, assignOffsets = false)
      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
      trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
            .format(replica.brokerId, topic, partitionId, followerHighWatermark))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error while replicating data.", e)
        Runtime.getRuntime.halt(1)
    }

干三件事:

  1. 消息写入以相应的replica;
  2. 更新replica的highWatermark
  3. 如果有KafkaStorageException异常,就退出啦~~
  • def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): 对于在同步过程中发生错误的partition,会调用此方法处理:
delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)

目前的作法是将此partition的同步操作延迟一段时间.

Kafka源码分析-汇总

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容