所有的写操作在写入memtable
之前都必须先成功写入log
文件中,主要两点好处:
- 可以将随机的写IO变成append,极大的提高写磁盘速度;
- 防止在节点宕机导致内存数据丢失,造成数据丢失。
格式
The log file contents are a sequence of 32KB blocks. The only
exception is that the tail of the file may contain a partial block.
Each block consists of a sequence of records:
block := record* trailer?
record :=
checksum: uint32 // crc32c of type and data[] ; little-endian
length: uint16 // little-endian
type: uint8 // One of FULL, FIRST, MIDDLE, LAST
data: uint8[length]
A record never starts within the last six bytes of a block (since it
won't fit). Any leftover bytes here form the trailer, which must
consist entirely of zero bytes and must be skipped by readers.
也就是说,日志文件由连续的大小为32KB的block
组成,block
又由连续的record
组成,record
的格式为
| CRC(4 byte) | Length(2 byte) | type(1 byte) | data |
Type
有四种类型
- FULL: 说明该log record包含一个完整的user record;
- FIRST,说明是user record的第一条log record
- MIDDLE,说明是user record中间的log record
- LAST,说明是user record最后的一条log record
看文档上给出的一个例子:
Example: consider a sequence of user records:
A: length 1000
B: length 97270
C: length 8000
A will be stored as a FULL record in the first block.
B will be split into three fragments: first fragment occupies the rest
of the first block, second fragment occupies the entirety of the
second block, and the third fragment occupies a prefix of the third
block. This will leave six bytes free in the third block, which will
be left empty as the trailer.
C will be stored as a FULL record in the fourth block.
Note: 由于一条logrecord
长度最短为7,如果一个block的剩余空间小于7byte,那么将被填充为空字符串,另外长度为7的logrecord
是不包括任何用户数据的。
log_format.h
namespace leveldb {
namespace log {
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
static const int kMaxRecordType = kLastType;
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;
} // namespace log
} // namespace leveldb
Writer
先看下Writer
类:
class Writer {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(WritableFile* dest);
// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
Writer(WritableFile* dest, uint64_t dest_length);
~Writer();
Status AddRecord(const Slice& slice);
private:
WritableFile* dest_;
int block_offset_; // Current offset in block
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
// No copying allowed
Writer(const Writer&);
void operator=(const Writer&);
};
类的结构比较简单,公开借口只有一个传入Slice
参数的AddRecord
,由于RecordType
是固定的几种,所以为了效率,类的成员type_crc_
数组,这里存放的为RecordType
预先计算的CRC32
值。
下面分析AddRecord
的实现
AddRecord
1.首先取出Slice
的字符串指针和长度,初始化begin=true,表明这是一条record
的开始
const char* ptr = slice.data();
size_t left = slice.size();
bool begin = true;
2.然后进入一个do{}while循环,直到写入出错,或者成功写入全部数据
- 检查当前
block
的大小是否小于kHeaderSize
,如果小于则先将,剩余的部份补0
,然后重制块位移
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
assert(kHeaderSize == 7);
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
- 计算
block
剩余大小,本次可写入长度
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
- 判断logType
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
- 调用EmitPhysicalRecord函数,append日志;并更新指针、剩余长度和begin标记。
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
EmitPhysicalRecord
这是实际写入的地方
- 首先计算
head
,并append到log中
// Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
- 写入,并Flush,更新block的当前偏移
if (s.ok()) {
s = dest_->Append(Slice(ptr, n));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + n;
Reader
先看下Reader
类的成员变量(基本都有注释,无需详解)
SequentialFile* const file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_;
// Offset at which to start looking for the first record to return
uint64_t const initial_offset_;
// True if we are resynchronizing after a seek (initial_offset_ > 0). In
// particular, a run of kMiddleType and kLastType records can be silently
// skipped in this mode
bool resyncing_;
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
// Returned whenever we find an invalid physical record.
// Currently there are three situations in which this happens:
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported)
// * The record is below constructor's initial_offset (No drop is reported)
kBadRecord = kMaxRecordType + 2
};
需要注意的两个类
Reporter : 用来记录错误的产生
SequentialFile: 用来从log文件中读取数据
Reader
类公开了两个接口ReadRecord
和LastRecordOffset
,比较重要的也是ReadRecord
,下面重点分析一下这个函数:
- 根据
initialoffset
跳转到调用者指定的位置,开始读取日志文件。跳转就是直接调用SequentialFile的Seek接口。另外,需要先调整调用者传入的initialoffset
参数,调整和跳转逻辑在SkipToInitialBlock
函数中。
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
SkipToInitialBlock
的函数逻辑如下
bool Reader::SkipToInitialBlock() {
// 计算在block内的偏移位置,并圆整到开始读取block的起始位置
size_t offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;
// 如果偏移在最后的6byte里,肯定不是一条完整的记录,跳到下一个block
if (offset_in_block > kBlockSize - 6) {
offset_in_block = 0;
block_start_location += kBlockSize;
}
// 设置读取偏移
end_of_buffer_offset_ = block_start_location;
// Skip to start of first block that can contain the initial record
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(block_start_location, skip_status);
return false;
}
}
return true;
}
- 进入
while
循环之前进行一些标记
bool in_fragmented_record = false;// 是否遇到FIRST类型的type
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;//正在读取的逻辑record的偏移
- 进入到while(true)循环,直到读取到KLastType或者KFullType的record,或者到了文件结尾。读取出现错误时,并不会退出循环,而是汇报错误,继续执行,直到成功读取一条user record,或者遇到文件结尾。
- 首先读取一个
record
采用的是ReadPhysicalRecord
函数
- 首先读取一个
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < kHeaderSize) {
// 如果未到达文件结尾,清空buffer,读取数据
if (!eof_) {
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) { // 实际读取字节<指定(Block Size),表明到了文件结尾
eof_ = true;
}
continue;
} else {
buffer_.clear();
return kEof;
}
}
// 解析record头
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
// 长度超出,汇报错误
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
return kEof;
}
if (type == kZeroType && length == 0) { // 对于Zero Type类型处理
buffer_.clear();
return kBadRecord;
}
// 检查crc
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) { // 如果出错,汇报错误
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}
- 接下来,对所读取的
record
进行判断- 如果一开始就读到kMiddleType, kLastType的
record
显然是不正确(完整)的,所以简单的抛弃,继续读后面的。
- 如果一开始就读到kMiddleType, kLastType的
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}
- 根据类型采取相应动作(逻辑非常简单)
switch (record_type) {
case kFullType:
if (in_fragmented_record) {
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
if (scratch->empty()) {
in_fragmented_record = false;
} else {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}