rpcx 的特性 简单易用 & 超快 & 高效 & 功能强大
1.简单易用
易于入门, 易于开发, 易于集成, 易于发布, 易于监控
2.高性能
性能远远高于 Dubbo、Motan、Thrift等框架,是gRPC性能的两倍
3.交叉平台,交叉语言
可以容易部署在Windows/Linux/MacOS等平台,支持各种编程语言的调用*
4.服务发现
除了直连外,还支持 Zookeeper、Etcd、 Consul、mDNS等注册中心
5.服务治理
支持 Failover、 Failfast、 Failtry、Backup等失败模式,支持 随机、 轮询、权重、网络质量, 一致性哈希,地理位置等路由算法
安装
对于初学者,我们不清楚用到哪些功能,索性就全部下下来
go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
tags 对应:
quic: 支持 quic 协议
kcp: 支持 kcp 协议
zookeeper: 支持 zookeeper 注册中心
etcd: 支持 etcd 注册中心
consul: 支持 consul 注册中心
ping: 支持 网络质量负载均衡
reuseport: 支持 reuseport
简单的例子
需要有三个文件 我们的结构文件(结构体+函数表达式)、服务端、客户端
有点像MVC的结构
结构:
import "context"
type Args {
A int
B int
}
type Reply{
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context,args *Args,reply *Reply) error{
reply.C = args.A*args.B
return nil
}
server
package main
import (
"flag"
"github.com/my/repo/application/ceshi/RPC/rpcx"
"github.com/smallnest/rpcx/server"
)
var (addr = flag.string("addr","lcoalhost:7777","sever addr"))
func main(){
flag.Parse()
s := sever. NewServer()
s.Register(new(rpcx.Arith),"")
s.Server("tcp",*addr)
}
client
package main
import (
"context"
"flag"
"github.com/my/repo/application/ceshi/RPC/rpcx"
"github.com/smallnest/rpcx/client"
"log"
)
var(
addr = flag.String("addr","localhost:7777","server addr")
)
func main(){
flag.Parse()
d,_ := client.NewPeer2PeerDiscovery("tcp@"+*addr,"")
xclient := client.NewXClient("Arith",client.Failtry,client.RondomSelect,d,client.DefaultOption)
defer xclient.close
args := &rpcx.Args{
A:10,
B:20,
}
reply := &rpcx.Reply{}
err := xclient.Call(context.Background(),"Mul",args,reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d + %d = %d", args.A, args.B, reply.C)
}
异步调用 Sever
d := client.NewPeer2PeerDiscovery("tcp@"+*add,"")
xclient := client.NewXClient("Arith",client.Failtry,client.RandomSelect,d,client.DefaultOption)
defer xclient.close()
args := &rpcx.Args{
A:10,
B:10,
}
reply := &rpcx.Reply{}
call,err := xclient.Go(context.Background(),"Mul",args,reply,nil)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
replyCall := <- call.Done
if replyCall.Error != nil {
log.Fatalf("failed to call: %v", replyCall.Error)
} else {
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
你必须使用 xclient.Go 来替换 xclient.Call, 然后把结果返回到一个channel里。你可以从chnanel里监听调用结果
通信传输类型
TCP
服务端使用tczuo wei做为网络名并在注册中心注册了名为 severName/tcp@ipaddr:port 的服务
s.Server("tcp",*addr)
客户端访问方式
d := client.NewPeer2PeerDiscovery("tcp@"+*addr,"")
xclient := client.NewXClient("Arith",client.Failtry,clinet.RandomSelect,client.DefaultOption)
defer xclient.Close()
HTTP
通过http发送给rpcx服务器,rpcx服务器会劫持这个链接然后将它作为tcp链接来使用。
Unixdomain
server
s.Server("unix",*addr)
client
var (
add = flag.String("addr","/tmp/rpcx.socket","server addr")
)
flag.Parse()
d,_ := client.NewPeer2PeerDiscovery("unix@"+*addr,"")
KPC
sever
//go run -tags kcp server.go
package main
import (
"crypto/sha1"
"flag"
"net"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
kcp "github.com/xtaci/kcp-go"
"golang.org/x/crypto/pbkdf2"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
const cryptKey = "rpcx-key"
const cryptSalt = "rpcx-salt"
func main() {
flag.Parse()
pass := pbkdf2.Key([]byte(cryptKey), []byte(cryptSalt), 4096, 32, sha1.New)
bc, err := kcp.NewAESBlockCrypt(pass)
if err != nil {
panic(err)
}
s := server.NewServer(server.WithBlockCrypt(bc))
s.RegisterName("Arith", new(example.Arith), "")
cs := &ConfigUDPSession{}
s.Plugins.Add(cs)
err = s.Serve("kcp", *addr)
if err != nil {
panic(err)
}
}
type ConfigUDPSession struct{}
func (p *ConfigUDPSession) HandleConnAccept(conn net.Conn) (net.Conn, bool) {
session, ok := conn.(*kcp.UDPSession)
if !ok {
return conn, true
}
session.SetACKNoDelay(true)
session.SetStreamMode(true)
return conn, true
}
client
//go run -tags kcp client.go
package main
import (
"context"
"crypto/sha1"
"flag"
"fmt"
"log"
"net"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
kcp "github.com/xtaci/kcp-go"
"golang.org/x/crypto/pbkdf2"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
const cryptKey = "rpcx-key"
const cryptSalt = "rpcx-salt"
func main() {
flag.Parse()
pass := pbkdf2.Key([]byte(cryptKey), []byte(cryptSalt), 4096, 32, sha1.New)
bc, _ := kcp.NewAESBlockCrypt(pass)
option := client.DefaultOption
option.Block = bc
d, _ := client.NewPeer2PeerDiscovery("kcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RoundRobin, d, option)
defer xclient.Close()
// plugin
cs := &ConfigUDPSession{}
pc := client.NewPluginContainer()
pc.Add(cs)
xclient.SetPlugins(pc)
args := &example.Args{
A: 10,
B: 20,
}
start := time.Now()
for i := 0; i < 10000; i++ {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
//log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
dur := time.Since(start)
qps := 10000 * 1000 / int(dur/time.Millisecond)
fmt.Printf("qps: %d call/s", qps)
}
type ConfigUDPSession struct{}
func (p *ConfigUDPSession) ConnCreated(conn net.Conn) (net.Conn, error) {
session, ok := conn.(*kcp.UDPSession)
if !ok {
return conn, nil
}
session.SetACKNoDelay(true)
session.SetStreamMode(true)
return conn, nil
}
函数为服务 将纯函数注册为服务
- 函数可以是导出也可以是不导出的 也就是没有限制了呗
- 接收三个参数,第一个context.Context,其他是不限制
- 第3个参数是一个指针
- 有一个 error 类型的返回值
服务端必须使用一个RegisterFunction 来注册一个函数并且提供一个服务名
type Args struct{
A:int,
B:int,
}
type Reply struct{
C int
}
func mul(ctx context.Context,args *Args,reply *Reply) error{
reply := args.A * args*B
return nil
}
func main(){
flag.Parse()
s := server.NewServer()
s.RegisterFunction("a.f.a",mul,"")
s.Server("tcp",*addr)
}
客户端可以通过服务名和函数名来调用
d := client.NewPeer2PeerDiscover("tcp@"+*addr,"")
xclient := client.NewXClient("a.f.a",client.Failtry,client.RandomSelect,client.DefaultOption)
注册中心
服务注册中心是用来实现服务发现和服务的元数据存储的
当前rpcx支持多种注册中心 并且支持进程内的注册中心,方便开发测试
rpcx会自动的将服务的信息比如服务名、监听地址、监听协议、权重等信息注册到注册中心,同时还会定时的将服务的吞吐率更新到注册中心
如果服务意外的中断或者宕机,注册中心能够监测到这个事件,他会通知客户端这个服务不可用,在服务调用的时候不再选择这个服务器
Peer2Peer
点对点最简单的注册方式
注意:rpcx使用network @ Host: port格式表示一项服务。在network 可以 tcp , http ,unix ,quic或kcp。该Host可以所主机名或IP地址。
MultipleServers 点对多的注册方式
多台服务的时候
sever
import (
"flag"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr1 = flag.String("addr1", "localhost:8972", "server1 address")
addr2 = flag.String("addr2", "localhost:9981", "server2 address")
)
func main(){
flag.Parser()
go createServer(addr1)
go createServer(addr2)
}
func createServer(addr String){
s:= server.NewServer()
s.RegisterName("Arith",new(explre.Arith),"")
s.Server("tcp",addr)
}
client
var (
addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)
func main(){
flag.Parser()
d := client.NewMultipleServerDiscover([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
xclient = client.NewXClient("Arith",client.Failtry,client.RoundRobin,client.DefaultOption)
defer xclient.close()
}
Zookeeper
主要配置参数:
- ServerAddress:本机的监听地址,对外的暴漏的监听地址,格式:tcp@ipadderess:port
- zookeeperServer:zk的集群地址
- basePath: 服务前缀。如果有多个项目同时使用zookeeper,避免命名冲突,可以设置这个参数为当前的服务设置命名空间
- Metrics:用来更新服务的TPS
*updateInterval: 服务的刷新间隔,如果在一定的时间内没有刷新,服务就会从zookeeper中删除
server
import (
"flag"
"log"
"time"
metrics "github.com/rcrowley/go-metrics"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
)
var (
addr = flag.String("addr","localhost:7777","server addr")
zkAddr = flag.String("zkaddr","lcoalhost:7771","zk addr")
basePath = flag.String("base","/rpcx_test","prefix path")
)
func main(){
flag.Parser()
addRegisterPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Server("tcp",*addr)
}
func addRegisterPlugin(s *server.Server){
r := &serverplugin.zookeeperReginsterPluin{
ServerAddress:"tcp@"+*addr,
ZookeeperServers:[]string{*zkAddr},
BasePath:*basePath,
Metrics:metrics.NewRegistry(),
UpdateInterval:time.Minute,
}
err := r.start()
if err != nil {
log.Fatal(err)
}
s.Plugins.add(r)
}
client
var (
zkAddr = flag.String("zkAddr", "localhost:2181", "zookeeper address")
basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main(){
flag.Parser()
d,_ := client.NewZookeeperDiscover(*basePath,"Arith",[]string{*zkAddr},nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
}
Etcd
主要配置几个参数:
- ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为tcp@ipaddress:port
- EtcdServers: etcd集群的地址
- BasePath: 服务前缀。 如果有多个项目同时使用zookeeper,避免命名冲突,可以设置这个参数,为当前的服务设置命名空间
- Metrics: 用来更新服务的TPS
- UpdateInterval: 服务的刷新间隔, 如果在一定间隔内(当前设为2 * UpdateInterval)没有刷新,服务就会从etcd中删除
插件必须在注册服务之前添加到Server中,否则插件没有办法获取注册的服务的信息
server
var (
addr = flag.String("addr", "localhost:8972", "server address")
etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func addRegistryPlugin(s *server.Server) {
r := &serverplugin.EtcdRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
EtcdServers: []string{*etcdAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
go s.Serve("tcp", *addr)
time.Sleep(time.Minute)
err := s.UnregisterAll()
if err != nil {
panic(err)
}
}
client
import (
"context"
"flag"
"log"
"time"
etcd_client "github.com/rpcxio/rpcx-etcd/client"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
etcdAddr = flag.String("etcdAddr", "localhost:2379", "etcd address")
basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main() {
flag.Parse()
d, _ := etcd_client.NewEtcdDiscovery(*basePath, "Arith", []string{*etcdAddr}, false, nil)
xclient := client.NewXClient("Arith", client.Failover, client.RoundRobin, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Printf("failed to call: %v\n", err)
time.Sleep(5 * time.Second)
continue
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(5 * time.Second)
}
}
Consul
特性:
- 服务发现: Consul提供了通过DNS或者HTTP接口的方式来注册服务和发现服务。一些外部的服务通过Consul很容易的找到它所依赖的服务。
- 健康检测: Consul的Client提供了健康检查的机制,可以通过用来避免流量被转发到有故障的服务上。
- Key/Value存储: 应用程序可以根据自己的需要使用Consul提供的Key/Value存储。 Consul提供了简单易用的HTTP接口,结合其他工具可以实现动态配置、功能标记、领袖选举等等功能。
- 多数据中心: Consul支持开箱即用的多数据中心. 这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域。
主要配置几个参数:
- ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为tcp@ipaddress:port
- ConsulServers: consul集群的地址
- BasePath: 服务前缀。 如果有多个项目同时使用consul,避免命名冲突,可以设置这个参数,为当前的服务设置命名空间
- Metrics: 用来更新服务的TPS
- UpdateInterval: 服务的刷新间隔, 如果在一定间隔内(当前设为2 * UpdateInterval)没有刷新,服务就会从consul中删除
server
import (
"flag"
"fmt"
"log"
"time"
metrics "github.com/rcrowley/go-metrics"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
err := s.Serve("tcp", *addr)
if err != nil {
fmt.Println(err)
}
}
func addRegistryPlugin(s *server.Server) {
r := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + *addr,
ConsulServers: []string{*consulAddr},
BasePath: *basePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
client
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
basePath = flag.String("base", "/rpcx_test", "prefix path")
)
func main() {
flag.Parse()
d, _ := client.NewConsulDiscovery(*basePath, "Arith", []string{*consulAddr}, nil)
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Printf("ERROR failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
mdns
mdns 即多播dns(Multicast DNS),mDNS主要实现了在没有传统DNS服务器的情况下使局域网内的主机实现相互发现和通信,使用的端口为5353,遵从dns协议,使用现有的DNS信息结构、名语法和资源记录类型。并且没有指定新的操作代码或响应代码。
在局域网中,设备和设备之前相互通信需要知道对方的ip地址的,大多数情况,设备的ip不是静态ip地址,而是通过dhcp 协议动态分配的ip 地址,如何设备发现呢,就是要mdns大显身手,例如:现在物联网设备和app之间的通信,要么app通过广播,要么通过组播,发一些特定信息,感兴趣设备应答,实现局域网设备的发现,当然服务也一样。
mDns协议规定了消息的基本格式和消息的收发的基本顺序,DNS-SD 协议在这基础上,首先对实例名,服务名称,域名长度/顺序等作出了具体的定义,然后规定了如何方便地进行服务发现和描述。
服务实例名称 = <服务实例>.<服务类型>.<域名>
服务实例一般由一个或多个标签组成,标签之间用 . 隔开。
服务类型表明该服务是使用什么协议实现的,由 _ 下划线和服务使用的协议名称组成,如大部分使用的 _tcp 协议,另外,可以同时使用多个协议标签,如: “_http._tcp” 就表明该服务类型使用了基于tcp的http协议。
域名一般都固定为 “local”
DNS-SD 协议使用了PTR、SRV、TXT 3种类型的资源记录来完整地描述了一个服务。当主机通过查询得到了一个PTR响应记录后,就获得了一个它所关心服务的实例名称,它可以同通过继续获取 SRV 和 TXT 记录来拿到进一步的信息。其中的 SRV 记录中有该服务对应的主机名和端口号。TXT 记录中有该服务的其他附加信息。
server
package main
import (
"flag"
"log"
"time"
metrics "github.com/rcrowley/go-metrics"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
s.Serve("tcp", *addr)
}
func addRegistryPlugin(s *server.Server) {
r := serverplugin.NewMDNSRegisterPlugin("tcp@"+*addr, 8972, metrics.NewRegistry(), time.Minute, "")
err := r.Start()
if err != nil {
log.Fatal(err)
}
s.Plugins.Add(r)
}
client
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
basePath = flag.String("base", "/rpcx_test/Arith", "prefix path")
)
func main() {
flag.Parse()
d, _ := client.NewMDNSDiscovery("Arith", 10*time.Second, 10*time.Second, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
Inprocess 进程内调用
这个Registry用于进程内的测试。 在开发过程中,可能不能直接连接线上的服务器直接测试,而是写一些mock程序作为服务,这个时候就可以使用这个registry, 测试通过在部署的时候再换成相应的其它registry.
在这种情况下, client和server并不会走TCP或者UDP协议,而是直接进程内方法调用,所以服务器代码是和client代码在一起的。
func main() {
flag.Parse()
s := server.NewServer()
addRegistryPlugin(s)
s.RegisterName("Arith", new(example.Arith), "")
go func() {
s.Serve("tcp", *addr)
}()
d := client.NewInprocessDiscovery()
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for i := 0; i < 100; i++ {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
}
func addRegistryPlugin(s *server.Server) {
r := client.InprocessClient
s.Plugins.Add(r)
}
失败模式
在实际运行的过程中,你不能保证,一切运行正常。比如服务宕机、网速变慢、
rpcx 支持四种调用模式,用来处理服务失败后的处理逻辑,在client 客户端设置
FailModel的设置仅仅对同步调用有效(xclient.call),异步调用,这个参数无效
- Failfast
在这种模式下,如果遇到错误,rpcx会立即返回错误,
client
var (
addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)
func main(){
flag.Parse()
d,_ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
option := client.DefaultOption
option.Retries = 10
xclient := client.NewXClient("Arith",client.FailFast,client.RandomSelect,d,option)
defer xclient.Close()
}
- Failover
遇到错误,他会尝试调用另一个节点,直到调用成功。或者达到最大的重试次数。重试次数retries在参数option中设置,缺省值为3
client
d,_ := client.NewMultipeServerDiscovery([]*client.KVPair{{Key:*addr1},{Key:*addr2}})
option := client.DefaultOption
option.Retries = 10
xclient := client.NewXClient("Arith",client.Failover,client.RandomSelect,d,option)
- Failtry
如果一个节点出现错误,他会在当前的节点进行重试,知道这个节点返回正常或者达到最大重试次数
xclient := client.NewXClient("Arith",client.Failtry,client.RandomSelect,d,option)
- Failbackup
在限定的时间内,当前节点没有返回信息,则会发送相同的请求到另一个节点,只要这个几个节点有信息返回,则rpcx就算掉用成功
client
xclient := client.NewXClient("Arith", client.Failbackup, client.RandomSelect, d, client.DefaultOption)
广播模式
Broadcast 是xclient 的一个方法,你可以将一个请求发送到这个服务的所有节点,如果都正常,则返回其中一个节点的的结果。如果有错误的话,则返回其中一个节点的错误。
client
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr1 = flag.String("addr1", "tcp@localhost:8972", "server1 address")
addr2 = flag.String("addr2", "tcp@localhost:9981", "server2 address")
)
func main() {
flag.Parse()
d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
xclient := client.NewXClient("Arith", client.Failover, client.RoundRobin, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{}
err := xclient.Broadcast(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
路由
在大型的微服务系统中,我们会为同一个服务部署到多个节点中,以便服务可以支持大并发访问,他们可能部署到同一个数据中心的同一个节点中,也可能是多个数据中。
那么客户端如何选择呢?rpcx通过Selector 来实现路由选择,他像一个负载均衡器,帮助你 选择一个合适的节点。
rpcx提供了多个路由策略算法,你可以在创建XClient来指定。
注意,这里的路由是针对 ServicePath 和 ServiceMethod的路由。
- 随机 random
从配置的节点随机获取一个,但是不能保证均匀,只有在大的数量请求下,才能理论上分配均匀
client.RandomSelect
- 轮循 roundrobin
能保证每个节点都能被访问,但是每个节点配置抗压能力不一样的时候,就不太好了
client.Roundrobin
- 权重 WeightedRoundRobin
比如三个节点 a,b,c 权重分别是5,1,1 这个算法的调用顺序是{ a, a, b, a, c, a, a }, 相比较 { c, b, a, a, a, a, a }, 虽然权重都一样,但是前者更好,不至于在一段时间内将请求都发送给a。
client
var (
addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
)
func main(){
d,_ := client.NewMultipleSeverDiscovery([]*client.KVPair{{Key:*addr1,Value:"weight=7"},{Key:*addr2,Value:"weight=4"}})
xclient := client.NewXClient("Arith",client.Failtry,client.WeightRoundRobin,d,client.DefaultOption)
defer xclient.Close()
}
- 网络质量优先 WeightedICMP
首先客户端会基于ping探测各个节点的网络质量,越短的ping的时间,这个节点的质量就越高。
client.WeightedICMP
- 一致性哈希
使用 JumpConsistentHash 选择节点, 相同的servicePath, serviceMethod 和 参数会路由到同一个节点上。 JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由就不准确了,所以在节点有变动的时候它会重新计算一致性哈希
client.ConsisitentHash
- 地理位置优先 geo
如果我们希望的是客户端会优先选择离它最新的节点, 比如在同一个机房。 如果客户端在北京, 服务在上海和美国硅谷,那么我们优先选择上海的机房。
它要求服务在注册的时候要设置它所在的地理经纬度。
如果两个服务的节点的经纬度是一样的, rpcx会随机选择一个。
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
)
func main() {
flag.Parse()
d, _ := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "latitude=39.9289&longitude=116.3883"},
{Key: *addr2, Value: "latitude=139.3453&longitude=23.3243"}})
xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)
defer xclient.Close()
xclient.ConfigGeoSelector(39.30, 116.40)
args := &example.Args{
A: 10,
B: 20,
}
for i := 0; i < 10; i++ {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(time.Second)
}
}
超时
他可以保护服务防止进入无限的等待中。超时限定了服务的最长等待时间,如果给定的时间没有响应,服务调用就进入下一个状态,或者重试,或者返回错误。
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
err := xclient.Call(ctx, "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
cancelFn()
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}
元数据
客户端和服务端可以互相传递元数据。
元数据不是服务请求和服务响应的业务数据,而是一些辅助性的数据。
元数据是一个键值队的列表,键和值都是字符串
在客户端读取、发送服务端:share.ReqMetaDataKey
、share.ResMetaDataKey
反过来一样
server
import (
"context"
"flag"
"fmt"
"github.com/smallnest/rpcx/share"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
type Arith int
func (t *Arith) Mul(ctx context.Context,args *ex.Args,reply *ex.Reply) error{
reqMeta := ctx.Value(share.ReqMetaDataKey).(map[string]string)
resMeta := ctx.Value(share.ResMetaDataKey).(map[string]string)
resMeta["echo"] = "from server"
reply.C = args.A * args.B
return nil
}
func main(){
flag.parse()
s := server.NewServer()
s.RegisterName("Arith",new(Arith),"")
s.Server("tcp",*addr)
}
client
package main
import (
"context"
"flag"
"log"
"github.com/smallnest/rpcx/share"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
option := client.DefaultOption
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, option)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
ctx := context.WithValue(context.Background,share.ReqMetaDataKey,map[string]string{"aaa","from client"})
ctx = context.WithValue(ctx, share.ResMetaDataKey, make(map[string]string))
err := xclient.Call(ctx, "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
log.Printf("received meta: %+v", ctx.Value(share.ResMetaDataKey))
}
心跳
你可以设置自动的心跳来保持连接不断掉。 rpcx会自动处理心跳(事实上它直接就丢弃了心跳)。
客户端需要启用心跳选项,并且设置心跳间隔:
option := client.DefaultOption
option.HeartBeat = true
option. HeartBeatInterval=time.Second