Our services at work use Consul for service discovery, and I noticed it has a watch mechanism. That caught my interest, because I had just been reading the etcd-raft code and etcd offers a similar watch feature. So, while the topic was fresh, I spent the weekend digging into the source of etcd's watch implementation.
Before diving into the source, let's walk through a quick example of how etcd's watch is used.
- First, write a key-value pair into etcd:
curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value="神蛋使者"
- Watch that key:
curl http://127.0.0.1:2379/v2/keys/name?wait=true
If everything is working, this request now blocks.
- In a new terminal, change the stored value:
curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value=神蛋使者1号
- The blocked request returns the watched result:
{
"action":"set",
"node":{
"key":"/name",
"value":"神蛋使者1号",
"modifiedIndex":25,
"createdIndex":25
},
"prevNode": {
"key":"/name",
"value":"神蛋使者",
"modifiedIndex":24,
"createdIndex":24
}
}
That's the basic workflow. Now for the source.
Interface definition
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
}
This interface defines two methods, Watch and Close.
Watch returns a value of type WatchChan, which is a receive-only channel:
type WatchChan <-chan WatchResponse
The channel carries values of type WatchResponse:
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
// CompactRevision is the minimum revision the watcher may receive.
CompactRevision int64
// Canceled is used to indicate watch failure.
// If the watch failed and the stream was about to close, before the channel is closed,
// the channel sends a final response that has Canceled set to true with a non-nil Err().
Canceled bool
// Created is used to indicate the creation of the watcher.
Created bool
closeErr error
}
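A caller typically ranges over the returned WatchChan until it is closed, checking Canceled on each response. Here is a minimal sketch of that consumption pattern; watchResponse below is a simplified stand-in I made up for illustration (the real WatchResponse and WatchChan types live in clientv3 and carry mvccpb events):

```go
package main

import "fmt"

// watchResponse is a simplified stand-in for clientv3's WatchResponse.
type watchResponse struct {
	Events   []string // simplified: the real field is []*Event
	Canceled bool
}

// consume drains a watch channel until it is closed, collecting the
// events seen. A response with Canceled set means the server gave up
// on this watcher (e.g. the requested revision was compacted).
func consume(wch <-chan watchResponse) (events []string, canceled bool) {
	for resp := range wch {
		if resp.Canceled {
			return events, true
		}
		events = append(events, resp.Events...)
	}
	return events, false
}

func main() {
	ch := make(chan watchResponse, 2)
	ch <- watchResponse{Events: []string{"PUT /name"}}
	ch <- watchResponse{Events: []string{"DELETE /name"}}
	close(ch)
	events, canceled := consume(ch)
	fmt.Println(events, canceled)
}
```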
The Event type is a gRPC-generated message:
type Event struct {
// type is the kind of event. If type is a PUT, it indicates
// new data has been stored to the key. If type is a DELETE,
// it indicates the key was deleted.
Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
// kv holds the KeyValue for the event.
// A PUT event contains current kv pair.
// A PUT event with kv.Version=1 indicates the creation of a key.
// A DELETE/EXPIRE event contains the deleted key with
// its modification revision set to the revision of deletion.
Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
// prev_kv holds the key-value pair before the event happens.
PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
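The comments on Event encode a useful rule: a PUT whose kv.Version == 1 marks the creation of a key. A toy classifier over simplified stand-in types (eventType, keyValue, and classify are hypothetical names; the real types are generated into mvccpb) could look like this:

```go
package main

import "fmt"

// Simplified stand-ins for the mvccpb types.
type eventType int

const (
	put eventType = iota
	del
)

type keyValue struct {
	Key     string
	Version int64 // modifications since creation; 1 means just created
}

type event struct {
	Type eventType
	Kv   *keyValue
}

// classify applies the rules from the Event comments: a PUT with
// Version==1 is a creation, any other PUT is an update, and the
// remaining case is a deletion.
func classify(ev event) string {
	switch {
	case ev.Type == put && ev.Kv.Version == 1:
		return "create"
	case ev.Type == put:
		return "update"
	default:
		return "delete"
	}
}

func main() {
	fmt.Println(classify(event{Type: put, Kv: &keyValue{Key: "/name", Version: 1}}))
	// prints: create
}
```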
Next, look at the watcher type, which implements the Watcher interface:
// watcher implements the Watcher interface
type watcher struct {
remote pb.WatchClient
// mu protects the grpc streams map
mu sync.RWMutex
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
}
The watcher struct is simple, with only three fields: remote abstracts the client that issues watch requests; streams is a map holding the active gRPC streams; and mu is a read-write lock protecting streams against concurrent access.
The values in streams are of type watchGrpcStream, which bundles all the state for one gRPC stream:
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
// resuming holds all resuming watchers on this grpc stream
resuming []*watcherStream
// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// closingc gets the watcherStream of closing watchers
closingc chan *watcherStream
// wg is Done when all substream goroutines have exited
wg sync.WaitGroup
// resumec closes to signal that all substreams should begin resuming
resumec chan struct{}
// closeErr is the error that closed the watch stream
closeErr error
}
Interestingly, watchGrpcStream contains an owner field of type *watcher, so watcher and watchGrpcStream can reference each other. It also repeats the remote field already defined on watcher, and not as a pointer; the purpose of this duplication isn't clear to me.
A few other fields are worth a closer look. First, substreams; here are its definition and comment again:
// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
Now the definition of the watcherStream type:
// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
initReq watchRequest
// outc publishes watch responses to subscriber
outc chan WatchResponse
// recvc buffers watch responses before publishing
recvc chan *WatchResponse
// donec closes when the watcherStream goroutine stops.
donec chan struct{}
// closing is set to true when stream should be scheduled to shutdown.
closing bool
// id is the registered watch id on the grpc stream
id int64
// buf holds all events received from etcd but not yet consumed by the client
buf []*WatchResponse
}
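The buf field hints at how a substream avoids stalling the gRPC receive loop: responses arriving on recvc are queued in buf, and the head of the queue is sent to outc only when the subscriber is ready. The sketch below is a stripped-down version of that forwarding loop as I understand it (the actual loop in clientv3, serveSubstream, also handles resuming and shutdown); forward and the int payloads are illustrative stand-ins:

```go
package main

import "fmt"

// forward shuttles values from recvc to outc, buffering in between so
// the producer on recvc is never blocked by a slow consumer on outc.
func forward(recvc <-chan int, outc chan<- int) {
	var buf []int
	for {
		// enable the send case only when something is buffered;
		// a send on a nil channel blocks forever, so select skips it
		var sendc chan<- int
		var next int
		if len(buf) > 0 {
			sendc = outc
			next = buf[0]
		}
		select {
		case v, ok := <-recvc:
			if !ok { // producer finished: drain the buffer and stop
				for _, v := range buf {
					outc <- v
				}
				close(outc)
				return
			}
			buf = append(buf, v)
		case sendc <- next:
			buf = buf[1:]
		}
	}
}

func main() {
	recvc := make(chan int)
	outc := make(chan int)
	go forward(recvc, outc)
	for i := 1; i <= 3; i++ {
		recvc <- i // accepted immediately, even with no consumer yet
	}
	close(recvc)
	for v := range outc {
		fmt.Println(v)
	}
}
```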
Here's a diagram of how these types relate:

Now for how watcher implements the Watch method:
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// apply the watch options
ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
// build the watch request from the options
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
// format the request context into a string key
ctxKey := fmt.Sprintf("%v", ctx)
// find or allocate appropriate grpc watch stream
// the streams map must be read and written under the lock
w.mu.Lock()
// if streams is nil, the watcher has been closed,
// so return an already-closed channel
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
// recall that streams is a map keyed by the request context string;
// if no stream exists for this context yet, create one
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// couldn't create channel; return closed channel
// closeCh has a buffer of 1 so the two
// closeCh <- WatchResponse{closeErr: wgs.closeErr}
// sends below can complete without blocking: nothing is receiving
// yet, since the channel is only returned to the caller afterwards
closeCh := make(chan WatchResponse, 1)
// submit request
select {
// send the watch request built above to the stream's main loop
case reqc <- wr:
ok = true
// the request context was canceled (this should cover every
// way the client can abandon the request)
case <-wr.ctx.Done():
// the stream shut down; this branch handles the stream
// terminating abnormally
// note the retry logic below
case <-donec:
if wgs.closeErr != nil {
// if the stream died with a real error, rather than just
// being dropped for having no contexts, don't retry
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
// receive channel
// only reached if the initial request was sent successfully
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
}
close(closeCh)
return closeCh
}
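One idiom in Watch is worth isolating: when the watcher is already closed, it hands back an already-closed channel rather than nil. Receiving from a closed channel returns immediately with the zero value, so callers blocked in a select or range wake up right away, whereas a nil channel would block them forever. A minimal demonstration (closedResponseChan is a hypothetical helper for illustration):

```go
package main

import "fmt"

// closedResponseChan mimics Watch's behavior when the watcher is
// already closed: return a channel that is closed up front.
func closedResponseChan() <-chan string {
	ch := make(chan string)
	close(ch)
	return ch
}

func main() {
	ch := closedResponseChan()
	// the receive does not block; ok reports whether the value came
	// from a real send (false here, since the channel is closed)
	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok)
	// prints: "" false
}
```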
Then there is the other Watcher method, Close:
func (w *watcher) Close() (err error) {
// take streams under the lock and nil out the field,
// then close each stream outside the lock;
// this way, even if closing some stream fails,
// the watcher is already detached from all of them
w.mu.Lock()
streams := w.streams
w.streams = nil
w.mu.Unlock()
for _, wgs := range streams {
if werr := wgs.Close(); werr != nil {
err = werr
}
}
// note that only a single error is returned, even though the
// loop above may produce several; later errors overwrite earlier ones
return err
}
With that, watcher implements the Watcher interface. That covers the broad strokes of the implementation; the rest of the code is mostly plumbing around the related data structures.
Even from this small slice of etcd's source, two things stand out as good Go (and general programming) practice:
- Expose a single public interface from the package and have an unexported struct implement it, like the Watcher interface and watcher struct here. This decouples the code, and as a bonus it sidesteps naming headaches (__)
- The comment style: much of the documentation sits right on the variable and field definitions, and the names themselves are clear.
Well-chosen abstractions. This isn't unique to etcd; every good open-source project gets its abstractions right. It makes me wince at my own code %>_<%
Finally, to sum up etcd's watch mechanism: at bottom, the watch in the v3 client runs over gRPC streams, which are multiplexed over HTTP/2. So this post arguably stops short of the real story; to fully understand etcd's watch, one should study HTTP/2.
Consider that a hole I've dug for a future post.