01、Kafka日志段LogSegment源码分析

Kafka 日志对象由多个日志段对象组成,日志段是Kafka保存消息的最小载体,而每个日志段对象会在磁盘上创建一组文件,

  • 包括消息日志文件(.log)
  • 位移索引文件(.index)
  • 时间戳索引文件(.timeindex)
  • 已中止(Aborted)事务的索引文件(.txnindex)

本文对Kafka日志段进行详细说明,重点介绍Kafka日志段LogSegment的声明、append、read、recover方法。

下面我们看一下LogSegment的实现情况,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala

LogSegmen

LogSegment.scala这个文件里面定义了三个对象:

  • LogSegment class;
  • LogSegment object;
  • LogFlushStats object。LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责为日志落盘进行计时。

说明:scala语法中,允许scala中包含同名的class 和object,这种用法称之为伴生(Companion),class 对象称之为伴生类,和java语法中的类是一样的。而 object是一个单例对象

里面包含静态方法和变量,用java比喻时,object相当于java的utils工具类

(1)日志段LogSegment 注释说明

class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
  val time: Time) extends Logging { … }

lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分别对应于刚才所说的 3 个索引文件(位移索引、时间戳索引、已终止的事务索引)

字段解析和说明

  • baseoffset:每个日志段都保留了自身的起始位移。
  • indexIntervalBytes:即为Broker段参数,log.index.interval.bytes,控制了日志段对象新增索引项的频率。默认情况下日志段写入4KB的消息才会新增一条索引项目
  • rollJitterMs:是一个抖动值,如果没有这个干扰值,未来某个时刻可能同时创建多个日志段对象,将会大大的增加IO压力。

(2)Append方法

以下append源码实现即为写入消息的具体操作

   @nonthreadsafe
  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    // 判断是否日志段是否为空
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      val physicalPosition = log.sizeInBytes()

      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // 确保输入参数最大位移值是合法的
      ensureOffsetInRange(largestOffset)

      // append the messages
      // 执行真正的写入
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      // 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
      }
      // append an entry to the index (if needed)
      // 当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

append方法入参介绍:

  • largestOffset:写入消息批次中消息的最大位移值
  • largestTimestamp:写入消息批次中最大的时间戳
  • shallowOffsetOfMaxTimestamp:写入消息批次中最大时间戳对应消息的位移
  • records:真正要写入的的消息。

这个方法主要做了那么几件事:

  1. 判断日志段是否为空,不为空则往下进行操作
  2. 调用ensureOffsetInRange方法,确保输入参数最大位移值是合法的。
  3. 调用 FileRecords 的 append 方法执行真正的写入。
  4. 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。
  5. 更新索引项和写入的字节数,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数。

(3)Read方法

以下read源码实现即为读取日志段的具体操作

  def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
    // 将位移索引转换成物理文件位置索引
    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
    
    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    // 计算要读取的总字节数
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
    // log.slice读取消息后封装成FetchDataInfo返回
    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  }

read方法接收四个参数

  • startOffset:要读取的第一条消息的位移
  • maxSize:能读取的最大字节数
  • maxPosition:能读取的最大文件的位置
  • minOneMessage:是否允许在消息体过大时至少返回第1条消息,为了解决消费饿死的情况

这段代码中,主要做了这几件事:

  1. 调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。
    举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。

  2. 调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合。

(4)Recover方法

以下recover源码实现即为恢复日志段的具体操作,就是broker在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的LogSegment对象实例。

  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
    //情况索引文件
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
    try {
      //遍历日志段中所有消息集合
      for (batch <- log.batches.asScala) {
        // 校验
        batch.ensureValid()
        // 校验消息中最后一条消息的位移不能越界
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        // 获取最大时间戳及所属消息位移
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampSoFar = batch.maxTimestamp
          offsetOfMaxTimestampSoFar = batch.lastOffset
        }

        // Build offset index
        // 当已写入字节数超过了 4KB 之后,调用索引对象的 append 方法新增索引项,同时清空已写入字节数
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        }
        // 更新总消息字节数
        validBytes += batch.sizeInBytes()
        // 更新Porducer状态和Leader Epoch缓存
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
        warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
          .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
    }
    // 遍历完后将 遍历累加的值和日志总字节数比较,
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
    //执行日志截断操作
    log.truncateTo(validBytes)
    // 调整索引文件大小
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
    timeIndex.trimToValidSize()
    truncated
  }

(5)truncateTo方法

下面我们进入到truncateTo方法中,看一下截断操作是怎么做的:

public int truncateTo(int targetSize) throws IOException {
    int originalSize = sizeInBytes();
    // 要截断的目标大小不能超过当前文件的大小
    if (targetSize > originalSize || targetSize < 0)
        throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
                " size of this log segment is " + originalSize + " bytes.");
    //如果目标大小小于当前文件大小,那么执行截断
    if (targetSize < (int) channel.size()) { 
        channel.truncate(targetSize);
        size.set(targetSize);
    }
    return originalSize - targetSize;
}

Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。

小结

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355