一致性hash原理 ---go简单实现
1. 什么是一致性hash
https://www.cnblogs.com/lpfuture/p/5796398.html
参考阅读 ,可以用于解决 多台服务器进行数据缓存是key的分配问题,如何使key更均匀的分配到不同的服务器上,并且当服务器进行增加或减少时,数据影响更小
2. go简单实现
/*
@Time : 2022/2/22 2:19 下午
@Author : yuyunqing
@File : consistenthashing
@Software: GoLand
*/
package main
import (
"crypto/md5"
"encoding/hex"
"log"
"strconv"
)
//数据节点node 用于存放hash数据
type Node struct {
ip string
count int
data map[string]string
}
type InventedNode struct {
key string
name string
node *Node
next *InventedNode
}
var ip_list []string
var ip_has_more []string //含有多个机器节点数据的ip
var min_hash *InventedNode
var max_hash *InventedNode
//简单实现一致性hash存储逻辑
//虚拟节点实现逻辑:
//少于三个机器节点时 每一个机器节点 增加 三个虚拟节点,
//少于三个机器节点时 每增加一个机器节点 实际增加一个虚拟节点
/**
* @Author yuyunqing
* @Description //TODO 机器注册
* @Date 6:16 下午 2022/2/22
**/
func reg(ip , name string) {
for _, s := range ip_list {
if s == ip {
log.Println("该机器已经注册",ip)
}
}
node := createNode(ip)
//定义生成虚拟节点个数
new_InventedNode_num := 1
//检测机器节点数量
if len(ip_list) < 3 {
new_InventedNode_num = 3
//将含有多个虚拟节点的ip写入
ip_has_more = append(ip_has_more, ip)
}
//将ip 放入列表中
ip_list = append(ip_list, ip)
for i := 0; i < new_InventedNode_num; i++ {
index := strconv.Itoa(i)
//根据下标生成新的虚拟节点
newInv := node.createInventedNode(ip+"#"+index , name+"#"+index)
newInv.setInventedInCycleLink()
}
log.Println(ip_list)
}
/**
* @Author yuyunqing
* @Description //TODO 根据ip地址创建一个机器节点
* @Date 6:20 下午 2022/2/22
**/
func createNode(ip string) *Node {
return &Node{
ip:ip,
count:0,
data: map[string]string{},
}
}
/**
* @Author yuyunqing
* @Description //TODO 根据机器节点生成虚拟节点
* @Date 6:35 下午 2022/2/22
**/
func(n *Node) createInventedNode(ip , name string) *InventedNode {
return &InventedNode{
key:getMD5String([]byte(ip)),
name: name,
node :n ,
next: nil,
}
}
/**
* @Author yuyunqing
* @Description //TODO 将虚拟节点写入环形链表
* @Date 6:39 下午 2022/2/22
**/
func (i *InventedNode) setInventedInCycleLink() {
//当最小hash值为空时,将节点插入 并自身成环
if min_hash == nil {
min_hash = i
i.next = i
max_hash = i
}else{
if i.key < min_hash.key { //如果i的hash 比 最小的hash小
i.next = min_hash
max_hash.next = i
min_hash = i
} else if i.key > max_hash.key { //如果i的hash 比 最小的hash小
i.next = min_hash
max_hash.next = i
max_hash = i
} else {
befor := getInventedNodeByhash(i.key)
i.next = befor.next
befor.next = i
}
//进行数据拷贝
i.Copy("new")
}
}
/**
* @Author yuyunqing
* @Description //TODO 删除机器节点
* @Date 11:57 上午 2022/2/23
**/
func del(ip string) string {
//判断 ip 是否注册
has_more := -1
ip_list_index := -1
for i, s := range ip_list {
if s==ip{
ip_list_index = i
}
}
for i, s := range ip_has_more {
if s==ip{
has_more = i
}
}
//机器节点不存在
if ip_list_index == -1 {
return "ip未注册"
}else{
//从列表中删除ip
ip_list = append(ip_list[:ip_list_index] ,ip_list[ip_list_index:]...)
has_Invented_count := 1
//判断是否存在多个虚拟节点
if has_more != -1 {
has_Invented_count =3
//从列表中删除ip
ip_has_more = append(ip_has_more[:ip_list_index] ,ip_has_more[ip_list_index:]...)
}
for i := 0; i < has_Invented_count; i++ {
index := strconv.Itoa(i)
//根据下标生成新的虚拟节点
hash := getMD5String([]byte(ip+"#"+index))
//获取删除节点的前置节点
i := getInventedNodeByhash(hash)
i.delInventedNodeBybefore()
}
return "删除节点 IP:" +ip +",虚拟节点个数:"+strconv.Itoa(has_Invented_count)
}
}
/**
* @Author yuyunqing
* @Description //TODO 根据前置节点删除虚拟节点
* @Date 12:07 下午 2022/2/23
**/
func (i *InventedNode) delInventedNodeBybefore() {
if i.next.key == min_hash.key {
//如果删除的节点是 最小节点 那么将最小节点至下一个
min_hash = i.next.next
}
if i.next.key == max_hash.key {
//如果删除节点是 最大节点,将最大节点至上一个
max_hash = i
}
//将删除节点的数据拷贝到 下一个节点
i.next.Copy("del")
i.next = i.next.next
}
/**
* @Author yuyunqing
* @Description //TODO 根据hash 获取他对应的上一个虚拟节点
* @Date 10:06 上午 2022/2/23
**/
func getInventedNodeByhash( key string) *InventedNode {
//从最小值开始循环
i_info := min_hash
for i_info.next.key != min_hash.key {
if i_info.next.key >= key {
return i_info
}else{
i_info = i_info.next
}
}
return i_info
}
/**
* @Author yuyunqing
* @Description //TODO 节点新增删除时进行数据拷贝
* @Date 10:27 上午 2022/2/23
**/
func (i *InventedNode)Copy(type_ string) {
//新增和删除拷贝方式不同 ,
//新增将后置节点数据 拷贝至当前节点,
//删除将当前节点数据拷贝至后置节点
if type_ == "new" {
i.node.NodeCopy(i.next.node)
}
if type_ == "del" {
i.next.node.NodeCopy(i.node)
}
}
/**
* @Author yuyunqing
* @Description //TODO 将参数中得机器节点数据,拷贝到当前机器节点
* @Date 10:46 上午 2022/2/23
**/
func (n *Node)NodeCopy(node *Node) {
//如果IP相同不存在数据拷贝问题
if n.ip == node.ip {
log.Println("相同机器节点数据拷贝 ip:" , n.ip)
}
for key, val := range node.data {
n.setData(key,val)
}
}
/**
* @Author yuyunqing
* @Description //TODO 用于展示虚拟节点环信息
* @Date 11:43 上午 2022/2/23
**/
func loop() {
info_i := min_hash
for info_i.key != max_hash.key {
log.Println(info_i.name ,"->" ,info_i.next.name)
info_i = info_i.next
}
log.Println(info_i.name ,"->" ,info_i.next.name)
}
/**
* @Author yuyunqing
* @Description //TODO 向机器节点同步数据
* @Date 11:04 上午 2022/2/23
**/
func (n *Node)setData(key,val string){
if _,ok :=n.data[key];ok {
log.Println("数据拷贝出现相同key ip:" , n.ip , "key ", key )
}else{
n.data[key] = val
n.count = len(n.data)
}
}
/**
* @Author yuyunqing
* @Description //TODO 从机器节点中获取数据
* @Date 11:23 上午 2022/2/23
**/
func (n *Node)getData(key string) string{
if _,ok :=n.data[key];ok {
return n.data[key]
}else{
return ""
}
}
/**
* @Author yuyunqing
* @Description //TODO 写入数据 根据key的hash 值获取虚拟节点
* @Date 11:20 上午 2022/2/23
**/
func setKey(key,val string) (string,int) {
hash := getMD5String([]byte(key))
i := getInventedNodeByhash(hash).next
i.node.setData(key,val )
return i.name, i.node.count
}
/**
* @Author yuyunqing
* @Description //TODO 获取数据
* @Date 11:20 上午 2022/2/23
**/
func getKey(key string) (string , string) {
hash := getMD5String([]byte(key))
i := getInventedNodeByhash(hash).next
val := i.node.getData(key)
return i.name , val
}
/*
使用MD5对数据进行哈希运算方法1:使用md5.Sum()方法
*/
func getMD5String(b[]byte) (result string) {
//给哈希算法添加数据
res:=md5.Sum(b) //返回值:[Size]byte 数组
result=hex.EncodeToString(res[:]) //对应的参数为:切片,需要将数组转换为切片。
return
}
func main() {
reg("127.0.0.1" ,"节点1")
reg("127.0.0.2" ,"节点2")
reg("127.0.0.3" ,"节点3")
reg("127.0.0.4" ,"节点4")
loop()
name,count := setKey("zhangsan","25")
log.Println("数据写入" ,name ,"机器节点数据量",count)
name,count = setKey("1zhangsan1","26")
log.Println("数据写入" ,name ,"机器节点数据量",count)
name,count = setKey("2zhangsan2","27")
log.Println("数据写入" ,name ,"机器节点数据量",count)
name,count = setKey("3zhangsan3","28")
log.Println("数据写入" ,name ,"机器节点数据量",count)
name,val := getKey("zhangsan")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("1zhangsan1")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("2zhangsan2")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("3zhangsan3")
log.Println("数据读取" ,name ,"数据结果",val)
log.Println(ip_list)
log.Println(ip_has_more)
del("127.0.0.3")
log.Println(ip_list)
log.Println(ip_has_more)
loop()
name,val = getKey("zhangsan")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("1zhangsan1")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("2zhangsan2")
log.Println("数据读取" ,name ,"数据结果",val)
name,val = getKey("3zhangsan3")
log.Println("数据读取" ,name ,"数据结果",val)
}
备注
服务器注册,服务器退出,key的存储和查询,仅用于原理展示,有不少bug嘿嘿