背景
读写缓存,总的来说,写缓存肯定是为了读服务的,试想如果写入时候不写入缓存,那么查询会是什么情况呢
我们已经知道查询是分两步的
首先是查 MVCC,查出有哪些 key 及 Revision 满足 range 条件;这一步就是查内存,不需要缓存
其次是查 boltdb,使用 Revision 来查询对应的 value;这一步是需要查磁盘的,如果没有缓存,那么每次都需要先把 Bucket 读到内存中,然后获取 Bucket 中的 Cursor 游标,再通过游标来遍历这个 Bucket 找到满足条件的 KV
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
lockHeld := false
if !ok {
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
// ignore missing bucket since may have been created in this batch
if bucket == nil {
if lockHeld {
baseReadTx.txMu.Unlock()
}
return keys, vals
}
if !lockHeld {
baseReadTx.txMu.Lock()
}
c := bucket.Cursor()
baseReadTx.txMu.Unlock()
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
写缓存
写缓存将 kv 写入到 Bucket 中后,还会将 kv 写入到 写事务的缓存中;需要注意的是这里的 key 对应的是 Revision ,所以肯定是有序的
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}
写事务的缓存其实就是一个数组,用于存放 kv ,以及记录存放的 kv 数量
func newBucketBuffer() *bucketBuffer {
return &bucketBuffer{buf: make([]kv, 512), used: 0}
}
在 put 操作完成后,释放独占锁之前,再将写事务中的缓存同步到 读事务 的缓存中;以此来加速读
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock()
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
t.batchTx.Unlock()
}
缓存的同步,其实就是将写事务缓存中的 KV 同步到读事务的缓存中
func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
for k, wb := range txw.buckets {
rb, ok := txr.buckets[k]
if !ok {
delete(txw.buckets, k)
txr.buckets[k] = wb
continue
}
rb.merge(wb)
}
txw.reset()
// increase the buffer version
txr.bufVersion++
}
但是这可能涉及到多种情形
读缓存为空,此时只需要将写缓存全部复制过去即可;然后清空写缓存,记录读缓存的版本号,从0开始自增
读缓存已经有数据了,此时需要进行 key 合并;首先将写缓存中的 kv 全部追加到读缓存中,如果追加后 key 仍然是有序的,说明新追加的 key 没有重复的,不需要额外处理;但是如果存在重复的 key ,那么需要进行 key 合并,用后面的 kv 覆盖前面的 kv
合并时候,先对 key 进行一次稳定排序;保障相同 key 的先后顺序,然后通过读写指针的方式,widx 是写指针,ridx 是读指针,用后面的 kv 覆盖前面的
func (bb *bucketBuffer) dedupe() {
if bb.used <= 1 {
return
}
sort.Stable(bb)
widx := 0
for ridx := 1; ridx < bb.used; ridx++ {
if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
widx++
}
bb.buf[widx] = bb.buf[ridx]
}
bb.used = widx + 1
}
由于这里需要操作读缓存,可以看到需要对读事务进行加锁
t.backend.readTx.Lock()
读缓存
读包括两种模式,一种是共享读,一种是并发读
共享读
共享读针对的是开启事务读的时候,指定的是 非 ConcurrentReadTxMode 模式,这时候就是直接使用 boltdb 中的读事务
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
}
可以看到读事务中是携带了缓存的,写事务就是同步到这里;
因此这种读事务是需要加锁的,只需要加只读锁,避免读写事务的并发冲突;写事务在同步缓存的时候会对读事务加独占锁,也就是写事务会阻塞读事务
func (rt *readTx) RLock() { rt.mu.RLock() }
他在读取的时候会先尝试从缓存中读取 kv ,如果从缓存中能够读到完整的 kv 数据,就不需要在读 Bucket 了
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
并发读
并发读针对的是开启事务读的时候,指定的是 ConcurrentReadTxMode 模式;这时候会把读事务中的缓存数据完整的 copy 一份,每个读请求都持有自己的 buf 数据,而不是共享一份缓存数据了
func (b *backend) ConcurrentReadTx() ReadTx {
b.readTx.RLock()
defer b.readTx.RUnlock()
b.readTx.txWg.Add(1)
b.txReadBufferCache.mu.Lock()
curCache := b.txReadBufferCache.buf
curCacheVer := b.txReadBufferCache.bufVersion
curBufVer := b.readTx.buf.bufVersion
isEmptyCache := curCache == nil
isStaleCache := curCacheVer != curBufVer
var buf *txReadBuffer
switch {
case isEmptyCache:
curBuf := b.readTx.buf.unsafeCopy()
buf = &curBuf
case isStaleCache:
b.txReadBufferCache.mu.Unlock()
curBuf := b.readTx.buf.unsafeCopy()
b.txReadBufferCache.mu.Lock()
buf = &curBuf
default:
buf = curCache
}
if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
// continue if the cache is never set or no one has modified the cache
b.txReadBufferCache.buf = buf
b.txReadBufferCache.bufVersion = curBufVer
}
b.txReadBufferCache.mu.Unlock()
// concurrentReadTx is not supposed to write to its txReadBuffer
return &concurrentReadTx{
baseReadTx: baseReadTx{
buf: *buf,
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
},
}
}
这种情形相对复杂一点,逐步解析下:
因为涉及到读事务中缓存数据的 copy,因此也需要对读事务进行加锁
并发读的过程中是无锁的,为了维持读的状态,通过 WaitGroup 来让读事务提交的时候等待当前读完成
为了避免不必要的缓存 copy,额外维护了一份缓存数据,可以称之为 backend 缓存,这份缓存数据不属于读事务,也不属于写事务
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
}
-
在缓存 copy 的时候
由于涉及到 backend 缓存的更新,因此需要加锁
b.txReadBufferCache.mu.Lock()
如果不存在 backend 缓存,说明当前协程是首次触发并发读的,直接 copy 读缓存到 backend 缓存
如果已经存在 backend 缓存,但是缓存版本不一致,说明 backend 缓存已经过期了,需要重新 copy ,此时会先释放锁,再进行 copy,copy 完成后在加锁更新,这样的好处是避免 copy 数量较多的时候长时间占用锁
其他情况说明 backend 缓存仍然可用,此时不需要重新 copy
在尝试更新 backend 缓存的时候,仅当 backend 缓存为空,或者没有其它协程同时更新时,当前协程才会更新 backend 缓存;因为在上述流程中,在 copy 过程中是会释放锁的,有可能其它协程在这个时候已经更新了 backend 缓存;其它协程更新 backend 缓存对当前协程来说也没影响,因为当前协程也是完整 copy 了一份读事务的缓存,而 backend 缓存只需要有一个协程更新即可
最终返回的时候,返回的是 copy 后的缓存数据,因此读的时候会先从 copy 后的缓存中读
由于每个读都从读各个协程自己的缓存,因此并发读是完全不需要加锁的,仅开始并发读的时候,创建 concurrentReadTx 时需要对读事务加锁进行缓存 copy;后续的读操作都不需要加锁完成
func (rt *concurrentReadTx) RLock() {}
// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
读完成之后,在释放 WaitGroup ;确保读事务提交不会被阻塞;在读事务通过 Rollback 提交时候,会等待当前读事务下的所有并发读都能够异步完成后再提交;然后再开始新的读事务;boltdb 中读事务是通过 Rollback 提交的,写事务才是通过 Commit 提交
if t.backend.readTx.tx != nil {
// wait all store read transactions using the current boltdb tx to finish,
// then close the boltdb tx
go func(tx *bolt.Tx, wg *sync.WaitGroup) {
wg.Wait()
if err := tx.Rollback(); err != nil {
t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
}
}(t.backend.readTx.tx, t.backend.readTx.txWg)
t.backend.readTx.reset()
}
事务TNX
在 etcdctl 中可以通过 tnx 命令来将多个命令以事务的形式一起执行;形式如下,通过 compares 指定多个条件,在条件全部满足时执行 success 中的命令,在一个或者多个条件不满足时,执行 failure 中的命令
$ etcdctl txn
# 输入 compare 条件
compares:
value("foo") = "bar"
# 输入 success 时的操作
success requests:
put foo new-bar
# 输入失败时的操作
failure requests:
put other fail
tnx 的实现也很简单,按照是否存在 put、delete 操作可以将事务分为只读事务和读写事务
和前面单独的 put 、get 命令类似,在开启读事务或者写事务的时候就会先对 boltdb 中的事务加锁,如写事务中 tx.LockInsideApply()
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
然后在依次执行 compares 以及 success 或者 failure 中的多条命令
最后才通过 End 来释放事务锁
func (tw *storeTxnWrite) End() {
tw.tx.Unlock()
}
而 boltdb 中的事务是异步、周期性的提交的;可以看到这里提交事务前会前去读取当前事务中的 put 操作数量 pending
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
而读取 pending 的时候是需要先获取到事务的锁的,如果获取不到就说明还有事务操作没执行完,那么当前事务不会提交;会一直等待事务锁被其它协程释放后,才会把 boltdb 中的事务提交,以此来达到事务的效果
func (t *batchTx) safePending() int {
t.Mutex.Lock()
defer t.Mutex.Unlock()
return t.pending
}