本文主要讲解了RocksDB中二阶段提交的实现。本文总结一下共有如下几个要点:
- Modification of the WAL format
- Extension of the existing transaction API
- Modification of the write path
- Modification of the recovery path
- Integration with MyRocks
1、 Modification of WAL Format
WAL包含一个或多个log文件,每个log的内容都是序列化后的WriteBatches,在执行recovery 时,WriteBatches 可以从logs种重建出来。要修改WAL的格式或者扩展其功能,只需要关注WriteBatch即可。
WriteBatch就是Records的有序集合,这些Record主要包括Put(k,v), Merge(k,v), Delete(k), SingleDelete(k),每一个都代表了RocksDB的一种写操作。每一个Record都有一个二进制的字符串表示。当Records 添加到WriteBatch时,他们的二进制表示也被append到WriteBatch的二进制字符串表示中。WriteBatch的二进制字符串前缀是其起始的序列号以及batch中的record 个数。每个record都会有一个column family modifier record(如果column family是default的话,可以省略)。
可以通过扩展WriteBatch::Handler来遍历WriteBatch并执行一些操作。MemTableInserter 就是WriteBatch::Handler的扩展,其功能就是将WriteBatch中的操作写入到对应的 column family的MemTable中。
WriteBatch的逻辑形式有可能是这样:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
2PC的WriteBatch format还包括另外四条Records
- Prepare(xid)
- EndPrepare()
- Commit(xid)
- Rollback(xid)
一个可以2PC的WriteBatch可能类似下面的逻辑:
Sequence(0);NumRecords(6);Prepare(foo);Put(a,b);Put(x,y);EndPrepare();Put(j,k);Commit(foo);
Prepare(foo)和EndPrepare()之间的记录是transaction (ID='foo')的操作。Commit(foo)表示提交这个transaction,Rollback(foo)表示回滚这个transaction。
Sequence ID Distribution
当WriteBatch通过MemTableInserter被写入到memtable时,WriteBatch中的每一个operation的sequence ID加上这个WriteBatch中的Oprator的index。但是,在2PC的WriteBatch中并没有继续保持这种sequence id的映射方法。Operations contained within a Prepare() enclosure will consume sequence IDs as if they were inserted starting at the location of their relative Commit() marker. This Commit() marker may be in a different WriteBatch or log from the prepared operations to which it applies.
Backwards Compatibility
WAL format并没有版本话,所以我们需要注意后相兼容。当前版本的RocksDB不能从一个包含2PC 标记的WAL 文件中recovery。在实际recover时,遇到不能识别的Record会打印fatal 信息。有点麻烦,但是开发者可以对当前的RocksDB版本打patch以便能够跳过prepared sections和不能识别的markers,这样就可以从新版本的WAL format 恢复数据。
2、Extension of Transaction API
当前我们只focus到乐观事物的2PC。client必须提前声明是否使用二阶段提交,例如以下代码:
TransactionDB* db;
TransactionDB::Open(Options(), TransactionDBOptions(), "foodb", &db);
TransactionOptions txn_options;
txn_options.two_phase_commit = tr
txn_options.xid = "12345";
Transaction* txn = db->BeginTransaction(write_options, txn_options);
txn->Put(...);
txn->Prepare();
txn->Commit();
transaction状态有:
enum ExecutionStatus {
STARTED = 0,
AWAITING_PREPARE = 1,
PREPARED = 2,
AWAITING_COMMIT = 3,
COMMITED = 4,
AWAITING_ROLLBACK = 5,
ROLLEDBACK = 6,
LOCKS_STOLEN = 7,
};
transaction API会调用一个Prepare()函数。Prepare函数会通过一个context调用WriteImpl,通过context,WriteImpl和WriteThread可以访问ExcutionStatus、XID和WriteBatch。WriteBatch会先写入一个Prepare(xid)标记,然后写入WriteBatch的内容,再写入EndPrepare()标记。这期间并没有memtable的写入。当transaction执行了commit时,会再次调用WriteImpl。此时,Commit()标记会写入WAL,WriteBatch的内容会写入相应的memtable。当transaction调用Rollback()时,transaction内容会被清除,然后调用WriteImpl,写入Rollback(xid)标记(如果当前事物处于Prepare状态)。
这些所谓的"meta markers"(Prepare(xid), EndPrepare(), Commit(xid), Rollback(xid))不会直接写入到write batch中。write path (WriteImpl())会持有正在写的事物的context,并使用这个context将相关的markers写入到WAL(所以这些标记在写入到WAL之前先写入到聚合后的WriteBatch)。在recovery时,这些标记会被MemTableInserter 用来重建prepared transactions。
Transaction Wallclock Expiration
在transaction 提交时,会有一个callback,这个callback在transaction过期后会fail掉整个写操作。如果transaction过期了,那么锁很容易被其他transction抢占。如果一个transaction在prepare阶段没有过期的话,那么也不可能在commit阶段过期。
TransactionDB Modification
使用transaction前,client必须打开一个TransactionDB。这个TransactionDB 实例接下来就可以创建Transactions。TransactionDB 会持有一个映射(from XID to 其创建的所有两阶段的Transaction)。当Transaction被删除或者Rollback时,就会从mapping中删除掉。RocksDB提供API来查询所有正在进行中的处于Prepare状态的transaction。
TransactionDB 记录着一个min heap(所有包含prepared section的log numbers)。当transaction处于prepared状态时,WriteBatch也会写入log,这个log number就会存储在transaction 对象中,随后存入到min heap。当transaction commit时,log number就会从min heap中删除,但是log number并不会用于被遗忘掉。接下来,就是各个memtable来记录the oldest log,直到memtable flush到L0为止。
3、Modification of the Write Path
write path可以被拆解为两个主要点:DBImpl::WriteImpl(...) and the MemTableInserter。多个client线程都会调用WriteImpl。第一个线程会被设定角色为 leader,剩余的线程会被设定为follower。leader和followers会被group到一起,成为一个逻辑上的write group。leader负责取出writegroup中的所有WriteBatches,聚合在一起,然后将blob写入到WAL。结合writegroup的大小和当前内存表对并行写的支持,leader可以将所有WriteBatches写入到memtable,也可以由各个线程写入线程自己负责的WrtieBatches到内存表中。
所有的memtable inserts都是由MemTableInserter负责。 a WriteBatch iterator handler也是WriteBatch::Handler的一种实现。这个handler遍历WriteBatch中的所有元素(Put, Delete, Merge),将每个call写入到对应的MemTable。MemTableInserter 也会处理已就绪的merges, deletes and updates。
Modification of the write path需要传入一个参数到DBImpl::WriteImpl,这个参数是一个指针,指向一个2PC的transaction实例。通过这个实例,可以查询到二阶段transaction的当前状态。一个2PC transaction会在preparation、commit和roll-back时各调用一次WriteImpl 。
Status DBImpl::WriteImpl(
const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback,
Transaction* txn
) {
WriteThread::Writer w;
//...
w.txn = txn; // writethreads also have txn context for memtable insert
// we are now the group leader
int total_count = 0;
uint64_t total_byte_size = 0;
for (auto writer : write_group) {
if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMem())
total_count += WriteBatchInternal::Count(writer->batch)
}
}
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
// now we produce the WAL entry from our write group
for (auto writer : write_group) {
// currently only optimistic transactions use callbacks
// and optimistic transaction do not support 2pc
if (writer->CallbackFailed()) {
continue;
} else if (writer->IsCommitPhase()) {
WriteBatchInternal::MarkCommit(merged_batch, writer->txn->XID_);
} else if (writer->IsRollbackPhase()) {
WriteBatchInternal::MarkRollback(merged_batch, writer->txn->XID_);
} else if (writer->IsPreparePhase()) {
WriteBatchInternal::MarkBeginPrepare(merged_batch, writer->txn->XID_);
WriteBatchInternal::Append(merged_batch, writer->batch);
WriteBatchInternal::MarkEndPrepare(merged_batch);
writer->txn->log_number_ = logfile_number_;
} else {
assert(writer->ShouldWriteToMem());
WriteBatchInternal::Append(merged_batch, writer->batch);
}
}
//now do MemTable Inserts for WriteGroup
}
WriteBatchInternal::InsertInto也可以调整为只遍历没有相关联的Transaction 或处于COMMIT状态的写。由上述代码可以看出,当transaction处于prepared状态时,transaction会记录log num。在insert时,每个Memtable都会记录最小的log number。
4、Modification of Recovery Path
当前的recovery path已经很好地适配了两阶段提交,按照顺序,依次遍历log中的所有batches,按照log number 依次feed到MemTableInserter。MemTableInserter 会遍历所有的batches,然后将值写入到正确的MemTable中。基于当前的log number,每个MemTable知道该忽略掉哪些values。
要想recovery 时可以处理2PC的一些操作,我们需要扩展MemTableInserter ,使其感知到4个新的meta markers。
需要记住的是:当2PC transaction commit时,就会包含一些操作在多个CF上的insertions。这些MemTable是在不同的时间点上执行flush。我们仍然可以使用CF的log number,在recovered, two phase, committed transaction时避免重复写入。
1、Two Phase Transactions TXN inserts into CFA and CFB
2、TXN prepared to LOG 1
3、TXN marked as COMMITTED in LOG 2
4、TXN is inserted into MemTables
5、CFA is flushed to L0
6、CFA log_number is now LOG 3
7、CFB has not been flushed and it still referencing LOG 1 prep section
8、CRASH RECOVERY
9、LOG 1 is still around because CFB was referencing LOG 1 prep section
10、Iterate over logs starting at LOG 1
11、CFB has prepared values reinserted into mem, again referencing LOG 1 prep section
12、CFA skips insertion from commit marker in LOG 2 because it is 13、consistent to LOG 3
13、CFB is flushed to L0 and is now consistent to LOG 3
14、LOG 1, LOG 2 can now be released
Rebuilding Transactions
如上所述,modification of the recovery path只需修改MemTableInserter ,使其可以handle 新的meta-markers即可。在recovery时,我们不能访问TransactionDB的实例,我们必须重建一个hollow ‘shill’的transaction。这就是所有recovered prepared transactions的衣蛾mapping(XID → (WriteBatch, log_number))。当遇到一个Commit(xid) marker时,就会尝试查找对应xid的shill transaction,然后写入到Mem。如果遇到一个rollback(xid) marker,我们就会delete 这个shill transaction。recovery末期,以shill的形式剩下一个所有处于Prepared状态的transaction。
log lifespan
要想知道最小的log,我们必须找到每个CF的最小的log number。我们也需要考虑TransactionDB的prepared sections heap中的最小value。这代表了最早的log(包含一个还没有提交的prepared section)。我们也需要考虑all MemTables和没有flush的ImmutableMemTables 的最小prep section。这三种value的最小值就是含有数据但是还没有flush到L0的最早的log。