开启Client监听
Client监听通常监听在2379端口上,用于对外提供http+grpc服务,如etcdctl客户端等
http监听
普通http监听会注册一些rest api
/debug/vars
/version
/metrics
/health
grpc监听
grpc基于http2协议
grpc层也实现了一些keepalive策略
定义 服务器端主动发送 ping 的时间间隔,用来确认客户端还在;Timeout: ping 发出后多长时间没收到 ACK 就断开连接
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: e.cfg.GRPCKeepAliveInterval, // 2h
Timeout: e.cfg.GRPCKeepAliveTimeout, // 20s
}
限制客户端的 ping 频率,防止 DoS 攻击或滥用资源(否则断开连接)
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: e.cfg.GRPCKeepAliveMinTime, // 5s
PermitWithoutStream: false,
}
grpc中的注册方法分为两类:
Unary:类似HTTP rest请求,一次请求,一次响应;但仍然是基于http2的,可以在保持连接的时候做到tcp连接复用,在同一个tcp连接上复用http多路传输
Stream:流式请求,连接不断开,采用推送的方式,类似SSE
在注册方法的时候可以选择注册为Unary方法或者stream方法,在调用的时候通过不同的拦截器进行处理
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(ctx, stream, srv, md, ti)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(ctx, stream, srv, sd, ti)
return
}
}
grpc监听和http监听是监听在同一个端口上的,通过 cmux 多路复用器同时运行 gRPC 和 HTTP 服务
m := cmux.New(sctx.l)
server = m.Serve
err = server()
在处理请求时,对于http2请求并且请求header中包含Content-Type=application/grpc
的将其转发到grpc服务、否则将其转发到http服务
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
grpcServer.ServeHTTP(w, r)
} else {
otherHandler.ServeHTTP(w, r)
}
})
grpc注册的服务端点:
-
KVServer:服务名称:/etcdserverpb.KV/ ;注册的Unary方法包括:
- Range
- Put
- DeleteRange
- Txn
- Compact
-
WatchServer:服务名称:/etcdserverpb.Watch/ ;注册的Stream方法包括:
- Watch
-
LeaseServer:服务名称:/etcdserverpb.Lease/ ;注册的Unary方法包括:
- LeaseGrant
- LeaseRevoke
- LeaseTimeToLive
- LeaseLeases
注册的Stream方法包括: - LeaseKeepAlive
-
ClusterServer:服务名称:/etcdserverpb.Cluster/ ;注册的Unary方法包括:
- MemberAdd
- MemberRemove
- MemberUpdate
- MemberList
- MemberPromote
AuthServer:服务名称:/etcdserverpb.Auth/ ;注册的Unary方法包括:AuthEnable、AuthDisable、UserAdd、RoleAdd等
-
HealthServer:服务名称:/grpc.health.v1.Health/ ;注册的Unary方法包括:
- Check
- List
注册的Stream方法包括: - Watch
-
MaintenanceServer:服务名称:/etcdserverpb.Maintenance/ ;注册的Unary方法包括:
- Alarm、Status、Defragment、Hash、HashKV、MoveLeader、Downgrade
注册的Stream方法包括: - Snapshot
- Alarm、Status、Defragment、Hash、HashKV、MoveLeader、Downgrade
-
ElectionServer:服务名称:/v3electionpb.Election/ ;注册的Unary方法包括:
- Campaign、Proclaim、Leader、Resign
注册的Stream方法包括: - Observe
- Campaign、Proclaim、Leader、Resign
-
LockServer:服务名称:/v3lockpb.Lock/ ;注册的Unary方法包括:
- Lock、Unlock
grpc-gateway
grpc-gateway用于将grpc上注册的方法同时注册为http rest端点,能够使用http rest 方式来请求grpc服务
将自身做为grpc客户端,dial client监听的端口,用于接收http请求,并转换为grpc请求转发到 client监听的端口
conn, err := dial(ctx)
构建grpc gateway,定义接收到的http请求参数反序列话方式,UseProtoNames直接使用protobuf定义的字段名称而不是驼峰命名等
gwmux := gw.NewServeMux(
gw.WithMarshalerOption(gw.MIMEWildcard,
&gw.HTTPBodyMarshaler{
Marshaler: &gw.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: false,
},
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
},
},
),
)
将grpc服务注册为http rest 端点,对应的就是上述grpc服务上注册的那些服务方法
handlers := []registerHandlerFunc{
etcdservergw.RegisterKVHandler,
etcdservergw.RegisterWatchHandler,
etcdservergw.RegisterLeaseHandler,
etcdservergw.RegisterClusterHandler,
etcdservergw.RegisterMaintenanceHandler,
etcdservergw.RegisterAuthHandler,
v3lockgw.RegisterLockHandler,
v3electiongw.RegisterElectionHandler,
}
注册格式:
- 全部使用POST方式注册
mux.Handle(http.MethodPost, pattern_KV_Range_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
//
})
- 映射路径解析:[]int{2, 0, 2, 1, 2, 2} 是 opcode(操作码),2 对应的是入栈操作,操作的是[]string{"v3", "kv", "range"}这个数组元素,
2,0
相当于把0号元素v3入栈,2,1
相当于把1号元素kv入栈,2,2
相当于把2号元素range入栈,最终构成的请求路径就是:/v3/kv/range
pattern_KV_Range_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3", "kv", "range"}, ""))
因此可以通过POST http://127.0.0.1:2379/v3/kv/range
来请求grpc方法:/etcdserverpb.KV/Range
还可以通过websocket来请求grpc方法,这里自动将”/v3“开头的 WebSocket 请求转成 HTTP POST请求,进而在转换为grpc请求
if gwmux != nil {
httpmux.Handle(
"/v3/",
wsproxy.WebsocketProxy(
gwmux,
wsproxy.WithRequestMutator(
// Default to the POST method for streams
func(_ *http.Request, outgoing *http.Request) *http.Request {
outgoing.Method = "POST"
return outgoing
},
),
wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
wsproxy.WithLogger(wsProxyZapLogger{sctx.lg}),
),
)
}
总结
整个Client的监听中,可以归纳为这类型的请求:
普通http rest请求,如/version、/metrics等,直接交给 http 服务器处理
grpc请求,这部分请求是http2的,并且header种携带
Content-Type=application/grpc
,直接交给grpc 服务器处理http rest请求,请求类路是/v3开头的,请求方法是POST,这部分请求交给grpc-gateway处理,grpc-gateway会将自身作为grpc client,进而将http请求转发给grpc服务处理
websocket 请求,请求路径是/v3开头的,这部分请求也交给grpc-gateway处理,grpc-gateway会自动把websocket请求转换为 http post请求,进而交给grpc服务处理