ETCD《八》--读写缓存及事务tnx

背景

读写缓存,总的来说,写缓存肯定是为了读服务的,试想如果写入时候不写入缓存,那么查询会是什么情况呢

我们已经知道查询是分两步的

  • 首先是查 MVCC,查出有哪些 key 及 Revision 满足 range 条件;这一步就是查内存,不需要缓存

  • 其次是查 boltdb,使用 Revision 来查询对应的 value;这一步是需要查磁盘的,如果没有缓存,那么每次都需要先把 Bucket 读到内存中,然后获取 Bucket 中的 Cursor 游标,再通过游标来遍历这个 Bucket 找到满足条件的 KV

bn := bucketType.ID()
    baseReadTx.txMu.RLock()
    bucket, ok := baseReadTx.buckets[bn]
    baseReadTx.txMu.RUnlock()
    lockHeld := false
    if !ok {
        baseReadTx.txMu.Lock()
        lockHeld = true
        bucket = baseReadTx.tx.Bucket(bucketType.Name())
        baseReadTx.buckets[bn] = bucket
    }

    // ignore missing bucket since may have been created in this batch
    if bucket == nil {
        if lockHeld {
            baseReadTx.txMu.Unlock()
        }
        return keys, vals
    }
    if !lockHeld {
        baseReadTx.txMu.Lock()
    }
    c := bucket.Cursor()
    baseReadTx.txMu.Unlock()

    k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))

写缓存

写缓存将 kv 写入到 Bucket 中后,还会将 kv 写入到 写事务的缓存中;需要注意的是这里的 key 对应的是 Revision ,所以肯定是有序的

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
    t.batchTx.UnsafeSeqPut(bucket, key, value)
    t.buf.putSeq(bucket, key, value)
}

写事务的缓存其实就是一个数组,用于存放 kv ,以及记录存放的 kv 数量

func newBucketBuffer() *bucketBuffer {
    return &bucketBuffer{buf: make([]kv, 512), used: 0}
}

在 put 操作完成后,释放独占锁之前,再将写事务中的缓存同步到 读事务 的缓存中;以此来加速读

func (t *batchTxBuffered) Unlock() {
    if t.pending != 0 {
        t.backend.readTx.Lock() 
        t.buf.writeback(&t.backend.readTx.buf)
        t.backend.readTx.Unlock()
        
        if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
            t.commit(false)
        }
    }
    t.batchTx.Unlock()
}

缓存的同步,其实就是将写事务缓存中的 KV 同步到读事务的缓存中

func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
    for k, wb := range txw.buckets {
        rb, ok := txr.buckets[k]
        if !ok {
            delete(txw.buckets, k)
            
            txr.buckets[k] = wb
            continue
        }
        rb.merge(wb)
    }
    txw.reset()
    // increase the buffer version
    txr.bufVersion++
}

但是这可能涉及到多种情形

  • 读缓存为空,此时只需要将写缓存全部复制过去即可;然后清空写缓存,记录读缓存的版本号,从0开始自增

  • 读缓存已经有数据了,此时需要进行 key 合并;首先将写缓存中的 kv 全部追加到读缓存中,如果追加后 key 仍然是有序的,说明新追加的 key 没有重复的,不需要额外处理;但是如果存在重复的 key ,那么需要进行 key 合并,用后面的 kv 覆盖前面的 kv

合并时候,先对 key 进行一次稳定排序;保障相同 key 的先后顺序,然后通过读写指针的方式,widx 是写指针,ridx 是读指针,用后面的 kv 覆盖前面的

func (bb *bucketBuffer) dedupe() {
    if bb.used <= 1 {
        return
    }
    sort.Stable(bb)
    widx := 0
    for ridx := 1; ridx < bb.used; ridx++ {
        if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
            widx++
        }
        bb.buf[widx] = bb.buf[ridx]
    }
    bb.used = widx + 1
}

由于这里需要操作读缓存,可以看到需要对读事务进行加锁

t.backend.readTx.Lock()

读缓存

读包括两种模式,一种是共享读,一种是并发读

共享读

共享读针对的是开启事务读的时候,指定的是 非 ConcurrentReadTxMode 模式,这时候就是直接使用 boltdb 中的读事务

readTx: &readTx{
            baseReadTx: baseReadTx{
                buf: txReadBuffer{
                    txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                    bufVersion: 0,
                },
                buckets: make(map[BucketID]*bolt.Bucket),
                txWg:    new(sync.WaitGroup),
                txMu:    new(sync.RWMutex),
            },
        }

可以看到读事务中是携带了缓存的,写事务就是同步到这里;

因此这种读事务是需要加锁的,只需要加只读锁,避免读写事务的并发冲突;写事务在同步缓存的时候会对读事务加独占锁,也就是写事务会阻塞读事务

func (rt *readTx) RLock()   { rt.mu.RLock() }

他在读取的时候会先尝试从缓存中读取 kv ,如果从缓存中能够读到完整的 kv 数据,就不需要在读 Bucket 了

keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
    if int64(len(keys)) == limit {
        return keys, vals
    }

并发读

并发读针对的是开启事务读的时候,指定的是 ConcurrentReadTxMode 模式;这时候会把读事务中的缓存数据完整的 copy 一份,每个读请求都持有自己的 buf 数据,而不是共享一份缓存数据了

func (b *backend) ConcurrentReadTx() ReadTx {
    b.readTx.RLock()
    defer b.readTx.RUnlock()
    
    b.readTx.txWg.Add(1)

    b.txReadBufferCache.mu.Lock()

    curCache := b.txReadBufferCache.buf
    curCacheVer := b.txReadBufferCache.bufVersion
    curBufVer := b.readTx.buf.bufVersion

    isEmptyCache := curCache == nil
    isStaleCache := curCacheVer != curBufVer

    var buf *txReadBuffer
    switch {
    case isEmptyCache:
        
        curBuf := b.readTx.buf.unsafeCopy()
        buf = &curBuf
    case isStaleCache:
        
        b.txReadBufferCache.mu.Unlock()
        curBuf := b.readTx.buf.unsafeCopy()
        b.txReadBufferCache.mu.Lock()
        buf = &curBuf
    default:
        
        buf = curCache
    }
    
    if isEmptyCache || curCacheVer == b.txReadBufferCache.bufVersion {
        // continue if the cache is never set or no one has modified the cache
        b.txReadBufferCache.buf = buf
        b.txReadBufferCache.bufVersion = curBufVer
    }

    b.txReadBufferCache.mu.Unlock()

    // concurrentReadTx is not supposed to write to its txReadBuffer
    return &concurrentReadTx{
        baseReadTx: baseReadTx{
            buf:     *buf,
            txMu:    b.readTx.txMu,
            tx:      b.readTx.tx,
            buckets: b.readTx.buckets,
            txWg:    b.readTx.txWg,
        },
    }
}

这种情形相对复杂一点,逐步解析下:

  • 因为涉及到读事务中缓存数据的 copy,因此也需要对读事务进行加锁

  • 并发读的过程中是无锁的,为了维持读的状态,通过 WaitGroup 来让读事务提交的时候等待当前读完成

  • 为了避免不必要的缓存 copy,额外维护了一份缓存数据,可以称之为 backend 缓存,这份缓存数据不属于读事务,也不属于写事务

readTx: &readTx{
            baseReadTx: baseReadTx{
                buf: txReadBuffer{
                    txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
                    bufVersion: 0,
                },
                buckets: make(map[BucketID]*bolt.Bucket),
                txWg:    new(sync.WaitGroup),
                txMu:    new(sync.RWMutex),
            },
        },
txReadBufferCache: txReadBufferCache{
            mu:         sync.Mutex{},
            bufVersion: 0,
            buf:        nil,
        }
  • 在缓存 copy 的时候

    • 由于涉及到 backend 缓存的更新,因此需要加锁b.txReadBufferCache.mu.Lock()

    • 如果不存在 backend 缓存,说明当前协程是首次触发并发读的,直接 copy 读缓存到 backend 缓存

    • 如果已经存在 backend 缓存,但是缓存版本不一致,说明 backend 缓存已经过期了,需要重新 copy ,此时会先释放锁,再进行 copy,copy 完成后在加锁更新,这样的好处是避免 copy 数量较多的时候长时间占用锁

    • 其他情况说明 backend 缓存仍然可用,此时不需要重新 copy

  • 在尝试更新 backend 缓存的时候,仅当 backend 缓存为空,或者没有其它协程同时更新时,当前协程才会更新 backend 缓存;因为在上述流程中,在 copy 过程中是会释放锁的,有可能其它协程在这个时候已经更新了 backend 缓存;其它协程更新 backend 缓存对当前协程来说也没影响,因为当前协程也是完整 copy 了一份读事务的缓存,而 backend 缓存只需要有一个协程更新即可

  • 最终返回的时候,返回的是 copy 后的缓存数据,因此读的时候会先从 copy 后的缓存中读

由于每个读都从读各个协程自己的缓存,因此并发读是完全不需要加锁的,仅开始并发读的时候,创建 concurrentReadTx 时需要对读事务加锁进行缓存 copy;后续的读操作都不需要加锁完成

func (rt *concurrentReadTx) RLock() {}

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }

读完成之后,在释放 WaitGroup ;确保读事务提交不会被阻塞;在读事务通过 Rollback 提交时候,会等待当前读事务下的所有并发读都能够异步完成后再提交;然后再开始新的读事务;boltdb 中读事务是通过 Rollback 提交的,写事务才是通过 Commit 提交

if t.backend.readTx.tx != nil {
        // wait all store read transactions using the current boltdb tx to finish,
        // then close the boltdb tx
        go func(tx *bolt.Tx, wg *sync.WaitGroup) {
            wg.Wait()
            if err := tx.Rollback(); err != nil {
                t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
            }
        }(t.backend.readTx.tx, t.backend.readTx.txWg)
        t.backend.readTx.reset()
    }

事务TNX

在 etcdctl 中可以通过 tnx 命令来将多个命令以事务的形式一起执行;形式如下,通过 compares 指定多个条件,在条件全部满足时执行 success 中的命令,在一个或者多个条件不满足时,执行 failure 中的命令

$ etcdctl txn
# 输入 compare 条件
compares:
value("foo") = "bar"

# 输入 success 时的操作
success requests:
put foo new-bar

# 输入失败时的操作
failure requests:
put other fail

tnx 的实现也很简单,按照是否存在 put、delete 操作可以将事务分为只读事务和读写事务

和前面单独的 put 、get 命令类似,在开启读事务或者写事务的时候就会先对 boltdb 中的事务加锁,如写事务中 tx.LockInsideApply()

func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    s.mu.RLock()
    tx := s.b.BatchTx()
    tx.LockInsideApply()
    tw := &storeTxnWrite{
        storeTxnCommon: storeTxnCommon{s, tx, 0, 0, trace},
        tx:             tx,
        beginRev:       s.currentRev,
        changes:        make([]mvccpb.KeyValue, 0, 4),
    }
    return newMetricsTxnWrite(tw)
}

然后在依次执行 compares 以及 success 或者 failure 中的多条命令

最后才通过 End 来释放事务锁

func (tw *storeTxnWrite) End() {
    tw.tx.Unlock()
}

而 boltdb 中的事务是异步、周期性的提交的;可以看到这里提交事务前会前去读取当前事务中的 put 操作数量 pending

for {
        select {
        case <-t.C:
        case <-b.stopc:
            b.batchTx.CommitAndStop()
            return
        }
        if b.batchTx.safePending() != 0 {
            b.batchTx.Commit()
        }
        t.Reset(b.batchInterval)
    }

而读取 pending 的时候是需要先获取到事务的锁的,如果获取不到就说明还有事务操作没执行完,那么当前事务不会提交;会一直等待事务锁被其它协程释放后,才会把 boltdb 中的事务提交,以此来达到事务的效果

func (t *batchTx) safePending() int {
    t.Mutex.Lock()
    defer t.Mutex.Unlock()
    return t.pending
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容