Kafka 日志对象由多个日志段对象组成,日志段是Kafka保存消息的最小载体,而每个日志段对象会在磁盘上创建一组文件,
- 包括消息日志文件(.log)
- 位移索引文件(.index)
- 时间戳索引文件(.timeindex)
- 已中止(Aborted)事务的索引文件(.txnindex)
本文对Kafka日志段进行详细说明,重点介绍Kafka日志段LogSegment的声明、append、read、recover方法。
下面我们看一下LogSegment的实现情况,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala
LogSegmen
LogSegment.scala这个文件里面定义了三个对象:
- LogSegment class;
- LogSegment object;
- LogFlushStats object。LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责为日志落盘进行计时。
说明:scala语法中,允许scala中包含同名的class 和object,这种用法称之为伴生(Companion),class 对象称之为伴生类,和java语法中的类是一样的。而 object是一个单例对象
里面包含静态方法和变量,用java比喻时,object相当于java的utils工具类
(1)日志段LogSegment 注释说明
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging { … }
lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分别对应于刚才所说的 3 个索引文件(位移索引、时间戳索引、已终止的事务索引)
字段解析和说明
- baseoffset:每个日志段都保留了自身的起始位移。
- indexIntervalBytes:即为Broker段参数,log.index.interval.bytes,控制了日志段对象新增索引项的频率。默认情况下日志段写入4KB的消息才会新增一条索引项目
- rollJitterMs:是一个抖动值,如果没有这个干扰值,未来某个时刻可能同时创建多个日志段对象,将会大大的增加IO压力。
(2)Append方法
以下append源码实现即为写入消息的具体操作
@nonthreadsafe
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
// 判断是否日志段是否为空
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
// 确保输入参数最大位移值是合法的
ensureOffsetInRange(largestOffset)
// append the messages
// 执行真正的写入
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
// Update the in memory max timestamp and corresponding offset.
// 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
}
// append an entry to the index (if needed)
// 当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex.append(largestOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
append方法入参介绍:
- largestOffset:写入消息批次中消息的最大位移值
- largestTimestamp:写入消息批次中最大的时间戳
- shallowOffsetOfMaxTimestamp:写入消息批次中最大时间戳对应消息的位移
- records:真正要写入的的消息。
这个方法主要做了那么几件事:
- 判断日志段是否为空,不为空则往下进行操作
- 调用ensureOffsetInRange方法,确保输入参数最大位移值是合法的。
- 调用 FileRecords 的 append 方法执行真正的写入。
- 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。
- 更新索引项和写入的字节数,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了 4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数。
(3)Read方法
以下read源码实现即为读取日志段的具体操作
def read(startOffset: Long,
maxSize: Int,
maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
if (maxSize < 0)
throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
// 将位移索引转换成物理文件位置索引
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if (startOffsetAndSize == null)
return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// return a log segment but with zero size in the case below
if (adjustedMaxSize == 0)
return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
// 计算要读取的总字节数
val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
// log.slice读取消息后封装成FetchDataInfo返回
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
read方法接收四个参数
- startOffset:要读取的第一条消息的位移
- maxSize:能读取的最大字节数
- maxPosition:能读取的最大文件的位置
- minOneMessage:是否允许在消息体过大时至少返回第1条消息,为了解决消费饿死的情况
这段代码中,主要做了这几件事:
调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。
举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合。
(4)Recover方法
以下recover源码实现即为恢复日志段的具体操作,就是broker在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的LogSegment对象实例。
def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
//情况索引文件
offsetIndex.reset()
timeIndex.reset()
txnIndex.reset()
var validBytes = 0
var lastIndexEntry = 0
maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
try {
//遍历日志段中所有消息集合
for (batch <- log.batches.asScala) {
// 校验
batch.ensureValid()
// 校验消息中最后一条消息的位移不能越界
ensureOffsetInRange(batch.lastOffset)
// The max timestamp is exposed at the batch level, so no need to iterate the records
// 获取最大时间戳及所属消息位移
if (batch.maxTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = batch.maxTimestamp
offsetOfMaxTimestampSoFar = batch.lastOffset
}
// Build offset index
// 当已写入字节数超过了 4KB 之后,调用索引对象的 append 方法新增索引项,同时清空已写入字节数
if (validBytes - lastIndexEntry > indexIntervalBytes) {
offsetIndex.append(batch.lastOffset, validBytes)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
lastIndexEntry = validBytes
}
// 更新总消息字节数
validBytes += batch.sizeInBytes()
// 更新Porducer状态和Leader Epoch缓存
if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
leaderEpochCache.foreach { cache =>
if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
updateProducerState(producerStateManager, batch)
}
}
} catch {
case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
.format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
}
// 遍历完后将 遍历累加的值和日志总字节数比较,
val truncated = log.sizeInBytes - validBytes
if (truncated > 0)
debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
//执行日志截断操作
log.truncateTo(validBytes)
// 调整索引文件大小
offsetIndex.trimToValidSize()
// A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
timeIndex.trimToValidSize()
truncated
}
(5)truncateTo方法
下面我们进入到truncateTo方法中,看一下截断操作是怎么做的:
public int truncateTo(int targetSize) throws IOException {
int originalSize = sizeInBytes();
// 要截断的目标大小不能超过当前文件的大小
if (targetSize > originalSize || targetSize < 0)
throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
" size of this log segment is " + originalSize + " bytes.");
//如果目标大小小于当前文件大小,那么执行截断
if (targetSize < (int) channel.size()) {
channel.truncate(targetSize);
size.set(targetSize);
}
return originalSize - targetSize;
}
Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。