Kafka消息的磁盘存储

  • 发送到Kafka的消息最终都是要落盘存储到磁盘上;
  • 本章涉及到的类:
    1. OffsetIndex;
    2. LogSegment;

OffsetIndex类
  • 所在文件: core/src/main/scala/kafka/log/OffsetIndex.scala
  • 作用: 我们知道所有发送到kafka的消息都是以Record的结构(Kafka中Message存储相关类大揭密)写入到本地文件, 有写就要有读,读取时一般是从给定的offset开始读取,这个offset是逻辑offset, 需要转换成文件的实际偏移量, 为了加速这个转换, kafka针对每个log文件,提供了index文件, index文件采用稀疏索引的方式, 只记录部分log offset到file position的转换, 然后还需要在log文件中进行少量的顺序遍历, 来精确定位到需要的Record;
  • index文件结构: 文件里存的是一条条的log offset与file position的映射, 每条记录8个字节,前4个字节是log offset, 后4个字节是file position, 这样的每一条映射信息我们可以称为是一个slot
  • 读写方式: 为了加速index文件的读写, 采用了文件内存映射的方式:
    /* initialize the memory mapping for this index */
    private var mmap: MappedByteBuffer = 
    {
      val newlyCreated = file.createNewFile()
      val raf = new RandomAccessFile(file, "rw")
      try {
        /* pre-allocate the file if necessary */
        if(newlyCreated) {
          if(maxIndexSize < 8)
            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
        }
          
        /* memory-map the file */
        val len = raf.length()
        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
          
        /* set the position in the index for the next entry */
        if(newlyCreated)
          idx.position(0)
        else
          // if this is a pre-existing index, assume it is all valid and set position to last entry
          idx.position(roundToExactMultiple(idx.limit, 8))
        idx
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }

  • 主要方法:
    1. def lookup(targetOffset: Long): OffsetPosition: 查找小于或等于targetOffset的最大Offset;
 maybeLock(lock) {
      val idx = mmap.duplicate
      val slot = indexSlotFor(idx, targetOffset)
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
      }
  1. private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int:采用二分法查找到对于targetOffset在index文件中的slot
   // binary search for the entry 二分查找法
    var lo = 0
    var hi = entries-1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if(found == relOffset)
        return mid
      else if(found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
  1. def append(offset: Long, position: Int): 向index文件中追加一个offset/location的映射信息
  2. def truncateTo(offset: Long): 按给定的offset,找到对应的slot, 然后截断
  3. def resize(newSize: Int): 重新设置index文件size, 但保持当前mmap的position不变
      inLock(lock) {
      val raf = new RandomAccessFile(file, "rw")
      val roundedNewSize = roundToExactMultiple(newSize, 8)
      val position = this.mmap.position
      
      /* Windows won't let us modify the file length while the file is mmapped :-( */
      if(Os.isWindows)
        forceUnmap(this.mmap)
      try {
        raf.setLength(roundedNewSize)
        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        this.maxEntries = this.mmap.limit / 8
        this.mmap.position(position)
      } finally {
        CoreUtils.swallow(raf.close())
      }
    }
LogSegment
  • 所在文件: core/src/main/scala/kafka/log/LogSegment.scala
  • 作用: 封装对消息落地后的log和index文件的所有操作
  • 类定义:
      class LogSegment(val log: FileMessageSet, 
                 val index: OffsetIndex, 
                 val baseOffset: Long, 
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Loggin

可以看到使用FileMessageSet来操作Log文件, 使用OffsetIndex来操作Index文件

  • 主要方法:
    1. def size: Long = log.sizeInBytes() : 返回当前log文件的大小
    2. def append(offset: Long, messages: ByteBufferMessageSet):追加msg到log文件尾部,必要时更新index文件
 if (messages.sizeInBytes > 0) {
      // append an entry to the index (if needed)
      // index采用的是稀疏索引, 所以先判断是否需要写入
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages)  //追加msg到log文件尾部
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  1. def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size): FetchDataInfo: 根据给定的offset信息等读取相应的msg 和offset信息,构成FetchDataInfo返回
 val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)
      return FetchDataInfo(offsetMetadata, MessageSet.Empty)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length = 
      maxOffset match {
        case None =>
          // no max offset, just read until the max position
          min((maxPosition - startPosition.position).toInt, maxSize)
        case Some(offset) => {
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          val mapping = translateOffset(offset, startPosition.position)
          val endPosition = 
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(min(maxPosition, endPosition) - startPosition.position, maxSize).toInt
        }
      }
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))

实际上最终是调用FileMessageSetread方法读取

  1. def recover(maxMessageSize: Int): Int :读取当前的log文件内容,重新构建index文件
//逐条读取log里的msg, 然后构建index文件
val iter = log.iterator(maxMessageSize)
   try {
     while(iter.hasNext) {
       val entry = iter.next
       entry.message.ensureValid()
       if(validBytes - lastIndexEntry > indexIntervalBytes) {
         // we need to decompress the message, if required, to get the offset of the first uncompressed message
         val startOffset =
           entry.message.compressionCodec match {
             case NoCompressionCodec =>
               entry.offset
             case _ =>
               ByteBufferMessageSet.deepIterator(entry.message).next().offset
         }
         index.append(startOffset, validBytes)
         lastIndexEntry = validBytes
       }
       validBytes += MessageSet.entrySize(entry.message)
     }
   } catch {
     case e: InvalidMessageException => 
       logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
   }
  1. def truncateTo(offset: Long): Int: 根据给定的offset截断log和index文件
 val mapping = translateOffset(offset)
    if(mapping == null)
      return 0
    index.truncateTo(offset)
    // after truncation, reset and allocate more space for the (new currently  active) index
    index.resize(index.maxIndexSize)
    val bytesTruncated = log.truncateTo(mapping.position)
    if(log.sizeInBytes == 0)
      created = time.milliseconds
    bytesSinceLastIndexEntry = 0
    bytesTruncated
  1. def nextOffset(): Long : 获取下一个offset值, 其实就是当前最大的offset + 1
val ms = read(index.lastOffset, None, log.sizeInBytes)
    if(ms == null) {
      baseOffset
    } else {
      ms.messageSet.lastOption match {
        case None => baseOffset
        case Some(last) => last.nextOffset
      }
    }

Kafka源码分析-汇总

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容