引言
之前大家是否遇到过"MOVED 1024 127.0.0.1"之类的错误或者proxy中的日志?
或者有没有想过Redis集群任意一节点都可以执行任何key的操作命令是如何实现的?
接下来我们来看下哈希槽和Redis集群的分片是如何匹配的?为何会出现moved这类的错误,一个Redis命令是如何在集群中路由和处理的。
Redis集群信息
这里的代码都是Redis的客户端代码,版本为"9.6.2"。
数据结构
集群状态
clusterState结构存储了整个集群的信息,这里可以看到有主从节点信息、哈希槽信息等结构,相当于一个全局的信息存储。
type clusterState struct{
nodes *clusterNodes
Masters []*clusterNode
Slaves []*clusterNode
slots []*clusterSlot
generation uint32
createdAt time.Time
}
哈希槽
clusterSlot记录了哈希槽的范围及所属node的地址(使用时取nodes首元素)。
type clusterSlot struct{
start, end int
nodes []*clusterNode
}
type clusterNode struct{
Client *Client
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
// last time the latency measurement was performed for the node, stored in nanoseconds// from epoch
lastLatencyMeasurement int64 // atomic
}
哈希tag
Redis的key和哈希槽的计算之前提到过,这里可以看下Key函数的计算会处理哈希tag。如果key为空则采用随机算法一次个节点去处理命令。
func Key(key string) string{
if s := strings.IndexByte(key, '{'); s > -1{
if e := strings.IndexByte(key[s+1:], '}'); e > 0{
return key[s+1 : s+e+1]
}
}
returnkey
}
func Slot(key string) int{
if key == ""{
returnRandomSlot()
}
key = Key(key)
return int(crc16sum(key)) % slotNumber
}
路由
根据哈希槽找对应的Node信息:匹配slot在该clusterSlot返回内则匹配成功,是不是也不复杂。大家可以看下其实客户端的源码或者说其他开源的中间件源码,代码都很简洁,这才是好代码一眼看穿。
func (c *clusterState) slotNodes(slot int) []*clusterNode {
i := sort.Search(len(c.slots), func(i int) bool{
returnc.slots[i].end >= slot
})
if i >= len(c.slots) {
return nil
}
x := c.slots[i]
ifslot >= x.start && slot <= x.end {
returnx.nodes
}
return nil
}
命令处理
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error{
slot := c.cmdSlot(ctx, cmd)
varnode *clusterNode
var moved bool
var ask bool
var lastErr error
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
// MOVED and ASK responses are not transient errors that require retry delay; they
// should be attempted immediately.
if attempt > 0&& !moved && !ask {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil{
returnerr
}
}
if node == nil{
var err error
node, err = c.cmdNode(ctx, cmd.Name(), slot)
if err != nil{
returnerr
}
}
ifask {
ask = false
pipe := node.Client.Pipeline()
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
} else{
lastErr = node.Client.Process(ctx, cmd)
}
// If there is no error - we are done.
if lastErr == nil{
return nil
}
ifisReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
ifisReadOnly {
c.state.LazyReload()
}
node = nil
continue
}
var addr string
moved, ask, addr = isMovedError(lastErr)
ifmoved || ask {
c.state.LazyReload()
var err error
node, err = c.nodes.GetOrCreate(addr)
if err != nil{
returnerr
}
continue
}
returnlastErr
}
returnlastErr
}
上面是Redis集群客户端执行命令的大致流程:
计算命令对应的哈希槽
cmdNode函数会根据哈希槽找到对应的集群节点(或创建新节点)
如果slot正在迁移状态则需要向目标节点先执行asking命令再执行目标命令
如果不是则执行命令
对于命令的执行结果错误进行处理
只读错误或者是节点关闭则需要更新集群状态
然后循环再重试该命令的执行
Moved错误则更新集群状态然后重定向到目标node,重试执行。(如果重试次数为0则返回了Moved错误信息)
重试使用了指数级的延迟策略,细节了。
总结
redis集群采用去中心化的信息存储,每个节点都存储了全部的集群信息并采用最终一致和惰性加载的方式来提高访问效率。惰性加载的集群信息是客户端向服务端获取的,具体服务端如何同步和存储信息我们看Gossip协议时再分析~
更多精彩内容详见作者公众号: i技术之路