Kafka源码分析-Server-日志存储(3)-ByteBufferMessageSet

ByteBufferMessageSet分析

介绍完生产者和消费者对压缩消息的处理过程,我们回到服务端,开始对ByteBufferMessageSet的分析,它底层使用了ByteBuffer保存消息数据。ByteBufferMessageSet角色和功能与MemoryRecords类似。ByteBufferMessageSet提供了三个方面的功能:

  • 将Message集合按照指定的压缩类型进行压缩,此功能主要用于构建ByteBufferMessageSet对象,通过ByteBufferMessageSet.create()方法完成。
  • 提供迭代器,实现深层迭代和浅层迭代两种迭代方式。
  • 提供了消息验证和offset分配的功能
    在ByteBufferMessageSet.create()方法中实现了消息的压缩以及offset分配,步骤如下:
  • 如果传入的Message集合为空,则返回空ByteBuffer。
  • 如果要求不对消息进行压缩,则通过OffsetAssigner分配每个消息的offset,在将消息写入到ByteBuffer后,返回ByteBuffer。OffsetAssigner的功能是存储一串offset值,并像迭代器那样逐个返回。
  • 如果要求对消息进行压缩,则先将Message集合按照指定的压缩方式进行压缩并放入缓冲区,同时也会完成offset分配,然后按照压缩消息的格式写入外层消息,最后将整个外层消息所在的ByteBuffer返回。
    ByteBufferMessageSet.create()方法代码如下:
private def create(offsetAssigner: OffsetAssigner,
                     compressionCodec: CompressionCodec,
                     wrapperMessageTimestamp: Option[Long],
                     timestampType: TimestampType,
                     messages: Message*): ByteBuffer = {
    if (messages.isEmpty)//第一种情况:处理Message集合为空的情况
      MessageSet.Empty.buffer
    //第二种情况:不需要对Message集合进行压缩
    else if (compressionCodec == NoCompressionCodec) {
      val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
      for (message <- messages) writeMessage(buffer, message, offsetAssigner.nextAbsoluteOffset())//为每个消息分配offset,并写入Buffer
      buffer.rewind()
      buffer//返回buffer
    } else {//第三种情况:需要对Message集合进行压缩
      //得到Magic值和时间戳
      val magicAndTimestamp = wrapperMessageTimestamp match {
        case Some(ts) => MagicAndTimestamp(messages.head.magic, ts)
        case None => MessageSet.magicAndLargestTimestamp(messages)
      }
      var offset = -1L
      //底层使用byte数组保存写入的压缩数据
      val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
      messageWriter.write(codec = compressionCodec, timestamp = magicAndTimestamp.timestamp, timestampType = timestampType, magicValue = magicAndTimestamp.magic) { outputStream =>
        //创建指定压缩类型的输出流
        val output = new DataOutputStream(CompressionFactory(compressionCodec, magicAndTimestamp.magic, outputStream))
        try {
          for (message <- messages) {//遍历写入内层压缩信息
            offset = offsetAssigner.nextAbsoluteOffset()
            //Magic值为1,写入的是相对offset;Magic值为0,写入的是offset
            if (message.magic != magicAndTimestamp.magic)
              throw new IllegalArgumentException("Messages in the message set must have same magic value")
            // Use inner offset if magic value is greater than 0
            if (magicAndTimestamp.magic > Message.MagicValue_V0)
              output.writeLong(offsetAssigner.toInnerOffset(offset))
            else
              output.writeLong(offset)
            output.writeInt(message.size)//写入size
            output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)//写入Message数据
          }
        } finally {
          output.close()
        }
      }
      val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
      //按照消息格式写入整个外层消息。外层消息的offset是最后一条内层消息的offset
      writeMessage(buffer, messageWriter, offset)
      buffer.rewind()
      buffer
    }

ByteBufferMessageSet提供的浅层迭代器和深层迭代器与MemoryRecords的迭代器的实现和功能都十分类似,与MemoryRecords.RecordsIterator的实现类似。
ByteBufferMessageSet.validateMessagesAndAssignOffsets()方法实现了验证消息并分配offset的功能,需要验证的部分如下:

  • 检查Magic Value;
  • 检查时间戳和时间戳类型。
  • 对于压缩消息需要检查它是否有key。
  • 可以重新设定时间戳类型和时间戳。
  • 进行offset分配;
  • 如果消息压缩类型和Broker指定的压缩类型不一致,需要进行重新压缩。
    下面是validateMessagesAndAssignOffsets()方法的代码第一个参数是分配offset的起始值,其他的参数比较好理解;改方法的第二个返回值表示ByteBufferMessageSet中Message集合的长度是否会发生变化。
/**
   * Update the offsets for this message set and do further validation on messages including:
   * 1. Messages for compacted topics must have keys
   * 2. When magic value = 1, inner messages of a compressed message set must have monotonically increasing offsets
   *    starting from 0.
   * 3. When magic value = 1, validate and maybe overwrite timestamps of messages.
   *
   * This method will convert the messages in the following scenarios:
   * A. Magic value of a message = 0 and messageFormatVersion is 1
   * B. Magic value of a message = 1 and messageFormatVersion is 0
   *
   * If no format conversion or value overwriting is required for messages, this method will perform in-place
   * operations and avoid re-compression.
   *
   * Returns the message set and a boolean indicating whether the message sizes may have changed.
   */
  private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
                                                      now: Long,
                                                      sourceCodec: CompressionCodec,
                                                      targetCodec: CompressionCodec,
                                                      compactedTopic: Boolean = false,
                                                      messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                      messageTimestampType: TimestampType,
                                                      messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
    if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {//处理非压缩的情况
      // check the magic value 检测所有的Massage的Magic value是否与指定的magic value 一致
      if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
        // Message format conversion
        // 如果有Massage的Magic value是否与指定的magic value 不一致,则需要统一,这样
        // 就可能导致消息长度变化,需要创建新的 ByteBufferMessageSet。同时还会进行offset分配,
        // 验证并更新CRC32,时间戳等信息。
        (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
          messageFormatVersion), true)
      } else {
        // Do in-place validation, offset assignment and maybe set timestamp
        // 处理非压缩消息且Magic值统一的情况,由于Magic值确定,长度不会改变。主要是进行offset分配,
        // 验证并更新CRC32,时间戳等信息
        (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
          messageTimestampDiffMaxMs), false)
      }
    } else {//处理压缩的情况
      // Deal with compressed messages
      // We cannot do in place assignment in one of the following situations: inPlaceAssignment标识是否可以直接复用当前ByteBufferMessage对象。四种情况不能复用。
      // 1. Source and target compression codec are different   1.消息当前压缩类型与此Broker指定的压缩类型不一致,需要重新压缩。
      // 2. When magic value to use is 0 because offsets need to be overwritten 2.Magic为0时,需要重写消息的 offset 为绝对offset
      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.3.Magic大于0,但内部压缩信息某些字段需要修改,例如时间戳。
      // 4. Message format conversion is needed.4.消息格式需要转换。

      // No in place assignment situation 1 and 2  检测情况1,检测情况2
      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0

      var maxTimestamp = Message.NoTimestamp
      val expectedInnerOffset = new LongRef(0)
      val validatedMessages = new mutable.ArrayBuffer[Message]
      //遍历内存压缩消息,这个步骤会解压
      this.internalIterator(isShallow = false).foreach { messageAndOffset =>
        val message = messageAndOffset.message
        validateMessageKey(message, compactedTopic)//检测消息的key

        if (message.magic > Message.MagicValue_V0 && messageFormatVersion > Message.MagicValue_V0) {
          // No in place assignment situation 3
          // Validate the timestamp
          validateTimestamp(message, now, messageTimestampType, messageTimestampDiffMaxMs)//检测时间戳
          // Check if we need to overwrite offset  检测情况3,检查内部offset是否正常
          if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
            inPlaceAssignment = false
          maxTimestamp = math.max(maxTimestamp, message.timestamp)
        }

        if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
          throw new InvalidMessageException("Compressed outer message should not have an inner message with a " +
            s"compression attribute set: $message")

        // No in place assignment situation 4 检测情况4
        if (message.magic != messageFormatVersion)
          inPlaceAssignment = false
        // 保存通过上述检测和转换的Message集合
        validatedMessages += message.toFormatVersion(messageFormatVersion)
      }

      if (!inPlaceAssignment) {//不能复用当前ByteBufferMessageSet对象的场景
        // Cannot do in place assignment.
        val wrapperMessageTimestamp = {
          if (messageFormatVersion == Message.MagicValue_V0)
            Some(Message.NoTimestamp)
          else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
            Some(maxTimestamp)
          else // Log append time
            Some(now)
        }
        //创建新ByteBufferMessageSet对象,重新压缩。此时调用上面介绍的create()方法进行压缩
        (new ByteBufferMessageSet(compressionCodec = targetCodec,
                                  offsetCounter = offsetCounter,
                                  wrapperMessageTimestamp = wrapperMessageTimestamp,
                                  timestampType = messageTimestampType,
                                  messages = validatedMessages: _*), true)
      } else {//复用当前ByteBufferMessageSet对象,这样少一次压缩的操作
        // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
        buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
        // validate the messages
        validatedMessages.foreach(_.ensureValid())

        var crcUpdateNeeded = true
        val timestampOffset = MessageSet.LogOverhead + Message.TimestampOffset
        val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
        val timestamp = buffer.getLong(timestampOffset)
        val attributes = buffer.get(attributeOffset)
        if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
          // We don't need to recompute crc if the timestamp is not updated.
          crcUpdateNeeded = false
        else if (messageTimestampType == TimestampType.LOG_APPEND_TIME) {
          // Set timestamp type and timestamp
          //更新外层消息的时间戳,attribute和CRC32
          buffer.putLong(timestampOffset, now)
          buffer.put(attributeOffset, messageTimestampType.updateAttributes(attributes))
        }

        if (crcUpdateNeeded) {
          // need to recompute the crc value
          buffer.position(MessageSet.LogOverhead)
          val wrapperMessage = new Message(buffer.slice())
          Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
        }
        buffer.rewind()
        (this, false)
      }
    }
  }

ByteBufferMessageSet中的其他方法是用来辅助实现上述方法的。
最终,我们回到开始的那个问题,FileMessageSet.append()方法会将ByteBufferMessageSet中全部数据追加到日志文件中,对于压缩消息来说,多条压缩消息就以一个外层的状态存在于日志文件了。当消费者获取消息时也会得到压缩消息,这样就实现了"端到端的压缩"。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容