Reading the Etcd watch source code

Our services at work use Consul for service discovery, and Consul has a watch mechanism. That got me curious: I happened to be reading the etcd-raft code, and Etcd has a similar watch mechanism, so I struck while the iron was hot and spent the weekend studying how etcd's watch is implemented in the source.

Before reading the source, let's see how Etcd's watch is used, via a simple example.

  1. First, write a KV pair into Etcd

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value="神蛋使者"

  2. Watch that key

curl http://127.0.0.1:2379/v2/keys/name?wait=true

If everything works, this request will now block.

  3. In a new terminal, modify the stored value

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value=神蛋使者1号

  4. The blocked request returns the watched result:
{
  "action": "set",
  "node": {
    "key": "/name",
    "value": "神蛋使者1号",
    "modifiedIndex": 25,
    "createdIndex": 25
  },
  "prevNode": {
    "key": "/name",
    "value": "神蛋使者",
    "modifiedIndex": 24,
    "createdIndex": 24
  }
}
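The put / wait / put / respond flow above can be mimicked in a few dozen lines of Go with a tiny in-memory hub, where Put stores a value and wakes any blocked watcher. This is only an illustration of the idea (a one-shot watch, like wait=true), not etcd's API:

```go
package main

import (
	"fmt"
	"sync"
)

// Event mirrors the shape of a watch notification: the new and previous values.
type Event struct {
	Key, Value, PrevValue string
}

// Hub is a minimal in-memory store whose Put wakes up registered watchers.
type Hub struct {
	mu       sync.Mutex
	data     map[string]string
	watchers map[string][]chan Event
}

func NewHub() *Hub {
	return &Hub{data: map[string]string{}, watchers: map[string][]chan Event{}}
}

// Watch returns a channel that receives the next change to key.
// The 1-slot buffer lets Put deliver without waiting for the receiver.
func (h *Hub) Watch(key string) <-chan Event {
	ch := make(chan Event, 1)
	h.mu.Lock()
	h.watchers[key] = append(h.watchers[key], ch)
	h.mu.Unlock()
	return ch
}

// Put stores value and notifies every watcher registered on key,
// then closes their channels (a one-shot watch, like wait=true).
func (h *Hub) Put(key, value string) {
	h.mu.Lock()
	prev := h.data[key]
	h.data[key] = value
	ws := h.watchers[key]
	h.watchers[key] = nil
	h.mu.Unlock()
	for _, ch := range ws {
		ch <- Event{Key: key, Value: value, PrevValue: prev}
		close(ch)
	}
}

func main() {
	h := NewHub()
	h.Put("/name", "v1")   // like the first curl PUT
	ch := h.Watch("/name") // like curl ...?wait=true
	h.Put("/name", "v2")   // like the second curl PUT
	ev := <-ch             // the blocked "request" returns
	fmt.Println(ev.Value, ev.PrevValue)
}
```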

That's roughly the end-to-end experience; now on to the source.

Interface definition

type Watcher interface {
    // Watch watches on a key or prefix. The watched events will be returned
    // through the returned channel.
    // If the watch is slow or the required rev is compacted, the watch request
    // might be canceled from the server-side and the chan will be closed.
    // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

    // Close closes the watcher and cancels all watch requests.
    Close() error
}

The interface defines two methods, Watch and Close.

The Watch method returns a value of type WatchChan. WatchChan is a receive-only channel, defined as:

type WatchChan <-chan WatchResponse

The channel carries values of type WatchResponse:

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event

    // CompactRevision is the minimum revision the watcher may receive.
    CompactRevision int64

    // Canceled is used to indicate watch failure.
    // If the watch failed and the stream was about to close, before the channel is closed,
    // the channel sends a final response that has Canceled set to true with a non-nil Err().
    Canceled bool

    // Created is used to indicate the creation of the watcher.
    Created bool

    closeErr error
}

The Event type here is a protobuf/gRPC-generated message:

type Event struct {
    // type is the kind of event. If type is a PUT, it indicates
    // new data has been stored to the key. If type is a DELETE,
    // it indicates the key was deleted.
    Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
    // kv holds the KeyValue for the event.
    // A PUT event contains current kv pair.
    // A PUT event with kv.Version=1 indicates the creation of a key.
    // A DELETE/EXPIRE event contains the deleted key with
    // its modification revision set to the revision of deletion.
    Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
    // prev_kv holds the key-value pair before the event happens.
    PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
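When consuming events, the usual first step is to switch on Type; per the comment above, a PUT with Kv.Version == 1 signals the creation of a key. Here is a runnable sketch using local stand-ins for the generated mvccpb types (illustration only; the real enum lives in the mvccpb package):

```go
package main

import "fmt"

// Local stand-ins for the generated mvccpb types, for illustration only.
type EventType int32

const (
	PUT    EventType = 0
	DELETE EventType = 1
)

type KeyValue struct {
	Key, Value string
	Version    int64
}

type Event struct {
	Type   EventType
	Kv     *KeyValue
	PrevKv *KeyValue
}

// describe classifies an event the way a watch consumer typically would.
func describe(ev Event) string {
	switch ev.Type {
	case PUT:
		if ev.Kv.Version == 1 {
			// Version == 1 on a PUT means the key was just created
			return "created " + ev.Kv.Key
		}
		return "updated " + ev.Kv.Key
	case DELETE:
		return "deleted " + ev.Kv.Key
	}
	return "unknown"
}

func main() {
	fmt.Println(describe(Event{Type: PUT, Kv: &KeyValue{Key: "/name", Version: 1}}))
	fmt.Println(describe(Event{Type: PUT, Kv: &KeyValue{Key: "/name", Version: 2}}))
	fmt.Println(describe(Event{Type: DELETE, Kv: &KeyValue{Key: "/name"}}))
}
```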

Next, let's look at the watcher type, which implements the Watcher interface:

// watcher implements the Watcher interface
type watcher struct {
    remote pb.WatchClient

    // mu protects the grpc streams map
    mu sync.RWMutex

    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
}

The watcher struct is simple, with only three fields: remote abstracts the client that issues watch requests; streams is a map of the active gRPC streams; and an RWMutex keeps reads and writes to that map safe under concurrency.

The watchGrpcStream type stored in streams encapsulates all the state of one stream's interaction. It is defined as follows:

type watchGrpcStream struct {
    owner  *watcher
    remote pb.WatchClient

    // ctx controls internal remote.Watch requests
    ctx context.Context
    // ctxKey is the key used when looking up this stream's context
    ctxKey string
    cancel context.CancelFunc

    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    // resuming holds all resuming watchers on this grpc stream
    resuming []*watcherStream

    // reqc sends a watch request from Watch() to the main goroutine
    reqc chan *watchRequest
    // respc receives data from the watch client
    respc chan *pb.WatchResponse
    // donec closes to broadcast shutdown
    donec chan struct{}
    // errc transmits errors from grpc Recv to the watch stream reconn logic
    errc chan error
    // closingc gets the watcherStream of closing watchers
    closingc chan *watcherStream
    // wg is Done when all substream goroutines have exited
    wg sync.WaitGroup

    // resumec closes to signal that all substreams should begin resuming
    resumec chan struct{}
    // closeErr is the error that closed the watch stream
    closeErr error
}

Interestingly, watchGrpcStream also carries an owner field of type watcher, so watcher and watchGrpcStream can reference each other. It also re-declares the remote field already defined on watcher, and not as a pointer at that; I don't quite see what that duplication is for.

A few more fields deserve attention. One is substreams; here are its definition and comment again:

// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream

And here is the definition of the watcherStream type:

// watcherStream represents a registered watcher
type watcherStream struct {
    // initReq is the request that initiated this request
    initReq watchRequest

    // outc publishes watch responses to subscriber
    outc chan WatchResponse
    // recvc buffers watch responses before publishing
    recvc chan *WatchResponse
    // donec closes when the watcherStream goroutine stops.
    donec chan struct{}
    // closing is set to true when stream should be scheduled to shutdown.
    closing bool
    // id is the registered watch id on the grpc stream
    id int64

    // buf holds all events received from etcd but not yet consumed by the client
    buf []*WatchResponse
}
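The buf field is what decouples the gRPC receive path from a possibly slow consumer: responses arriving on recvc are queued in buf, and outc is fed from the queue's head. Below is a simplified, self-contained sketch of that buffering loop (using int in place of *WatchResponse; this shows the general pattern, not etcd's exact code):

```go
package main

import "fmt"

// forward drains recvc into an internal buffer and feeds outc from the
// buffer's head, so a slow consumer never blocks the receiving side.
func forward(recvc <-chan int, outc chan<- int) {
	defer close(outc)
	var buf []int
	for {
		// a nil channel blocks forever in select, which disables the
		// send case whenever the buffer is empty
		var out chan<- int
		var next int
		if len(buf) > 0 {
			out = outc
			next = buf[0]
		}
		select {
		case v, ok := <-recvc:
			if !ok {
				// input closed: flush the remaining buffer, then exit
				for _, v := range buf {
					outc <- v
				}
				return
			}
			buf = append(buf, v)
		case out <- next:
			buf = buf[1:]
		}
	}
}

func main() {
	recvc := make(chan int)
	outc := make(chan int)
	go forward(recvc, outc)
	for i := 1; i <= 3; i++ {
		recvc <- i // never blocks on the consumer
	}
	close(recvc)
	for v := range outc {
		fmt.Println(v)
	}
}
```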

To summarize the relationships: a watcher owns one watchGrpcStream per context key in its streams map, and each watchGrpcStream in turn multiplexes many watcherStream substreams, one per registered watch.

Next, let's see how watcher implements the Watch method:

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // apply the options
    ow := opWatch(key, opts...)

    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }

    // build the watch request from the given parameters
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }

    ok := false
    // format the request context into a string key
    ctxKey := fmt.Sprintf("%v", ctx)

    // find or allocate appropriate grpc watch stream
    // set up the corresponding stream; note that the lock is needed
    w.mu.Lock()

    // if streams is nil the watcher has been closed,
    // so return an already-closed channel
    if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }

    // recall that streams is a map keyed by the formatted request context;
    // if no stream exists for this context yet, create one
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }
    donec := wgs.donec
    reqc := wgs.reqc
    w.mu.Unlock()

    // couldn't create channel; return closed channel
    // the 1-slot buffer lets the two
    // closeCh <- WatchResponse{closeErr: wgs.closeErr}
    // sends below succeed before anyone receives from closeCh
    closeCh := make(chan WatchResponse, 1)

    // submit request
    select {
    // send the watch request built above to the stream
    case reqc <- wr:
        ok = true
    // the request context ended (this should cover every client-side cancellation)
    case <-wr.ctx.Done():
    // the stream shut down before accepting the request
    // (an abnormal completion); note the retry logic below
    case <-donec:
        if wgs.closeErr != nil {
            // the stream closed with a real error
            // (not just dropped for having no contexts), so don't retry
            closeCh <- WatchResponse{closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
    }

    // receive channel
    // only reached if the initial request was sent successfully
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
    }

    close(closeCh)
    return closeCh
}
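The already-closed-channel trick appears twice above: when streams is nil, and at the final return. Receiving from a closed channel never blocks, so callers can range over the result uniformly without a special error path. A minimal, self-contained illustration (not etcd code):

```go
package main

import "fmt"

// watch returns a channel carrying one event on success, and an
// already-closed channel on failure, so callers range over it either way.
func watch(ok bool) <-chan string {
	ch := make(chan string, 1)
	if !ok {
		close(ch) // a range over a closed channel exits immediately
		return ch
	}
	ch <- "event"
	close(ch)
	return ch
}

func main() {
	for ev := range watch(true) {
		fmt.Println("got:", ev)
	}
	// the failed watch yields nothing; this loop body never runs
	for ev := range watch(false) {
		fmt.Println("unexpected:", ev)
	}
	fmt.Println("done")
}
```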

The other Watcher method is Close:

func (w *watcher) Close() (err error) {
    // nil out streams while holding the lock,
    // then close each stream outside the lock;
    // this way, even if closing a stream fails,
    // streams has already been detached from it
    w.mu.Lock()
    streams := w.streams
    w.streams = nil
    w.mu.Unlock()
    for _, wgs := range streams {
        if werr := wgs.Close(); werr != nil {
            err = werr
        }
    }
    // interestingly, etcd returns just one error here,
    // even though the loop above may have produced several
    return err
}

With that, watcher implements the Watcher interface. That covers the broad strokes of the implementation; the remaining code mostly wraps the related data structures with supporting logic.

From this small read through etcd's source, a few things stood out as best practices, in Go or in programming generally:

  1. Expose only one public interface outside the package, and have a package-private struct implement it, like the Watcher interface and watcher struct here. This has two benefits: the code stays decoupled, and you are spared the agony of naming things.

  2. The way comments are written: a large share of the comments in the etcd source sit directly above variable definitions, and the variable names themselves are clear.

  3. Well-judged abstraction. This is not unique to Etcd; every excellent open-source project gets its abstractions right. It suddenly makes me think of all the junk code I have written.

Finally, a summary of etcd's watch mechanism. At its root, the watch is implemented on top of gRPC's stream multiplexing, which is a feature of HTTP/2. So this post has perhaps drifted from its subject: to truly understand Etcd's watch, what really deserves study is HTTP/2.

Consider that a hole I've dug for myself to fill later.
