RPC框架 rpcx

rpcx 的特性 简单易用 & 超快 & 高效 & 功能强大

1.简单易用

易于入门, 易于开发, 易于集成, 易于发布, 易于监控

2.高性能

性能远远高于 Dubbo、Motan、Thrift等框架,是gRPC性能的两倍

3.交叉平台,交叉语言

可以容易部署在Windows/Linux/MacOS等平台,支持各种编程语言的调用*

4.服务发现

除了直连外,还支持 Zookeeper、Etcd、 Consul、mDNS等注册中心

5.服务治理

支持 Failover、 Failfast、 Failtry、Backup等失败模式,支持 随机、 轮询、权重、网络质量, 一致性哈希,地理位置等路由算法

安装

对于初学者,我们不清楚用到哪些功能,索性就全部下下来

go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...

tags 对应:
quic: 支持 quic 协议
kcp: 支持 kcp 协议
zookeeper: 支持 zookeeper 注册中心
etcd: 支持 etcd 注册中心
consul: 支持 consul 注册中心
ping: 支持 网络质量负载均衡
reuseport: 支持 reuseport

简单的例子

需要有三个文件 我们的结构文件(结构体+函数表达式)、服务端、客户端
有点像MVC的结构
结构:

import "context"

type Args {
  A int
  B int
}

type Reply{
  C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context,args *Args,reply *Reply) error{
  reply.C = args.A*args.B
  return nil
}

server

package main

import (
    "flag"
    "github.com/my/repo/application/ceshi/RPC/rpcx"
    "github.com/smallnest/rpcx/server"
)
var (addr = flag.string("addr","lcoalhost:7777","sever addr"))

func main(){
  flag.Parse()
  s := sever.  NewServer()
  s.Register(new(rpcx.Arith),"")
  s.Server("tcp",*addr)
}

client

package main

import (
    "context"
    "flag"
    "github.com/my/repo/application/ceshi/RPC/rpcx"
    "github.com/smallnest/rpcx/client"
    "log"
)
var(
    addr = flag.String("addr","localhost:7777","server addr")
)

func main(){
  flag.Parse()
  d,_ := client.NewPeer2PeerDiscovery("tcp@"+*addr,"")
  xclient := client.NewXClient("Arith",client.Failtry,client.RondomSelect,d,client.DefaultOption)
  defer xclient.close
  args := &rpcx.Args{
    A:10,
    B:20,
  }
  reply := &rpcx.Reply{}
  err := xclient.Call(context.Background(),"Mul",args,reply)
  if err != nil {
        log.Fatalf("failed to call: %v", err)
  }
  log.Printf("%d + %d = %d", args.A, args.B, reply.C)

}

异步调用 Sever

d := client.NewPeer2PeerDiscovery("tcp@"+*add,"")
xclient := client.NewXClient("Arith",client.Failtry,client.RandomSelect,d,client.DefaultOption)
defer xclient.close()

args := &rpcx.Args{
  A:10,
  B:10,
}
reply := &rpcx.Reply{}

call,err := xclient.Go(context.Background(),"Mul",args,reply,nil)
if err != nil {
        log.Fatalf("failed to call: %v", err)
    }
replyCall := <- call.Done
if replyCall.Error != nil {
        log.Fatalf("failed to call: %v", replyCall.Error)
    } else {
        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
你必须使用 xclient.Go 来替换 xclient.Call, 然后把结果返回到一个channel里。你可以从chnanel里监听调用结果

通信传输类型

TCP
服务端使用tczuo wei做为网络名并在注册中心注册了名为 severName/tcp@ipaddr:port 的服务

s.Server("tcp",*addr)

客户端访问方式

d := client.NewPeer2PeerDiscovery("tcp@"+*addr,"")
xclient := client.NewXClient("Arith",client.Failtry,clinet.RandomSelect,client.DefaultOption)
defer xclient.Close() 

HTTP
通过http发送给rpcx服务器,rpcx服务器会劫持这个链接然后将它作为tcp链接来使用。
Unixdomain
server

s.Server("unix",*addr)

client

var (
  add = flag.String("addr","/tmp/rpcx.socket","server addr")
)
 flag.Parse()
d,_ := client.NewPeer2PeerDiscovery("unix@"+*addr,"")

KPC
sever

//go run -tags kcp server.go
package main

import (
    "crypto/sha1"
    "flag"
    "net"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
    kcp "github.com/xtaci/kcp-go"
    "golang.org/x/crypto/pbkdf2"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

const cryptKey = "rpcx-key"
const cryptSalt = "rpcx-salt"

func main() {
    flag.Parse()

    pass := pbkdf2.Key([]byte(cryptKey), []byte(cryptSalt), 4096, 32, sha1.New)
    bc, err := kcp.NewAESBlockCrypt(pass)
    if err != nil {
        panic(err)
    }

    s := server.NewServer(server.WithBlockCrypt(bc))
    s.RegisterName("Arith", new(example.Arith), "")

    cs := &ConfigUDPSession{}
    s.Plugins.Add(cs)

    err = s.Serve("kcp", *addr)
    if err != nil {
        panic(err)
    }
}

type ConfigUDPSession struct{}

func (p *ConfigUDPSession) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
    session, ok := conn.(*kcp.UDPSession)
    if !ok {
        return conn, true
    }

    session.SetACKNoDelay(true)
    session.SetStreamMode(true)
    return conn, true
}

client

//go run -tags kcp client.go
package main

import (
    "context"
    "crypto/sha1"
    "flag"
    "fmt"
    "log"
    "net"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
    kcp "github.com/xtaci/kcp-go"
    "golang.org/x/crypto/pbkdf2"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

const cryptKey = "rpcx-key"
const cryptSalt = "rpcx-salt"

func main() {
    flag.Parse()

    pass := pbkdf2.Key([]byte(cryptKey), []byte(cryptSalt), 4096, 32, sha1.New)
    bc, _ := kcp.NewAESBlockCrypt(pass)
    option := client.DefaultOption
    option.Block = bc

    d, _ := client.NewPeer2PeerDiscovery("kcp@"+*addr, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RoundRobin, d, option)
    defer xclient.Close()

    // plugin
    cs := &ConfigUDPSession{}
    pc := client.NewPluginContainer()
    pc.Add(cs)
    xclient.SetPlugins(pc)

    args := &example.Args{
        A: 10,
        B: 20,
    }

    start := time.Now()
    for i := 0; i < 10000; i++ {
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }
        //log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    }
    dur := time.Since(start)
    qps := 10000 * 1000 / int(dur/time.Millisecond)
    fmt.Printf("qps: %d call/s", qps)
}

type ConfigUDPSession struct{}

func (p *ConfigUDPSession) ConnCreated(conn net.Conn) (net.Conn, error) {
    session, ok := conn.(*kcp.UDPSession)
    if !ok {
        return conn, nil
    }

    session.SetACKNoDelay(true)
    session.SetStreamMode(true)
    return conn, nil
}

函数为服务 将纯函数注册为服务

  • 函数可以是导出也可以是不导出的 也就是没有限制了呗
  • 接收三个参数,第一个context.Context,其他是不限制
  • 第3个参数是一个指针
  • 有一个 error 类型的返回值
    服务端必须使用一个RegisterFunction 来注册一个函数并且提供一个服务名
type Args struct{
  A:int,
  B:int,
}

type Reply struct{
  C int
}

func mul(ctx context.Context,args *Args,reply *Reply) error{
  reply := args.A * args*B
  return nil
}

func main(){
  flag.Parse()
  s := server.NewServer()
  s.RegisterFunction("a.f.a",mul,"")
  s.Server("tcp",*addr)
}

客户端可以通过服务名和函数名来调用

d := client.NewPeer2PeerDiscover("tcp@"+*addr,"")
xclient := client.NewXClient("a.f.a",client.Failtry,client.RandomSelect,client.DefaultOption)
注册中心

服务注册中心是用来实现服务发现和服务的元数据存储的
当前rpcx支持多种注册中心 并且支持进程内的注册中心,方便开发测试
rpcx会自动的将服务的信息比如服务名、监听地址、监听协议、权重等信息注册到注册中心,同时还会定时的将服务的吞吐率更新到注册中心
如果服务意外的中断或者宕机,注册中心能够监测到这个事件,他会通知客户端这个服务不可用,在服务调用的时候不再选择这个服务器

Peer2Peer

点对点最简单的注册方式

注意:rpcx使用network @ Host: port格式表示一项服务。在network 可以 tcp , http ,unix ,quic或kcp。该Host可以所主机名或IP地址。
MultipleServers 点对多的注册方式

多台服务的时候
sever

import (
    "flag"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
)
var (
    addr1 = flag.String("addr1", "localhost:8972", "server1 address")
    addr2 = flag.String("addr2", "localhost:9981", "server2 address")
)
func main(){
  flag.Parser()
  go createServer(addr1)
  go createServer(addr2)
}

func createServer(addr String){
  s:= server.NewServer()
  s.RegisterName("Arith",new(explre.Arith),"")
  s.Server("tcp",addr)
}

client

var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
    addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)
func main(){
  flag.Parser()
  d := client.NewMultipleServerDiscover([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
xclient = client.NewXClient("Arith",client.Failtry,client.RoundRobin,client.DefaultOption)
defer xclient.close()
}
Zookeeper

主要配置参数:

  • ServerAddress:本机的监听地址,对外的暴漏的监听地址,格式:tcp@ipadderess:port
  • zookeeperServer:zk的集群地址
  • basePath: 服务前缀。如果有多个项目同时使用zookeeper,避免命名冲突,可以设置这个参数为当前的服务设置命名空间
  • Metrics:用来更新服务的TPS
    *updateInterval: 服务的刷新间隔,如果在一定的时间内没有刷新,服务就会从zookeeper中删除

server

import (
    "flag"
    "log"
    "time"

    metrics "github.com/rcrowley/go-metrics"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
    "github.com/smallnest/rpcx/serverplugin"
)

var (
  addr = flag.String("addr","localhost:7777","server addr")
  zkAddr = flag.String("zkaddr","lcoalhost:7771","zk addr")
  basePath = flag.String("base","/rpcx_test","prefix path")
)
func main(){
  flag.Parser()
  addRegisterPlugin(s)
  s.RegisterName("Arith", new(example.Arith), "")
  s.Server("tcp",*addr)
}

func addRegisterPlugin(s *server.Server){
  r := &serverplugin.zookeeperReginsterPluin{
      ServerAddress:"tcp@"+*addr,
      ZookeeperServers:[]string{*zkAddr},
      BasePath:*basePath,
      Metrics:metrics.NewRegistry(),
      UpdateInterval:time.Minute,
  }
  err := r.start()
  if err != nil {
        log.Fatal(err)
    }
  s.Plugins.add(r)
}

client

var (
    zkAddr   = flag.String("zkAddr", "localhost:2181", "zookeeper address")
    basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main(){
  flag.Parser()
  d,_ := client.NewZookeeperDiscover(*basePath,"Arith",[]string{*zkAddr},nil)
  xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d,   client.DefaultOption)
  defer xclient.Close()
}
Etcd

主要配置几个参数:

  • ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为tcp@ipaddress:port
  • EtcdServers: etcd集群的地址
  • BasePath: 服务前缀。 如果有多个项目同时使用zookeeper,避免命名冲突,可以设置这个参数,为当前的服务设置命名空间
  • Metrics: 用来更新服务的TPS
  • UpdateInterval: 服务的刷新间隔, 如果在一定间隔内(当前设为2 * UpdateInterval)没有刷新,服务就会从etcd中删除

插件必须在注册服务之前添加到Server中,否则插件没有办法获取注册的服务的信息

server

var (
    addr     = flag.String("addr", "localhost:8972", "server address")
    etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
    basePath = flag.String("base", "/rpcx_test", "prefix path")
)

func addRegistryPlugin(s *server.Server) {
    r := &serverplugin.EtcdRegisterPlugin{
        ServiceAddress: "tcp@" + *addr,
        EtcdServers:    []string{*etcdAddr},
        BasePath:       *basePath,
        Metrics:        metrics.NewRegistry(),
        UpdateInterval: time.Minute,
    }
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}
func main() {
    flag.Parse()

    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    go s.Serve("tcp", *addr)

    time.Sleep(time.Minute)

    err := s.UnregisterAll()
    if err != nil {
        panic(err)
    }
}

client

import (
    "context"
    "flag"
    "log"
    "time"
    etcd_client "github.com/rpcxio/rpcx-etcd/client"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
    basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main() {
    flag.Parse()

    d, _ := etcd_client.NewEtcdDiscovery(*basePath, "Arith", []string{*etcdAddr}, false, nil)
    xclient := client.NewXClient("Arith", client.Failover, client.RoundRobin, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for {
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Printf("failed to call: %v\n", err)
            time.Sleep(5 * time.Second)
            continue
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(5 * time.Second)
    }
}

Consul

特性:

  • 服务发现: Consul提供了通过DNS或者HTTP接口的方式来注册服务和发现服务。一些外部的服务通过Consul很容易的找到它所依赖的服务。
  • 健康检测: Consul的Client提供了健康检查的机制,可以通过用来避免流量被转发到有故障的服务上。
  • Key/Value存储: 应用程序可以根据自己的需要使用Consul提供的Key/Value存储。 Consul提供了简单易用的HTTP接口,结合其他工具可以实现动态配置、功能标记、领袖选举等等功能。
  • 多数据中心: Consul支持开箱即用的多数据中心. 这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域。

主要配置几个参数:

  • ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为tcp@ipaddress:port
  • ConsulServers: consul集群的地址
  • BasePath: 服务前缀。 如果有多个项目同时使用consul,避免命名冲突,可以设置这个参数,为当前的服务设置命名空间
  • Metrics: 用来更新服务的TPS
  • UpdateInterval: 服务的刷新间隔, 如果在一定间隔内(当前设为2 * UpdateInterval)没有刷新,服务就会从consul中删除

server

import (
    "flag"
    "fmt"
    "log"
    "time"

    metrics "github.com/rcrowley/go-metrics"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
    "github.com/smallnest/rpcx/serverplugin"
)

var (
    addr       = flag.String("addr", "localhost:8972", "server address")
    consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
    basePath   = flag.String("base", "/rpcx_test", "prefix path")
)

func main() {
    flag.Parse()

    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    err := s.Serve("tcp", *addr)
    if err != nil {
        fmt.Println(err)
    }
}

func addRegistryPlugin(s *server.Server) {

    r := &serverplugin.ConsulRegisterPlugin{
        ServiceAddress: "tcp@" + *addr,
        ConsulServers:  []string{*consulAddr},
        BasePath:       *basePath,
        Metrics:        metrics.NewRegistry(),
        UpdateInterval: time.Minute,
    }
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}

client

import (
    "context"
    "flag"
    "log"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
    basePath   = flag.String("base", "/rpcx_test", "prefix path")
)

func main() {
    flag.Parse()

    d, _ := client.NewConsulDiscovery(*basePath, "Arith", []string{*consulAddr}, nil)
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for {
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Printf("ERROR failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
        time.Sleep(1e9)
    }

}

mdns

mdns 即多播dns(Multicast DNS),mDNS主要实现了在没有传统DNS服务器的情况下使局域网内的主机实现相互发现和通信,使用的端口为5353,遵从dns协议,使用现有的DNS信息结构、名语法和资源记录类型。并且没有指定新的操作代码或响应代码。
在局域网中,设备和设备之前相互通信需要知道对方的ip地址的,大多数情况,设备的ip不是静态ip地址,而是通过dhcp 协议动态分配的ip 地址,如何设备发现呢,就是要mdns大显身手,例如:现在物联网设备和app之间的通信,要么app通过广播,要么通过组播,发一些特定信息,感兴趣设备应答,实现局域网设备的发现,当然服务也一样。
mDns协议规定了消息的基本格式和消息的收发的基本顺序,DNS-SD 协议在这基础上,首先对实例名,服务名称,域名长度/顺序等作出了具体的定义,然后规定了如何方便地进行服务发现和描述。
服务实例名称 = <服务实例>.<服务类型>.<域名>
服务实例一般由一个或多个标签组成,标签之间用 . 隔开。
服务类型表明该服务是使用什么协议实现的,由 _ 下划线和服务使用的协议名称组成,如大部分使用的 _tcp 协议,另外,可以同时使用多个协议标签,如: “_http._tcp” 就表明该服务类型使用了基于tcp的http协议。
域名一般都固定为 “local”
DNS-SD 协议使用了PTR、SRV、TXT 3种类型的资源记录来完整地描述了一个服务。当主机通过查询得到了一个PTR响应记录后,就获得了一个它所关心服务的实例名称,它可以同通过继续获取 SRV 和 TXT 记录来拿到进一步的信息。其中的 SRV 记录中有该服务对应的主机名和端口号。TXT 记录中有该服务的其他附加信息。

server

package main

import (
    "flag"
    "log"
    "time"

    metrics "github.com/rcrowley/go-metrics"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
    "github.com/smallnest/rpcx/serverplugin"
)

var (
    addr     = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")
    s.Serve("tcp", *addr)
}

func addRegistryPlugin(s *server.Server) {

    r := serverplugin.NewMDNSRegisterPlugin("tcp@"+*addr, 8972, metrics.NewRegistry(), time.Minute, "")
    err := r.Start()
    if err != nil {
        log.Fatal(err)
    }
    s.Plugins.Add(r)
}

client

package main

import (
    "context"
    "flag"
    "log"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    basePath = flag.String("base", "/rpcx_test/Arith", "prefix path")
)

func main() {
    flag.Parse()

    d, _ := client.NewMDNSDiscovery("Arith", 10*time.Second, 10*time.Second, "")
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    reply := &example.Reply{}
    err := xclient.Call(context.Background(), "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}

Inprocess 进程内调用

这个Registry用于进程内的测试。 在开发过程中,可能不能直接连接线上的服务器直接测试,而是写一些mock程序作为服务,这个时候就可以使用这个registry, 测试通过在部署的时候再换成相应的其它registry.

在这种情况下, client和server并不会走TCP或者UDP协议,而是直接进程内方法调用,所以服务器代码是和client代码在一起的。

func main() {
    flag.Parse()

    s := server.NewServer()
    addRegistryPlugin(s)

    s.RegisterName("Arith", new(example.Arith), "")

    go func() {
        s.Serve("tcp", *addr)
    }()

    d := client.NewInprocessDiscovery()
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 100; i++ {

        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

    }
}

func addRegistryPlugin(s *server.Server) {

    r := client.InprocessClient
    s.Plugins.Add(r)
}
失败模式

在实际运行的过程中,你不能保证,一切运行正常。比如服务宕机、网速变慢、
rpcx 支持四种调用模式,用来处理服务失败后的处理逻辑,在client 客户端设置
FailModel的设置仅仅对同步调用有效(xclient.call),异步调用,这个参数无效

  • Failfast

在这种模式下,如果遇到错误,rpcx会立即返回错误,

client

var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
    addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)
func main(){
  flag.Parse()
  d,_ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
  option := client.DefaultOption
  option.Retries = 10
  xclient := client.NewXClient("Arith",client.FailFast,client.RandomSelect,d,option)
defer xclient.Close()
}
  • Failover

遇到错误,他会尝试调用另一个节点,直到调用成功。或者达到最大的重试次数。重试次数retries在参数option中设置,缺省值为3

client

d,_ := client.NewMultipeServerDiscovery([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
  option := client.DefaultOption
  option.Retries = 10
xclient := client.NewXClient("Arith",client.Failover,client.RandomSelect,d,option)
  • Failtry

如果一个节点出现错误,他会在当前的节点进行重试,知道这个节点返回正常或者达到最大重试次数

xclient := client.NewXClient("Arith",client.Failtry,client.RandomSelect,d,option)
  • Failbackup

在限定的时间内,当前节点没有返回信息,则会发送相同的请求到另一个节点,只要这个几个节点有信息返回,则rpcx就算掉用成功

client

xclient := client.NewXClient("Arith", client.Failbackup, client.RandomSelect, d, client.DefaultOption)
广播模式

Broadcast 是xclient 的一个方法,你可以将一个请求发送到这个服务的所有节点,如果都正常,则返回其中一个节点的的结果。如果有错误的话,则返回其中一个节点的错误。

client

package main

import (
    "context"
    "flag"
    "log"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
    addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)

func main() {
    flag.Parse()

    d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failover, client.RoundRobin, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for {
        reply := &example.Reply{}
        err := xclient.Broadcast(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)
        time.Sleep(1e9)
    }

}

路由

在大型的微服务系统中,我们会为同一个服务部署到多个节点中,以便服务可以支持大并发访问,他们可能部署到同一个数据中心的同一个节点中,也可能是多个数据中。

那么客户端如何选择呢?rpcx通过Selector 来实现路由选择,他像一个负载均衡器,帮助你 选择一个合适的节点。

rpcx提供了多个路由策略算法,你可以在创建XClient来指定。

注意,这里的路由是针对 ServicePath 和 ServiceMethod的路由。

  • 随机 random
    从配置的节点随机获取一个,但是不能保证均匀,只有在大的数量请求下,才能理论上分配均匀
client.RandomSelect
  • 轮循 roundrobin
    能保证每个节点都能被访问,但是每个节点配置抗压能力不一样的时候,就不太好了
client.Roundrobin
  • 权重 WeightedRoundRobin
    比如三个节点 a,b,c 权重分别是5,1,1 这个算法的调用顺序是{ a, a, b, a, c, a, a }, 相比较 { c, b, a, a, a, a, a }, 虽然权重都一样,但是前者更好,不至于在一段时间内将请求都发送给a。
    client
var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
    addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
)
func main(){
  d,_ := client.NewMultipleSeverDiscovery([]*client.KVPair{{Key:*addr1,Value:"weight=7"},{Key:*addr2,Value:"weight=4"}})
  xclient := client.NewXClient("Arith",client.Failtry,client.WeightRoundRobin,d,client.DefaultOption)
  defer xclient.Close()
}
  • 网络质量优先 WeightedICMP
    首先客户端会基于ping探测各个节点的网络质量,越短的ping的时间,这个节点的质量就越高。
client.WeightedICMP
  • 一致性哈希
    使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和 参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由就不准确了,所以在节点有变动的时候它会重新计算一致性哈希
client.ConsisitentHash
  • 地理位置优先 geo
    如果我们希望的是客户端会优先选择离它最新的节点, 比如在同一个机房。 如果客户端在北京, 服务在上海和美国硅谷,那么我们优先选择上海的机房。

它要求服务在注册的时候要设置它所在的地理经纬度。

如果两个服务的节点的经纬度是一样的, rpcx会随机选择一个。

package main

import (
    "context"
    "flag"
    "log"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
    addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "latitude=39.9289&longitude=116.3883"},
        {Key: *addr2, Value: "latitude=139.3453&longitude=23.3243"}})
    xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)
    defer xclient.Close()
    xclient.ConfigGeoSelector(39.30, 116.40)

    args := &example.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &example.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}

超时

他可以保护服务防止进入无限的等待中。超时限定了服务的最长等待时间,如果给定的时间没有响应,服务调用就进入下一个状态,或者重试,或者返回错误。

package main

import (
    "context"
    "flag"
    "log"
    "time"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")

    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    reply := &example.Reply{}

    ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
    err := xclient.Call(ctx, "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }
    cancelFn()

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}

元数据

客户端和服务端可以互相传递元数据。
元数据不是服务请求和服务响应的业务数据,而是一些辅助性的数据。
元数据是一个键值队的列表,键和值都是字符串

在客户端读取、发送服务端:share.ReqMetaDataKey
、share.ResMetaDataKey
反过来一样

server

import (
    "context"
    "flag"
    "fmt"
    "github.com/smallnest/rpcx/share"
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)
type Arith int

func (t *Arith) Mul(ctx context.Context,args *ex.Args,reply *ex.Reply) error{
  reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
  resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
  resMeta["echo"] = "from server"
  reply.C = args.A * args.B
    return nil
}
func main(){
  flag.parse()
  s := server.NewServer()
  s.RegisterName("Arith",new(Arith),"")
  s.Server("tcp",*addr)
}

client

package main

import (
    "context"
    "flag"
    "log"

    "github.com/smallnest/rpcx/share"

    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/client"
)

var (
    addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    flag.Parse()

    d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")

    option := client.DefaultOption
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
    defer xclient.Close()

    args := &example.Args{
        A: 10,
        B: 20,
    }

    reply := &example.Reply{}
    ctx := context.WithValue(context.Background,share.ReqMetaDataKey,map[string]string{"aaa","from client"})
    ctx = context.WithValue(ctx, share.ResMetaDataKey, make(map[string]string))

    err := xclient.Call(ctx, "Mul", args, reply)
    if err != nil {
        log.Fatalf("failed to call: %v", err)
    }

    log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    log.Printf("received meta: %+v", ctx.Value(share.ResMetaDataKey))
}

心跳

你可以设置自动的心跳来保持连接不断掉。 rpcx会自动处理心跳(事实上它直接就丢弃了心跳)。

客户端需要启用心跳选项,并且设置心跳间隔:

option := client.DefaultOption
option.HeartBeat =   true
option. HeartBeatInterval=time.Second
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容