Rpcx源码之Client

一、概述

在Rpcx框架源码中,存在Client的角色,用来完成承担Client stub;相对来说Server,通过从注册中心获取已注册暴露对外提供功能的Service,本将已注册的service存放到本地,减少频繁的读取registry中心的记录,同时也通过watch机制获取registry中注册服务的变更(采用channel来监听service的变更),来同步本地缓存的记录,通过RPCClient与server完成通信采用同步或异步的通信方式。对应的Clinet提供了如下的功能:

func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string,
                        args interface{}, reply interface{}) error // 同步调用
func (client *Client) Close() error // 释放RPCClient相关资源
func (c *Client) Connect(network, address string) error // 连接server
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string,
           args interface{}, reply interface{}, done chan *Call) *Call // 异步
func (client *Client) IsClosing() bool  // 本地RPCClient是否已关闭
func (client *Client) IsShutdown() bool // RPCClient是否停止

func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) 
          (map[string]string, []byte, error) //原始信息发送

对应的XClinet提供了如下的功能:(XClient是在Client的基础上增加路由规则、失败模式、服务发现治理等功能)

func (c *xClient) SetSelector(s Selector)  // 设置路由规则
func (c *xClient) SetPlugins(plugins PluginContainer) // 设置插件 增强client特性
func (c *xClient) ConfigGeoSelector(latitude, longitude float64) // 就近路由规则
func (client *Client) Call(ctx context.Context, servicePath,serviceMethod string, 
      args interface{},reply interface{}) error // 同步调用
func (client *Client) Go(ctx context.Context, servicePath,serviceMethod string, 
      args interface{}, reply interface{}, done chan *Call) *Call // 异步执行调用
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) 
         (map[string]string, []byte, error)  // 发送原始信息
func (c *xClient) Broadcast(ctx context.Context, serviceMethod string, 
      args interface{},  reply interface{}) error  // 广播请求模式
func (c *xClient) Fork(ctx context.Context, serviceMethod string, 
    args interface{},  reply interface{}) error // Fock 模式请求

接下来详细剖析对应的源码

二、源码

本文源码主要位于:rpcx\client包中client.go 和 xclient.go
Client结构

// 支持同步、异步的调用模式;一个Client就代表一个RPCClient
type Client struct {
    option Option // 配置选项

    Conn net.Conn      // connection
    r    *bufio.Reader   // 读

    mutex        sync.Mutex // protects following
    seq          uint64
    pending      map[uint64]*Call    // 挂起的调用
    closing      bool      // user has called Close  client关闭
    shutdown     bool    // server has told us to stop  client停止
    pluginClosed bool  // the plugin has been called plugin禁用

    Plugins PluginContainer         // 插件容器

    ServerMessageChan chan<- *protocol.Message  // 接收server的通知channel
}

从上面的Client结构体的源码可以看到提供了Client只是具备了简单RPC通信功能:同步和异步,并以Plugin方式提供Client的额外特性、Client权限凭证等操作;具体的提供的函数如下:

1.1 新建Client

// 新建Client
// 目前支持Option:服务分组、重试、TLS配置、连接timeout、读取timeout、
// 写入timeout、调用备份失败有效期、断路器、协议序列化、协议压缩方式、心跳等相关参数设置
// 默认已提供DefaultOption满足基本参数设置需求,可以在此基础上按需修改.
func NewClient(option Option) *Client {
    return &Client{
        option: option,
    }
}

1.2 连接Server

// 连接server并准备发送client的请求:client是需要指定的network来连接server的
// 本部分源码位于:client包下的connection.go文件中
// 
func (c *Client) Connect(network, address string) error {
    var conn net.Conn
    var err error

    switch network {
    case "http":   
        conn, err = newDirectHTTPConn(c, network, address)
    case "kcp":
        conn, err = newDirectKCPConn(c, network, address)
    case "quic":
        conn, err = newDirectQuicConn(c, network, address)
    case "unix":
        conn, err = newDirectConn(c, network, address)
    default:  // 默认TCP
        fn := makeConnMap[network]
        if fn != nil {   
            conn, err = fn(c, network, address)
        } else {  // TLS方式连接 
            conn, err = newDirectConn(c, network, address)
        }
    }

    if err == nil && conn != nil {  // 设置connection的client端相关参数
        if c.option.ReadTimeout != 0 { // 读超时
            conn.SetReadDeadline(time.Now().Add(c.option.ReadTimeout))
        }
        if c.option.WriteTimeout != 0 { // 写超时
            conn.SetWriteDeadline(time.Now().Add(c.option.WriteTimeout))
        }

        c.Conn = conn
        c.r = bufio.NewReaderSize(conn, ReaderBuffsize)  // 设置buffer
        //c.w = bufio.NewWriterSize(conn, WriterBuffsize)

        // start reading and writing since connected
        go c.input()   // 等待connected后 执行read或write操作

                // 额外开启goroutine来执行heartbeat
        if c.option.Heartbeat && c.option.HeartbeatInterval > 0 { 
            go c.heartbeat()
        }

    }

    return err
}

等待连接正常进行read或write操作的入口

// 读取server的响应结果:
// 1、本地缓存不存在对应调用对象call记录
// 2、调用对象响应结果返回:a、出现了Error b、正常
func (client *Client) input() { //
    var err error
    var res = protocol.NewMessage() // 请求消息

        // for循环根据用户定义的内容来构建成发送给server端的protocol.Message信息
       //              读取server返回给客户端的protocol.Message信息
       // client相关的timeou参数设置
    for err == nil {
        if client.option.ReadTimeout != 0 { // 设置读取有效期
            client.Conn.SetReadDeadline(time.Now().Add(client.option.ReadTimeout))
        }

        // 完成将用户定义的request转为Message
        err = res.Decode(client.r) // 解码
        //res, err = protocol.Read(client.r)

        if err != nil {
            break
        }
        // 验证Message的合法性
        seq := res.Seq()
        var call *Call
        isServerMessage := (res.MessageType() == protocol.Request 
                   && !res.IsHeartbeat() && res.IsOneway()) // 是否为server消息
// 当前Message属于request的时  获取其关联的pending中对应call:
//     pending中记录的是待执行的call
        if !isServerMessage {  
            client.mutex.Lock()
            call = client.pending[seq]
            delete(client.pending, seq)
            client.mutex.Unlock()
        }

        switch {
        case call == nil: // 当调用对象call不存在本地缓存中
            if isServerMessage {  // 
                if client.ServerMessageChan != nil {
                                       // 新建go协程单独处理response结果
                    go client.handleServerRequest(res) 
                    res = protocol.NewMessage()
                }
                continue
            }
        case res.MessageStatusType() == protocol.Error: // 消息状态 = Error
            // We've got an error response. Give this to the request
            // 当server端输出一个错误响应 给到对应的请求
            if len(res.Metadata) > 0 { // 提取响应内容中meta data数据
                meta := make(map[string]string, len(res.Metadata))
                for k, v := range res.Metadata {
                    meta[k] = v
                }
                call.ResMetadata = meta
                call.Error = ServiceError(meta[protocol.ServiceError])
            }

            if call.Raw { // 调用使用的raw形式  需要将响应内容进行转换
                call.Metadata, call.Reply, _ = convertRes2Raw(res)
                call.Metadata[XErrorMessage] = call.Error.Error()
            }
            call.done() // 触发结果通道channel
        default: // 响应处理
            if call.Raw { // Raw形式单独提取出来 完成对应的转换:获取metadata、reply
                call.Metadata, call.Reply, _ = convertRes2Raw(res)
            } else {
                data := res.Payload // 提取响应体
                if len(data) > 0 {
                                       // 根据对应的解码来进行解码
                    codec := share.Codecs[res.SerializeType()] 
                    if codec == nil {
                        call.Error = ServiceError(ErrUnsupportedCodec.Error())
                    } else {
                  // 将对应的返回结果赋给本次rpc的Call中的Reply
                        err = codec.Decode(data, call.Reply) 
                        if err != nil {
                            call.Error = ServiceError(err.Error())
                        }
                    }
                }
                if len(res.Metadata) > 0 { // 提取metadata
                    meta := make(map[string]string, len(res.Metadata))
                    for k, v := range res.Metadata {
                        meta[k] = v
                    }
// 赋予本次call对应的响应元数据ResMetadata
                    call.ResMetadata = res.Metadata 
                }

            }

            call.done() // 通知结果通道
        }

        res.Reset() // 重置Message对象 便于下一次新的请求Message处理
    }

    ...省略代码
}

每次进行的RPC过程会被封装成Call对象,通过Done的channel来监控每次Call完成情况

// Call represents an active RPC.
type Call struct {
    ServicePath   string               // RPC调用的Service名字及其对应的Method.
    ServiceMethod string             // 
    Metadata      map[string]string  //metadata
    ResMetadata   map[string]string  // 响应结果的metadata
    Args          interface{} // 函数的参数(*struct).
    Reply         interface{} // 函数的应答结果 (*struct).
    Error         error       // 一次Call完成时对应的Error状态
    Done          chan *Call  // 存放每次完成的Call.
    Raw           bool        // 是否采用原始数据信息方式
}

发起远程调用(同步调用)

func (client *Client) call(ctx context.Context, servicePath, serviceMethod string,
     args interface{}, reply interface{}) error {
        seq := new(uint64)                          // 请求ID
     // 将seq添加到context中便于在Server端时获取
    ctx = context.WithValue(ctx, seqKey{}, seq) 
    Done := client.Go(ctx, servicePath, serviceMethod, args, 
            reply, make(chan *Call, 1)).Done
        // 异步请求
    var err error
    select {
    case <-ctx.Done(): //cancel by context  取消
        client.mutex.Lock()  
    // 使用互斥锁 剔除处于pending状态的call; 
    //  处于pending状态的call存放在client端的本地内存中map[uint64]*Call
        call := client.pending[*seq]
        delete(client.pending, *seq)
        client.mutex.Unlock()
        if call != nil { //  将本次Call投递到channel中
            call.Error = ctx.Err()
            call.done() // 执行Call投递到channel
        }

        return ctx.Err() // 输出本次request的错误状态
    case call := <-Done: // 从Done的channel获取Call:代表Call已完成
        err = call.Error
        meta := ctx.Value(share.ResMetaDataKey) // 获取response内容
        if meta != nil && len(call.ResMetadata) > 0 {
            resMeta := meta.(map[string]string)
            for k, v := range call.ResMetadata {
                resMeta[k] = v
            }
        }
    }

    return err

目前rpcx 支持许多服务发现机制,同时也可以自定义服务发现:

  • [Peer to Peer] 客户端直连每个服务节点。
  • [Peer to Multiple]客户端可以连接多个服务。服务可以被编程式配置。
  • [Zookeeper] 通过 zookeeper 寻找服务。
  • [Etcd] 通过 etcd 寻找服务。
  • [Consul] 通过 consul 寻找服务。
  • [mDNS] 通过 mDNS 寻找服务(支持本地服务发现)。
  • [In process]在同一进程寻找服务。客户端通过进程调用服务,不走TCP或UDP,方便调试使用。
rpcx 支持 故障模式:
  • [Failfast]:如果调用失败,立即返回错误
  • [Failover]:选择其他节点,直到达到最大重试次数
  • [Failtry]:选择相同节点并重试,直到达到最大重试次数
  • [Failbackup]:选择两个节点,当一个节点未能返回结果时,则请求第二个节点来获取结果
rpcx 支持 路由模式:

[Random]: 随机选择节点
[Roundrobin]: 用 [roundrobin] (https://zh.wikipedia.org/wiki/%E5%BE%AA%E7%92%B0%E5%88%B6) 算法选择节点
[Consistent hashing]: 如果服务路径、方法和参数一致,就选择同一个节点, 使用了非常快的jump consistent hash算法
[Weighted]: 根据元数据里配置好的权重(weight=xxx)来选择节点
[Network quality]: 根据ping的结果来选择节点。网络质量越好,该节点被选择的几率越大
[Geography]: 如果有多个数据中心,客户端趋向于连接同一个数据机房的节点
[Customized Selector]: 如果以上的选择器都不适合你,你可以自己定制选择器

rpcx特殊的两种请求方式
  • Broadcast 表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。此时FailMode 和 SelectMode的设置是无效的。请设置超时来避免阻塞。
  • Fork 表示向所有服务器发送请求,只要任意一台服务器正确返回就成功。此时FailMode 和 SelectMode的设置是无效的。

三、使用

实例server端代码

import (
    "context"
    "flag"
    "github.com/smallnest/rpcx/server"
    "log"
    "net"
    "rpcx/examples/models"
    )

var (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()

    s := server.NewServer()
    //s.Plugins.Add(&ConnectionListerPlugin{})

    s.Register(new(models.Arith),"")  // 只提供rcvr不指定servicePath和method以及对应的name
    s.RegisterName("PBArith", new(models.PBArith),"") // 提供rcvr及其name
    s.RegisterFunction("PB-Mul",models.Mul,"") // 注入函数(对应的函数不需要提供调用方) 提供servicePath和method
    s.RegisterFunctionName("PMul","mul",models.Mul,"") // 提供servicePath和method及其name

    s.Serve("tcp", *addr)
    //log.Println(" Server Address= " + s.Address().String())
}

type ConnectionListerPlugin struct {
}

func (clis *ConnectionListerPlugin) HandleLister(conn net.Conn) (net.Conn, bool){
    log.Printf("Server Listener Address %v \n", conn.LocalAddr().String())
    return conn,true
}

func mul(ctx context.Context, args *models.Args, reply *models.Reply) error {
    reply.C = args.A * args.B
    return nil
}

实例client端代码

package main

import (
    "context"
    "flag"
    "rpcx/client"
    "rpcx/examples/models"
    "log"
    "time"
)

var  (
    addr = flag.String("addr","localhost:8972","server address")
)

func main() {
    flag.Parse()
    //
    d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for{
        reply := &models.Reply{}
        err := xclient.Call(context.Background(),"Mul", args, reply)
        if err != nil{
            log.Fatalf("failed to call:%v", err)
        }
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(1e9)
    }
}

辅助代码

package models

import (
    "context"
    "fmt"
    "rpcx/_testutils"
)

// 参数
type Args struct {
    A int
    B int
}

// 回复结果
type Reply struct {
    C int
}

// 服务
type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A * args.B
    fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
    return nil
}

func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
    reply.C = args.A + args.B
    fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
    return nil
}

func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
    *reply = "hello " + *args
    return nil
}

type PBArith int

func (t *PBArith) Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
    reply.C = args.A * args.B
    return nil
}

func (t *Arith) ThriftMul(ctx context.Context, args *testutils.ThriftArgs_, reply *testutils.ThriftReply) error {
    reply.C = args.A * args.B
    return nil
}

func Mul(ctx context.Context, args *testutils.ProtoArgs, reply *testutils.ProtoReply) error {
    reply.C = args.A * args.B
    return nil
}

四、其他

Client分析源码
Client执行链路如下:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容