实现grpc名称解析器插件

在实际的请求调用过程中,出于容错考虑,服务提供方会部署多台server。借助名称解析器,client可以向多台server发起请求,避免单台server故障导致请求失败。
grpc默认使用的名称解析器是dns,除此之外,我们也可以实现自定义的名称解析器。

conn, err := grpc.Dial("dns:///localhost:8000",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)

自定义名称解析器

为了实现自定义名称解析器,我们需要实现Builder接口:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
    // Build creates a new resolver for the given target.
    // gRPC dial calls Build synchronously, and fails if the returned error is
    // not nil.
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

Scheme():返回自定义解析器的唯一标识。这样在grpc.Dial()时,才能通过唯一标识找到对应的解析器。
Build()grpc.Dial()会调用Build(),以此获取服务端地址,这个过程称为服务发现。Build()需要返回Reslover接口,因此还需要实现Reslover

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
    // ResolveNow will be called by gRPC to try to resolve the target name
    // again. It's just a hint, resolver can ignore this if it's not necessary.
    //
    // It could be called multiple times concurrently.
    ResolveNow(ResolveNowOptions)
    // Close closes the resolver.
    Close()
}

从注释中可以看出,ResolveNow() 不是必须的,Close()接口用于关闭解析器,这两个接口可以简单实现。
如下所示是自定义名称解析器"cindy"的实现。

package client

import (
    "google.golang.org/grpc/resolver"
)

type cindyResolver struct {
    target     resolver.Target
    cc         resolver.ClientConn
    addrsStore map[string][]string
}

const (
    SchemeName = "cindy"
    TargetName = "/resolver.cindy"
)

// Build 实现resolver.Builder接口
func (c *cindyResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    c.target = target
    c.cc = cc
    c.addrsStore = map[string][]string{
        TargetName: {"127.0.0.1:1000", "127.0.0.1:1001"},
    }

    // 获取地址
    addrStrs := c.addrsStore[c.target.URL.Path]
    addrList := make([]resolver.Address, len(addrStrs))
    for i, s := range addrStrs {
        addrList[i] = resolver.Address{Addr: s}
    }
    // 写入地址
    c.cc.UpdateState(resolver.State{Addresses: addrList})
    return c, nil
}

func (c *cindyResolver) ResolveNow(o resolver.ResolveNowOptions) {
}

func (c *cindyResolver) Close() {
}

func (c *cindyResolver) Scheme() string {
    return SchemeName
}

func init() {
    // 注册builder
    resolver.Register(&cindyResolver{})
}

demo的核心逻辑是Build()的实现:通过Path获取到对应的地址,此处硬编码两个地址"127.0.0.1:1000"和"127.0.0.1:1001",最终调用UpdateState()写入地址。在实际的使用过程中,服务提供方会上下线服务实例,因此可以结合consul、zk等服务注册中心动态获取最新的服务端ip。

负载均衡策略

既然有多个服务实例,每次client在发起请求时,要如何选择后端节点呢?gRPC-go内置pick_firstround_robin这两种负载均衡策略。
- pick_first:尝试连接第一个地址,如果成功,会将后续的请求都发送到此地址;如果失败,则尝试下一个地址。
- round_robin:连接获取到的所有后端地址,并尝试依次向后端发送rpc请求。比如第一个请求发送给server 1,第二个请求发送给server 2。

client/server示例

接下来,我们使用自定义名称解析器向服务端发起连接。Dial的地址为"cindy:///resolver.cindy","cindy"就是自定义名称解析器的名字,需要和Scheme()返回的内容完全一致,"/resolver.cindy"对应Build()中的c.target.URL.Path,负载均衡策略选择round_robin

// StartClient 启动客户端
func StartClient() {
    conn, err := grpc.Dial("cindy:///resolver.cindy",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
    if err != nil {
        fmt.Printf("new client err: %s", err.Error())
    }

    client := pb.NewGreetServiceClient(conn)

    for i := 0; i < 10; i++ {
        resp, err := client.Greet(context.Background(), &pb.GreetRequest{Name: "alice"})
        if err != nil {
            fmt.Printf("call err: %s", err.Error())
            return
        }
        fmt.Printf("resp: %+v\n", resp)
    }
}
// StartServer 启动服务
func StartServer(port int) {
    addr := fmt.Sprintf("0.0.0.0:%d", port)
    // 启动服务
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        fmt.Printf("failed to listen, err: %v\n", err)
        return
    }
    defer listener.Close()

    s := grpc.NewServer() // 创建grpc服务
    // 注册服务
    pb.RegisterGreetServiceServer(s, &Server{Addr: addr})
    // 启动服务
    if err := s.Serve(listener); err != nil {
        fmt.Printf("failed to serve, err: %v\n", err)
        return
    }
}

参考

gRPC中的名称解析和负载均衡

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容