Kafka中Message存储相关类大揭密

  • 我们使用Kafka, 最终都是要存,取消息,今天我们就来看下源码中和消息相关的类;
  • 涉及到的类:
    1. Message
    2. Record
    3. MessageSet
    4. ByteBufferMessageSet
    5. BufferingOutputStream
    6. MessageWriter
    7. FileMessageSet

Message类:
  • 所在文件: core/src/main/scala/kafka/message/Message.scala
  • 作用: kafka系统单条message结构组成
  • Message结构:
1.png
  • 这个类主要就是使用ByteBuffer来承载Message这个结构, 默认构造函数封装了ByteBuffer, 还提供了一系列的this构造函数,参数为Message结构的若干个字段;
  • checksum的计算: checksum的计算从Magic字段开始, 计算结果写入CRC32字段.
  • 提供了一系列便捷方法,来获取Message结构中各个字段和属性:
  /**
   * The complete serialized size of this message in bytes (including crc, header attributes, etc)
   */
  def size: Int = buffer.limit

  /**
   * The length of the key in bytes
   */
  def keySize: Int = buffer.getInt(Message.KeySizeOffset)

 /**
   * The length of the message value in bytes
   */
  def payloadSize: Int = buffer.getInt(payloadSizeOffset)
 /**
   * The magic version of this message
   */
  def magic: Byte = buffer.get(MagicOffset)
  
  /**
   * The attributes stored with this message
   */
  def attributes: Byte = buffer.get(AttributesOffset)
  
  /**
   * The compression codec used with this message
   */
  def compressionCodec: CompressionCodec = 
    CompressionCodec.getCompressionCodec(buffer.get(AttributesOffset) & CompressionCodeMask)
  
  /**
   * A ByteBuffer containing the content of the message
   */
  def payload: ByteBuffer = sliceDelimited(payloadSizeOffset)
Record类
  • 实际上kafka源码中没有这个类, kafka中的一条消息是上面我们讲的一个Message, 但实际上记录到log文件中的不是这个Message, 而是一条Record
  • Record的结构: 其实很简单 [Offset MessageSize Message], 在一条Message前面加上8字节的Offset和4字节的MessageSize
  • 实际是多条Record就构成了我们下面要说的一个MessageSet
MessageSet类
  • 所在文件: core/src/main/scala/kafka/message/MessageSet.scala
  • 作用: 存储若干条Record, 官网上给出的结构:
    MessageSet => [Offset MessageSize Message]  => 这里就是我们上面说的Record
      Offset => int64
      MessageSize => int32
      Message
  • 定义:abstract class MessageSet extends Iterable[MessageAndOffset]
    从定义可以看出MessageSet是个抽象类, 且继承了Iterable[MessageAndOffset],
  • 主要方法:
    1. def iterator: Iterator[MessageAndOffset]: 返回迭代器, 用于迭代所有的MessageAndOffset, 主要是因为它继承了Iterable[MessageAndOffset];
    2. def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int:写message到指定的Channel
  • Object Message里其实已经定义了我们上面说的Record:
  val MessageSizeLength = 4
  val OffsetLength = 8
  val LogOverhead = MessageSizeLength + OffsetLength
 
 //这里的entry就是我们说的Record
 def entrySize(message: Message): Int = LogOverhead + message.size
  • 结构示意图:
2.jpg
ByteBufferMessageSet类
  • 所在文件: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  • 定义: class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Logging
    1. 继承于MessageSet;
    2. 提供了ByteBufferMessageSet之间的相互转换, MessageSet在内存中的操作
  • 主要方法:
    1. override def iterator: Iterator[MessageAndOffset] = internalIterator(): 返回迭代器,用来遍历包含的每条MessageAndOffset; 主要是用来从ByteBuffer里抽取Message
      1.1 实际上是通过internalIterator()方法返回;
      1.2 private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset],返回MessageAndOffset的迭代器 new IteratorTemplate[MessageAndOffset]
      1.3 真正干活的是IteratorTemplate[MessageAndOffset]override def makeNext(): MessageAndOffset, 实际上就是把上面介绍的MessageSet的结构里的Record一条条解出来, 对于压缩后的MessageSet涉及到一层递归,具体可以参见上面的 2.jpg
      1.4 放一段核心代码:
       if(isShallow) { //是不是要作深层迭代需要迭代,就是我们上面2.jpg里的M1
          new MessageAndOffset(newMessage, offset) //直接返回一条MessageAndOffset
        } else { //需要迭代,就是我们上面2.jpg里的M2
          newMessage.compressionCodec match {//根据压缩Codec决定作什么处理
            case NoCompressionCodec => //未压缩,直接返回一条MessageAndOffset
              innerIter = null
              new MessageAndOffset(newMessage, offset)
            case _ => //压缩了的MessageSet, 就再深入一层, 逐条解压读取
              innerIter = ByteBufferMessageSet.deepIterator(newMessage)
              if(!innerIter.hasNext)
                innerIter = null
              makeNext()
          }
        }
  1. private def create(offsetCounter: AtomicLong, compressionCodec: CompressionCodec, messages: Message*): ByteBuffer: 用于从Message List到ByteBuffer的转换, 实际上最后生成的ByteBuffer里就是上面说的一条Record
   if(messages.size == 0) {
      MessageSet.Empty.buffer
    } else if(compressionCodec == NoCompressionCodec) {
      // 非压缩的
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for(message <- messages)
        writeMessage(buffer, message, offsetCounter.getAndIncrement)
      buffer.rewind()
      buffer
    } else {
     //压缩的使用 MessageWriter类来写
      var offset = -1L
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec) { outputStream =>
        val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream))
        try {
          //逐条压缩
          for (message <- messages) {
            offset = offsetCounter.getAndIncrement
            output.writeLong(offset)
            output.writeInt(message.size)
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
          }
        } finally {
          output.close()
        }
      }
      //写入buffer作为一条Record
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }
  1. def writeTo(channel: GatheringByteChannel, offset: Long, size: Int): Int: 写MessageSet到GatheringByteChannel:
    // Ignore offset and size from input. We just want to write the whole buffer to the channel.
    buffer.mark()
    var written = 0
    while(written < sizeInBytes)
      written += channel.write(buffer)
    buffer.reset()
    written
  }
  1. Message验证和Offset的重新赋值: 这是一个神奇的函数,在broker把收到的producer request里的MessageSet append到Log之前,以及consumer和follower获取消息之后,都需要进行校验, 这个函数就是这个验证的一部分, 我把相应的说明写在源码里,这个函数在后面讲到处理log append和consumer时我们还会用到.
private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: AtomicLong,
                                                      sourceCodec: CompressionCodec,
                                                      targetCodec: CompressionCodec,
                                                      compactedTopic: Boolean = false): ByteBufferMessageSet = {
    if(sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) { // 非压缩的Message
      // do in-place validation and offset assignment
      var messagePosition = 0
      buffer.mark()
      while(messagePosition < sizeInBytes - MessageSet.LogOverhead) {
        buffer.position(messagePosition)
       // 根据参数传入的 offsetCountern 更新当前的Offset
        buffer.putLong(offsetCounter.getAndIncrement())
        val messageSize = buffer.getInt()
        val positionAfterKeySize = buffer.position + Message.KeySizeOffset + Message.KeySizeLength
        // 如果是compact topic(比如__cosumer_offsets),  key是一定要有的, 这里检查这个key的合法性
        if (compactedTopic && positionAfterKeySize < sizeInBytes) {
          buffer.position(buffer.position() + Message.KeySizeOffset)
          val keySize = buffer.getInt()
          if (keySize <= 0) {
            buffer.reset()
            throw new InvalidMessageException("Compacted topic cannot accept message without key.")
          }
        }
        messagePosition += MessageSet.LogOverhead + messageSize
      }
      buffer.reset()
      this
    } else {
      // 压缩的Message,  下面源码里的注释已经说得很清楚了
      // We need to deep-iterate over the message-set if any of these are true:
      // (i) messages are compressed
      // (ii) the topic is configured with a target compression codec so we need to recompress regardless of original codec
      // 深度迭代, 获取所有的message
      val messages = this.internalIterator(isShallow = false).map(messageAndOffset => {
        if (compactedTopic && !messageAndOffset.message.hasKey)
          throw new InvalidMessageException("Compacted topic cannot accept message without key.")

        messageAndOffset.message
      })
      //使用targetCodec重新压缩
      new ByteBufferMessageSet(compressionCodec = targetCodec, offsetCounter = offsetCounter, messages = messages.toBuffer:_*)
    }
  }
BufferingOutputStream类
  • 所在文件: core/src/main/scala/kafka/message/MessageWriter.scala
  • 定义: class BufferingOutputStream(segmentSize: Int) extends OutputStream 继承自OutputStream
  • 作用: 这个来接纳写入它的各种数据类型, 比如int, byte, byte array, 其内部定义了 Segment类, Segment内部使用Array[byte]来存储数据, 多个Segment连成一个链接, 链接可以自动扩展,来存储写入BufferingOutputStream的所有数据
  • 主要方法:
    1. 一组write函数: 用于写入不能类型的数据;
    2. def reserve(len: Int): ReservedOutput: 从当前位置开始预留len长度存储空间
    3. def writeTo(buffer: ByteBuffer): Unit: 将存储在Segment链接中的数据全部拷贝到ByteBuffer中 .
MessageWriter
  • 所在文件: core/src/main/scala/kafka/message/MessageWriter.scala
  • 定义: class MessageWriter(segmentSize: Int) extends BufferingOutputStream(segmentSize), 继承自上面的BufferingOutputStream
  • 作用: 在ByteBufferMessageSet::create中用到, 将若干条Message构造成多条对应的压缩后的Record, 将这个压缩后的结果再次作为payload构造成一条新的Message;
  • 主要方法:
    1. 构造Message, 添加Crc, 写入Magic, Attribete, key size, key.......
def write(key: Array[Byte] = null, codec: CompressionCodec)(writePayload: OutputStream => Unit): Unit = {
    withCrc32Prefix {
      write(CurrentMagicValue)
      var attributes: Byte = 0
      if (codec.codec > 0)
        attributes = (attributes | (CompressionCodeMask & codec.codec)).toByte
      write(attributes)
      // write the key
      if (key == null) {
        writeInt(-1)
      } else {
        writeInt(key.length)
        write(key, 0, key.length)
      }
      // write the payload with length prefix
      withLengthPrefix {
        writePayload(this)
      }
    }
  }
FileMessageSet类
  • 所在文件:core/src/main/scala/kafka/log/FileMessageSet.scala
  • 定义: class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] val start: Int, private[log] val end: Int, isSlice: Boolean) extends MessageSet with Logging
  • 作用:用于MessageSet与磁盘文件之前的读取
  • 主要方法:
  1. def iterator(maxMessageSize: Int): Iterator[MessageAndOffset]: 返回一个迭代器,用于获取对应本地log文件里的每一条Record, 写入到文件里是不是Message,而是Record
override def makeNext(): MessageAndOffset = {
        if(location >= end)
          return allDone()
          
        // read the size of the item
        sizeOffsetBuffer.rewind()
        // 先读Record的头部,Offset + MessageSize , 共12字节
        channel.read(sizeOffsetBuffer, location)
        if(sizeOffsetBuffer.hasRemaining)
          return allDone()
        
        sizeOffsetBuffer.rewind()
        val offset = sizeOffsetBuffer.getLong()
        val size = sizeOffsetBuffer.getInt()
        if(size < Message.MinHeaderSize)
          return allDone()
        if(size > maxMessageSize)
          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))
        
        // read the item itself 
       //  根所MessageSize读Message
        val buffer = ByteBuffer.allocate(size)
        channel.read(buffer, location + 12)
        if(buffer.hasRemaining)
          return allDone()
        buffer.rewind()
        
        // increment the location and return the item
        location += size + 1
        new MessageAndOffset(new Message(buffer), offset)
      }
  1. def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) _size.getAndAdd(written) } :将多条Record`由内存落地到本地Log文件
  2. def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int: 将本地Log文件中的Message发送到批定的Channel
 val newSize = math.min(channel.size().toInt, end) - start
    if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
        .format(file.getAbsolutePath, _size.get(), newSize))
    }
    val position = start + writePosition
    val count = math.min(size, sizeInBytes)
    val bytesTransferred = (destChannel match {
      // 利用sendFile系统调用已零拷贝方式发送给客户端
      case tl: TransportLayer => tl.transferFrom(channel, position, count)
      case dc => channel.transferTo(position, count, dc)
    }).toInt
    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
    bytesTransferred
总结
  • 我们看到ByteBufferMessageSetFileMessageSet都是继承于MessageSet, 也就是说一条Record的结构在内存和本地文件中的存储格式是完全一样的,在Message的读写时不用作多余的转换。

Kafka源码分析-汇总

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • ** 今天看了一下kafka官网,尝试着在自己电脑上安装和配置,然后学一下官方document。** Introd...
    RainChang阅读 4,994评论 1 30
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,311评论 1 15
  • 发送到Kafka的消息最终都是要落盘存储到磁盘上; 本章涉及到的类:OffsetIndex;LogSegment;...
    扫帚的影子阅读 3,410评论 0 1
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,812评论 4 54