Rpcx源码之路由(Selector)

一、路由

一般在大型的微服务系统中,会为同一个服务部署到多个节点, 以便服务能够支持大并发的访问。可能部署在同一个数据中心的多个节点,或者多个数据中心。
那么,在rpcx来完成service调用时,该如何将求请求交给对应的服务节点来完成,在rpcx中通过 Selector来实现路由选择, 很像一个负载均衡器,来选择出一个合适的节点。
在rpcx提供了多个路由策略算法,可以在创建XClient来指定。

注意,在Rpcx的路由是针对 ServicePath 和 ServiceMethod的路由。

二、路由策略

1、[Random]随机策略: 从指定的服务节点集合中随机选择一个节点。

这也是最简单的,可能会导致性能、资源的那个节点的负载较重。主要由于该策略只能保证在大量的请求下路由的比较均匀,并不能保证在很短的时间内负载是均匀的。

random-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "github.com/smallnest/rpcx/client"
    "rpcx/examples/models"
    "time"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
random-server.go源码
package main

import (
    "flag"
    "rpcx/examples/models"
    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "")  // 注意由于在本地测试没有使用注册中心 rcvr采用不用的类型 只是都具备Mul方法
        s.Serve("tcp", *addr2)
    }()

    select {}
}

2、[Roundrobin] 轮训策略

通过轮询的方式来依次调用节点,能保证每个节点都均匀的被访问。该路由策略常用在节点的服务能力都差不多的情况下

roundrobin-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "rpcx/examples/models"
    "time"

    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "tcp@localhost:8972", "server address")
    addr2 = flag.String("addr2", "tcp@localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failtry, client.RoundRobin, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
roundrobin-server.go源码
package main

import (
    "flag"
    "rpcx/examples/models"

    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "")
        s.Serve("tcp", *addr2)
    }()

    select {}
}

3、[WeightedRoundRobin] 平滑加权轮训策略:使用Nginx采用的[平滑加权的轮询算法]

例如当前有三个服务节点a、b、c的权重是{ 5, 1, 1 }, 最佳调用顺序是 { a, a, b, a, c, a, a }, 类比像 { c, b, a, a, a, a, a }这样的调用顺序比较来说权重虽一样,但相对调用负载来说前者更好,不至于在一段时间内将请求都发送给a。附上实现代码(仅用参考)

int matchedIndex = -1; // 代表轮训后选中执行服务的节点
int total = 0;
for (int i = 0; i < servers.Length; i++)
{
      servers[i].cur_weight += servers[i].weight;   //1、每次循环的时候做自增(步长=权重值)
      total += servers[i].weight;                      //2、将每个节点的权重值累加到汇总值中

    //3、如果 当前节点的自增数 > 当前待返回节点的自增数,则覆盖。
      if (matchedIndex == -1 || servers[matchedIndex].cur_weight < servers[i].cur_weight)
      {
            matchedIndex = i;
      }
}
//3被选取的节点减去2的汇总值,以降低下一次被选举时的初始权重值。
servers[matchedIndex].cur_weight -= total;
return servers[matchedIndex];
weighted-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "rpcx/examples/models"
    "time"

    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "weight=7"}, 
    {Key: *addr2, Value: "weight=3"}})
    xclient := client.NewXClient("Arith", client.Failtry, client.WeightedRoundRobin, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
weighted-server.go源码
package main

import (
    "flag"
    "rpcx/examples/models"

    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "weight=7")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "weight=3")
        s.Serve("tcp", *addr2)
    }()

    select {}
}

4、[网络质量优先]

客户端会基于ping(ICMP)探测各个节点的网络质量,越短的ping时间,对应服务节点的权重也就越高。同时,也会保证网络较差的节点也有被调用的机会。
例如:假定t是ping的返回时间, 如果超过1秒基本就没有调用机会了:
weight=191 if t <= 10
weight=201 -t if 10 < t <=200
weight=1 if 200 < t < 1000
weight=0 if t >= 1000

ping-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "rpcx/examples/models"
    "time"

    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "baidu.com:8080", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failtry, client.WeightedICMP, d, client.DefaultOption)
    defer xclient.Close()

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
ping-server.go源码
package main
import (
    "flag"
    "rpcx/examples/models"

    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "baidu.com:80", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "weight=7")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "weight=3")
        s.Serve("tcp", *addr2)
    }()

    select {}
}

5、[一致性哈希]

该策略是使用 [JumpConsistentHash]选择节点, 使得相同的servicePath, serviceMethod 和 参数会路由到同一个节点上。
JumpConsistentHash 是一个快速计算一致性哈希的算法,但是有一个缺陷是它不能删除节点,如果删除节点,路由就不准确了,所以在节点有变动的时候它会重新计算一致性哈希。

consistent-client.go源码
待定
consistent-server.go源码
待定

6、[地理位置优先]

在实际的使用过程中可能会碰到,希望client端就近获取对应的服务节点来完成服务调用, 例如在同一个服务分别提供了上海和美国硅谷服务中心,若是客户端在北京,那么则希望优先选择上海的机房,而非美国硅谷的。在rpcx中也提供了类似路由策略,不过要求服务在注册的时候要设置它所在的地理经纬度。若两个服务的节点的经纬度是一样的, rpcx会随机选择一个。
** 必须使用下面的方法配置client的经纬度信息**:

func (c *xClient) ConfigGeoSelector(latitude, longitude float64) 
geo-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "rpcx/examples/models"
    "time"

    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1, Value: "latitude=39.9289&longitude=116.3883"},
        {Key: *addr2, Value: "latitude=139.3453&longitude=23.3243"}})
    xclient := client.NewXClient("Arith", client.Failtry, client.ConsistentHash, d, client.DefaultOption)
    defer xclient.Close()
    xclient.ConfigGeoSelector(39.30, 116.40)

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
geo-server.go源码
package main

import (
    "flag"
    "rpcx/examples/models"

    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "weight=7")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "weight=3")
        s.Serve("tcp", *addr2)
    }()

    select {}
}

三、自定义路由策略

在上面内置的路由规则不满足用户的实际需求,则可以参考上面的路由器来实现自己的路由规则;
1、首先实现Selector接口

type Selector interface {
    Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
    UpdateServer(servers map[string]string)
}

2、设置自定义的Selector
xclient.SetSelector(&alwaysFirstSelector{})

3、构建xclient指定自定义的selector策略
xclient := client.NewXClient("Arith", client.Failtry, client.SelectByUser, d, client.DefaultOption)

自定义路由策略实现源码:位于user-client.go文件中
// 自定义selector
type alwaysFirstSelector struct { // 实现Selector接口
    servers []string
}

// 路由选择服务节点算法
func (s *alwaysFirstSelector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
    var ss = s.servers
    if len(ss) == 0 {
        return ""
    }

    return ss[0]
}

// 更新新的服务节点算法
func (s *alwaysFirstSelector) UpdateServer(servers map[string]string) {
    var ss = make([]string, 0, len(servers))
    for k := range servers {
        ss = append(ss, k)
    }

    sort.Slice(ss, func(i, j int) bool {
        return strings.Compare(ss[i], ss[j]) <= 0
    })
    s.servers = ss
}
user-client.go源码
package main

import (
    "context"
    "flag"
    "log"
    "rpcx/examples/models"
    "sort"
    "strings"
    "time"

    "github.com/smallnest/rpcx/client"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key: *addr1}, {Key: *addr2}})
    xclient := client.NewXClient("Arith", client.Failtry, client.SelectByUser, d, client.DefaultOption)
    defer xclient.Close()

    xclient.SetSelector(&alwaysFirstSelector{})

    args := &models.Args{
        A: 10,
        B: 20,
    }

    for i := 0; i < 10; i++ {
        reply := &models.Reply{}
        err := xclient.Call(context.Background(), "Mul", args, reply)
        if err != nil {
            log.Fatalf("failed to call: %v", err)
        }

        log.Printf("%d * %d = %d", args.A, args.B, reply.C)

        time.Sleep(time.Second)
    }

}
user-server.go源码
package main

import (
    "flag"
    "rpcx/examples/models"

    "github.com/smallnest/rpcx/server"
)

var (
    addr1 = flag.String("addr1", "localhost:8972", "server address")
    addr2 = flag.String("addr2", "localhost:8973", "server address")
)

func main() {
    flag.Parse()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.Arith), "")
        s.Serve("tcp", *addr1)
    }()

    go func() {
        s := server.NewServer()
        s.RegisterName("Arith", new(models.PBArith), "")
        s.Serve("tcp", *addr2)
    }()

    select {}
}

四、其他

通过增加路由策略能够提高服务对外的性能,保证来支撑更大的业务并发需求。上述所罗列的只是写常用的,也可参考nginx这些软件提供的路由算法。
select源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容