ETCD《一》--Client监听

开启Client监听

Client监听通常监听在2379端口上,用于对外提供http+grpc服务,如etcdctl客户端等

http监听

普通http监听会注册一些rest api

  • /debug/vars

  • /version

  • /metrics

  • /health

grpc监听

grpc基于http2协议

grpc层也实现了一些keepalive策略

定义 服务器端主动发送 ping 的时间间隔,用来确认客户端还在;Timeout: ping 发出后多长时间没收到 ACK 就断开连接

grpc.KeepaliveParams(keepalive.ServerParameters{
            Time:    e.cfg.GRPCKeepAliveInterval,  // 2h
            Timeout: e.cfg.GRPCKeepAliveTimeout,  // 20s
        }

限制客户端的 ping 频率,防止 DoS 攻击或滥用资源(否则断开连接)

grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
            MinTime:             e.cfg.GRPCKeepAliveMinTime,  // 5s
            PermitWithoutStream: false,
        }

grpc中的注册方法分为两类:

  • Unary:类似HTTP rest请求,一次请求,一次响应;但仍然是基于http2的,可以在保持连接的时候做到tcp连接复用,在同一个tcp连接上复用http多路传输

  • Stream:流式请求,连接不断开,采用推送的方式,类似SSE

在注册方法的时候可以选择注册为Unary方法或者stream方法,在调用的时候通过不同的拦截器进行处理

srv, knownService := s.services[service]
    if knownService {
        if md, ok := srv.methods[method]; ok {
            s.processUnaryRPC(ctx, stream, srv, md, ti)
            return
        }
        if sd, ok := srv.streams[method]; ok {
            s.processStreamingRPC(ctx, stream, srv, sd, ti)
            return
        }
    }

grpc监听和http监听是监听在同一个端口上的,通过 cmux 多路复用器同时运行 gRPC 和 HTTP 服务

m := cmux.New(sctx.l)
server = m.Serve
err = server()

在处理请求时,对于http2请求并且请求header中包含Content-Type=application/grpc的将其转发到grpc服务、否则将其转发到http服务

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
            grpcServer.ServeHTTP(w, r)
        } else {
            otherHandler.ServeHTTP(w, r)
        }
    })

grpc注册的服务端点:

  • KVServer:服务名称:/etcdserverpb.KV/ ;注册的Unary方法包括:

    • Range
    • Put
    • DeleteRange
    • Txn
    • Compact
  • WatchServer:服务名称:/etcdserverpb.Watch/ ;注册的Stream方法包括:

    • Watch
  • LeaseServer:服务名称:/etcdserverpb.Lease/ ;注册的Unary方法包括:

    • LeaseGrant
    • LeaseRevoke
    • LeaseTimeToLive
    • LeaseLeases
      注册的Stream方法包括:
    • LeaseKeepAlive
  • ClusterServer:服务名称:/etcdserverpb.Cluster/ ;注册的Unary方法包括:

    • MemberAdd
    • MemberRemove
    • MemberUpdate
    • MemberList
    • MemberPromote
  • AuthServer:服务名称:/etcdserverpb.Auth/ ;注册的Unary方法包括:AuthEnable、AuthDisable、UserAdd、RoleAdd等

  • HealthServer:服务名称:/grpc.health.v1.Health/ ;注册的Unary方法包括:

    • Check
    • List
      注册的Stream方法包括:
    • Watch
  • MaintenanceServer:服务名称:/etcdserverpb.Maintenance/ ;注册的Unary方法包括:

    • Alarm、Status、Defragment、Hash、HashKV、MoveLeader、Downgrade
      注册的Stream方法包括:
    • Snapshot
  • ElectionServer:服务名称:/v3electionpb.Election/ ;注册的Unary方法包括:

    • Campaign、Proclaim、Leader、Resign
      注册的Stream方法包括:
    • Observe
  • LockServer:服务名称:/v3lockpb.Lock/ ;注册的Unary方法包括:

    • Lock、Unlock

grpc-gateway

grpc-gateway用于将grpc上注册的方法同时注册为http rest端点,能够使用http rest 方式来请求grpc服务

将自身做为grpc客户端,dial client监听的端口,用于接收http请求,并转换为grpc请求转发到 client监听的端口

conn, err := dial(ctx)

构建grpc gateway,定义接收到的http请求参数反序列话方式,UseProtoNames直接使用protobuf定义的字段名称而不是驼峰命名等

gwmux := gw.NewServeMux(
        gw.WithMarshalerOption(gw.MIMEWildcard,
            &gw.HTTPBodyMarshaler{
                Marshaler: &gw.JSONPb{
                    MarshalOptions: protojson.MarshalOptions{
                        UseProtoNames:   true,
                        EmitUnpopulated: false,
                    },
                    UnmarshalOptions: protojson.UnmarshalOptions{
                        DiscardUnknown: true,
                    },
                },
            },
        ),
    )

将grpc服务注册为http rest 端点,对应的就是上述grpc服务上注册的那些服务方法

handlers := []registerHandlerFunc{
        etcdservergw.RegisterKVHandler,
        etcdservergw.RegisterWatchHandler,
        etcdservergw.RegisterLeaseHandler,
        etcdservergw.RegisterClusterHandler,
        etcdservergw.RegisterMaintenanceHandler,
        etcdservergw.RegisterAuthHandler,
        v3lockgw.RegisterLockHandler,
        v3electiongw.RegisterElectionHandler,
    }

注册格式:

  • 全部使用POST方式注册
mux.Handle(http.MethodPost, pattern_KV_Range_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
        //
    })
  • 映射路径解析:[]int{2, 0, 2, 1, 2, 2} 是 opcode(操作码),2 对应的是入栈操作,操作的是[]string{"v3", "kv", "range"}这个数组元素,2,0相当于把0号元素v3入栈,2,1相当于把1号元素kv入栈,2,2相当于把2号元素range入栈,最终构成的请求路径就是:/v3/kv/range

pattern_KV_Range_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v3", "kv", "range"}, ""))

因此可以通过POST http://127.0.0.1:2379/v3/kv/range来请求grpc方法:/etcdserverpb.KV/Range

还可以通过websocket来请求grpc方法,这里自动将”/v3“开头的 WebSocket 请求转成 HTTP POST请求,进而在转换为grpc请求

if gwmux != nil {
        httpmux.Handle(
            "/v3/",
            wsproxy.WebsocketProxy(
                gwmux,
                wsproxy.WithRequestMutator(
                    // Default to the POST method for streams
                    func(_ *http.Request, outgoing *http.Request) *http.Request {
                        outgoing.Method = "POST"
                        return outgoing
                    },
                ),
                wsproxy.WithMaxRespBodyBufferSize(0x7fffffff),
                wsproxy.WithLogger(wsProxyZapLogger{sctx.lg}),
            ),
        )
    }

总结

整个Client的监听中,可以归纳为这类型的请求:

  • 普通http rest请求,如/version、/metrics等,直接交给 http 服务器处理

  • grpc请求,这部分请求是http2的,并且header种携带Content-Type=application/grpc,直接交给grpc 服务器处理

  • http rest请求,请求类路是/v3开头的,请求方法是POST,这部分请求交给grpc-gateway处理,grpc-gateway会将自身作为grpc client,进而将http请求转发给grpc服务处理

  • websocket 请求,请求路径是/v3开头的,这部分请求也交给grpc-gateway处理,grpc-gateway会自动把websocket请求转换为 http post请求,进而交给grpc服务处理

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容