Kafka源码分析-Server-日志存储(1)

基本概念

Kafka使用日志文件的方式保存生产者发送的消息。每条消息都有一个offset值来表示它在分区的偏移量,这个offset值是逻辑值,并不是消息实际的存放物理地址。offset类似数据库表的主键,主键唯一确定了数据库表中的一条记录,offset唯一确定了分区的一条消息。Kafka存储机制在逻辑上如下图:


日志存储结构.png

为了提高写入的性能,同一个分区中的消息是顺序写入的,这就避免了随机写入带来的性能问题。一个topic可以有n个分区,每个分区也有多个副本。当一个分区的副本(无论是Leader副本还是Follower副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的Log,生产者发送的消息会存储在Log里,然后被消费者拉取消费。
Kafka中存储的数据都是海量的,为了避免日志文件太大,Log并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,目录的命名规则是<topic_name>_<partition_id>,Log和分区直接的关系是一一对应的,对应分区的全部消息都存储在这个目录中。
Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,,日志文件用于记录消息,索引文件保存消息的索引。日志文档到一个阈值时,就会创建新的日志文件继续写入后续的消息和索引信息。日志文件的文件名的命名规则是[baseOffset].log,baseOffset是日志文件中的第一条消息的offset。下面是Log的结构:


Log结构.png

为了提高查询消息的效率,每个日志文件都对应一个索引文件,这个索引文件并没有为每条消息都建索引,而是使用稀疏索引方式为日志文件中的部分消息建立了索引。下面的图展示了所有文件和日志文件的关系:
log的索引.png

FileMessageSet

Kafka使用FileMessageSet管理日志文件,它对应磁盘上一个真正的日志文件。FileMessageSet继承了MessageSet抽象类,如下图:


FileMessageSet类图.png

MessageSet中保存的数据格式有三个部分:8个字节的offset值,4个字节的size表示message data的大小,这两个结合成为LogOverhead, message data保存了消息的数据,逻辑上对应一个Message对象:


FileMessageSet定义的消息结构.png

Kafka使用Message类表示消息,Message类使用ByteBuffer保存数据,格式和各个部分的含义如下:
Message类定义的保存数据.png
  • CRC32:4个字节,消息的校验码。
  • magic:1个字节,魔数标识,与消息的格式相关,取值为0或1。当magic为0时,消息的offset使用绝对offset且消息格式中没有timestamp部分;当magic为1时,消息的offset使用相对的offset且消息格式中有timestamp部分。所以,magic不同,消息的长度不同。
  • attributes:1个字节,消息的属性。其中0~2位表示消息使用的压缩类型,0表示gzip压缩,2表示snappy压缩,3表示lz4压缩。第3位表示时间戳类型,0表示创建时间,1表示追加时间。
  • timestamp:时间戳,含义有attributes的第3位确定。
  • key length:消息key的长度。
  • key:消息的key。
  • value length:消息value的长度。
  • value: 消息的value。
    MessageSet抽象类中定义了两个比较关键的方法:
/** Write the messages in this set to the given channel starting at the given offset byte. 
    * Less than the complete amount may be written, but no more than maxSize can be. The number
    * of bytes written is returned
    * 将当前MessageSet中的消息写入到Channel中
    * */
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
/**
   * Provides an iterator over the message/offset pairs in this set
    * 提供迭代器,顺序读取MessageSet中的消息
   */
  def iterator: Iterator[MessageAndOffset]

这两个方法说明MessageSet具有顺序写入消息和顺序读取的特性。后面介绍FileMessageSet和ByteBufferMessageSet时会说明这两个方法的实现。

分析FileMessageSet实现类

核心字段:
*file: java.io.File类型,指向磁盘上对应的日志文件。
*channel:FileChannel类型,用于读写对应的日志文件。
*start和end:MessageSet对象除了表示一个完整的日志文件,还可以表示日志文件分片(Slice),start和end分别表示分片的起始位置和结束位置。文件分配的相关概念可以找资料了解下。

  • isSlice:Boolean类型,表示当前FileMessageSet是日志文件的分片。
  • _size:FileMessageSet大小,单位是字节。如果FileMessageSet是日志文件的分片,就表示分片的大小(即end-start的值);如果不是分片,则表示整个日志文件的大小。因为会有多个Handler线程并发向一个分区写入消息,所有的_size是AtomicInteger类型。
    FileMessageSet中有多个重载的构造方法,这里选择一个比较重要的构造方法来介绍。此构造方法会创建一个非分片的FileMessageSet对象。在Window NTFS文件系统或老版本的Linux文件系统上,进行文件的预分配会提高后续写操作的性能,为此FileMessageSet提供了preallocate的选项,决定是否开启预分配的功能。我们也可以通过FileMessageSet构造方法的mutable参数决定是否创建只读的FileMessageSet。
 * Create a file message set with no slicing, and with initFileSize and preallocate.
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
   */
  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
      this(file,
        //如果使用preallocate进行预分配,end会初始化为零
        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
        start = 0,
        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
        isSlice = false)
/**
   * Open a channel for the given file
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
   * @param file File path
   * @param mutable mutable
   * @param fileAlreadyExists File already exists or not
   * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
   * @param preallocate Pre allocate file or not, gotten from configuration.
    * FileMessageSet.openChannel()方法的具体实现。
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {//根据mutable参数创建的FileChannel是否可写
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {//进行文件预分配
          val randomAccessFile = new RandomAccessFile(file, "rw")
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()//创建可读可写的FileChannel
      }
    }
    else
      new FileInputStream(file).getChannel()//创建只读的FileChannel
  }

在FileMessageSet对象初始化的过程中,会移动FileChannel的position指针,原因是为了每次写入的消息都在日志文件的尾部,避免重启服务后的写入操作覆盖之前的操作。对应新创建的且进行了预分配空间的日志文件,其end会初始化为0,所以也是从文件的起始写入数据的。

/* if this is not a slice, update the file pointer to the end of the file
  * 将position移动到最后一个字节,之后从此position开始写消息,这样防止重启后覆盖之前的操作
  *
  * */
  if (!isSlice)
    /* set the file position to the last byte in the file */
    channel.position(math.min(channel.size.toInt, end))

介绍完FileMessageSet的构造过程,下面介绍其读写过程。FileMessageSet.append()方法实现了写日志文件的功能,其参数必须是ByteBufferMessageSet对象,下面是FileMessageSet.append()方法的代码:

/**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)//写文件
    _size.getAndAdd(written)//修改FileMessageSet的大小
  }
/** Write the messages in this set to the given channel 
    * 下面是 ByteBufferMessageSet.writeFullyTo()方法
    * */
  def writeFullyTo(channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    while (written < sizeInBytes)//将ByteBufferMessageSet中的数据全部写入文件
      written += channel.write(buffer)
    buffer.reset()
    written
  }

查找指定消息的功能在FileMessageSet.searchFor()方法中实现。searchFor()方法的逻辑是从指定的startPosition开始逐条遍历FileMessageSet中的消息,并将每个消息的offset和targetOffset,最后返回查找到的offset。在遍历过程中不会将消息的key和value读取到内存,只是只读取LogOverhead(即offset和size),并通过size定位到下一条消息的开始位置。FileMessageSet.searchFor()方法代码如下:

/**
   * Search forward for the file position of the last offset that is greater than or equal to the target offset
   * and return its physical position. If no such offsets are found, return null.
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition //起始位置
    //创建用于读取 LogOverhead(即offset和size)的ByteBuffer(长度12)
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()//当前FileMessageSet的大小,单位是字节
    //从position开始逐条消息遍历
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()//重置ByteBuffer的position指针,准备读入数据
      //读取LogOverhead。这里会确保startingPosition位于一个消息的开头,否则
      //读取到的并不是 LogOverhead,这个条件的保证会在后面提到
      channel.read(buffer, position)
      if(buffer.hasRemaining)//未读取到12个字节的LogOverhead,抛出异常
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()//重置ByteBuffer的position指针,准备从ByteBuffer中读取数据
      val offset = buffer.getLong()//读取消息的offset,8个字节
      if(offset >= targetOffset)//判断是否符合退出条件
        return OffsetPosition(offset, position)//得到消息的位置
      val messageSize = buffer.getInt()//获取消息的size,4个字节
      if(messageSize < Message.MinMessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      //移动Position,准备读取下个消息
      position += MessageSet.LogOverhead + messageSize
    }
    null//找不到offset大于等于targetOffset,则返回Null
  }

FileMessageSet.writeTo()方法是将FileMessageSet中的数据写入指定的其他Channel中,这里先了解此方法的功能,具体实现会在后面介绍“零拷贝”的时候一起介绍。FileMessageSet.read*()方法是从FileMessageSet中读取数据,可以将FileMessageSet中的数据读入到别的ByteBuffer中返回,也可以按照指定位置和长度形成分片的FileMessageSet对象返回。FileMessageSet.delete()方法是将整个日志文件删除。
FileMessageSet还有一个truncateTo()方法,主要负责将日志文件截断到targetSize大小。此方法在后面介绍分区中Leader副本切换时还会提到。下面是truncateTo()方法的具体实现:

/**
   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
   * given size falls on a valid message boundary.
   * In some versions of the JDK truncating to the same size as the file message set will cause an
   * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
   * size of the underlying FileChannel.
   * It is expected that no other threads will do writes to the log when this function is called.
   * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
   * @return The number of bytes truncated off
   */
  def truncateTo(targetSize: Int): Int = {
    val originalSize = sizeInBytes
    if(targetSize > originalSize || targetSize < 0)//检测targetSize的有效性
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                               " size of this log segment is " + originalSize + " bytes.")
    if (targetSize < channel.size.toInt) {
      channel.truncate(targetSize)//裁剪文件
      channel.position(targetSize)//移动position
      _size.set(targetSize)//修改_size
    }
    originalSize - targetSize//返回剪裁掉的字节数
  }

FileMessageSet还实现了iterator()方法,返回一个迭代器。FileMessageSet迭代器读取消息的逻辑是:先读取消息的LogOverhead部分,然后按照size分配合适的ByteBuffer,再读取message data部分,最后将message data和offset封装成MessageOffset对象返回。迭代器的实现和searchFor()方法类似。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,589评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,615评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,933评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,976评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,999评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,775评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,474评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,359评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,854评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,007评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,146评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,826评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,484评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,029评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,153评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,420评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,107评论 2 356

推荐阅读更多精彩内容