etcd lock

Etcd Lock详解

分布式情况下最终都会面临一个资源抢占的问题,解决问题的方法为抽象一个分布式锁,持有锁则可以操作资源,本文使用etcd实现一个分布式锁

简介

在正式使用之前,先简单介绍一下etcd的clientv3,etcd的client和服务器通信在v3版本为grpc,所以我们在使用时实际上就是使用grpc和服务器通信.

etcd的github地址为: https://github.com/etcd-io/etcd

在github中源代码中存在两个目录:

$ tree -L 1
├── alarm
.....
├── client
├── clientv3
.....

其中 clientv3为我们需要使用的client,本文的主角就是clientv3/concurrency

下载依赖

在明确我们使用的包后,我们直接依赖clientv3包到我们的工程中.

go get github.com/etcd-io/etcd/clientv3

如果需要梯子,请参考: 终端中设置代理

连接到etcd

在很多环境中我们启动etcd都是通过配置tls方式进行的,所以在连接etcd的时候需要使用tls的方式连接(可是百度上很多文章居然都没写.),具体的连接方式如下:

    tlsInfo := transport.TLSInfo{
        CertFile:      "etcd-v3.3.12-linux-amd64/etcd.pem",
        KeyFile:       "etcd-v3.3.12-linux-amd64/etcd-key.pem",
        TrustedCAFile: "etcd-v3.3.12-linux-amd64/ca.pem",
    }
    tlsConfig, err := tlsInfo.ClientConfig()
    if err != nil {
        log.Fatal(err)
    }
    config := clientv3.Config{
        Endpoints: []string{"127.0.0.1:12379"},#此处为etcd server监听地址
        TLS:       tlsConfig,
    }
    client, e := clientv3.New(config)
    if e != nil {
        log.Fatal(e.Error())
    }
    defer client.Close()

使用Mutex获取锁

在使用证书连接到etcd的server后,我们使用clientv3/concurrency包中的Mutex进行分布式锁

    # 生成一个30s超时的上下文
    timeout, cancelFunc := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancelFunc()
    # 获取租约
    response, e := client.Grant(timeout, 30)
    if e != nil {
        log.Fatal(e.Error())
    }
    # 通过租约创建session
    session, e := concurrency.NewSession(client, concurrency.WithLease(response.ID))
    if e != nil {
        log.Fatal(e.Error())
    }
    defer session.Close()
    # 通过session和锁前缀
    mutex := concurrency.NewMutex(session, "/lock")
    e = mutex.Lock(timeout)
    if e != nil {
        log.Fatal(e.Error())
    }

    # 业务逻辑

    # 释放锁
    defer mutex.Unlock(timeout)

整体流程还是相对复杂的,接下来我们将一点一点解析:

1.生成30s超时的上下文(context)

锁的竞争是有时间的,不可能一直竞争下去,设定一个30s的超时时间,是让mutex.Lock()的时候有一个获取锁的时间,在此时间内,如果没有获取到锁则应该重试或者提示失败.

2.client.Grant获取租约

租约设置了一个和上下文一样的超时时间,保证租约有足够的时间

3.根据租约创建一个session回话,维护租约的过期时间

在client中抽象出了一个session对象来持续保持租约不过期,具体源码为:

//会话表示在客户端的生存期内保持活动的租约。
//应用程序可能会使用会话来解释活动性。
type Session struct {
    client *v3.Client
    opts   *sessionOptions
    id     v3.LeaseID

    cancel context.CancelFunc
    donec  <-chan struct{}
}

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops)
    }

    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = v3.LeaseID(resp.ID)
    }

    ctx, cancel := context.WithCancel(ops.ctx)
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
    //在客户端错误或取消上下文之前保持租约的活动状态
    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            //在保持活动频道关闭前接收信息
            // eat messages until keep alive channel closes
        }
    }()

    return s, nil
}

其实就是在newSession中启动一个协程不断读取keepAlive这个channel的数据

4.concurrency.NewMutex创建一个锁,调用lock开始竞争锁

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    //生成锁的key
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    //使用事务机制
    //比较key的revision为0(0标示没有key)
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    //则put key,并设置租约
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    //否则 获取这个key,重用租约中的锁(这里主要目的是在于重入)
    //通过第二次获取锁,判断锁是否存在来支持重入
    //所以只要租约一致,那么是可以重入的.
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    //通过前缀获取最先创建的key
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    //获取到自身的revision(注意,此处CreateRevision和Revision不一定相等)
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // 通过对比自身的revision和最先创建的key的revision得出谁获得了锁
    // 例如 自身revision:5,最先创建的key createRevision:3  那么不获得锁,进入waitDeletes
    //     自身revision:5,最先创建的key createRevision:5  那么获得锁

    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    // 等待其他程序释放锁,并删除其他revisions
    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}
// 等待持有锁的key删除
// 内部实现为等其他所有比当前createRevision小的key,监听删除事件
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
    //WithLastCreate 按照CreateRevision排序,降序 例如 5 4 3 2 1 
    //WithMaxCreateRev 获取比maxCreateRev小的key
    getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
    for {
        resp, err := client.Get(ctx, pfx, getOpts...)
        if err != nil {
            return nil, err
        }
        if len(resp.Kvs) == 0 {
            return resp.Header, nil
        }
        lastKey := string(resp.Kvs[0].Key)
        //监听删除事件
        if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
            return nil, err
        }
    }
}
//从revision开始监听删除事件,因为revision存在,所以也避免了ABA问题
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()

    var wr v3.WatchResponse
    wch := client.Watch(cctx, key, v3.WithRev(rev))
    for wr = range wch {
        for _, ev := range wr.Events {
            //遇到删除事件才返回
            if ev.Type == mvccpb.DELETE {
                return nil
            }
        }
    }
    if err := wr.Err(); err != nil {
        return err
    }
    if err := ctx.Err(); err != nil {
        return err
    }
    return fmt.Errorf("lost watcher waiting for delete")
}

CreateRevision: 创建时的revision

Header.Revision: etcd server现在的revision

ABA问题: CAS和ABA问题

5.释放锁

这里释放锁,比较简单,就是删除此key

//释放锁
func (m *Mutex) Unlock(ctx context.Context) error {
    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}

revision

刚才说了那么多revision,这里我们需要详细的了解一下etcd中的mvcc多版本机制

那么我们先来明确几个概念:

main ID:在etcd中每个事务的唯一id,全局递增不重复.

sub ID: 在事务中的连续多个修改操作会从0开始编号,这个编号就是sub ID

revision: 由(mainID,subID)组成的唯一标识

所以现在我们的revision就是这么来的啦…

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容