1、定义路由表:
const (
//当路由到某个子树时,会从该子树随机抽取alpha个节点发送请求。
alpha = 3 // Kademlia concurrency factor
//每层保存节点数
bucketSize = 16 // Kademlia bucket size
//256长度的节点
hashBits = len(common.Hash{}) * 8
//多少层
nBuckets = hashBits + 1 // Number of buckets
//查找节点最多失败的次数
maxFindnodeFailures = 5
)
//内存路由表
type Table struct {
count int // number of nodes 节点的总个数
buckets [nBuckets]*bucket // index of known nodes by distance K桶
nodeAddedHook func(*Node) // for testing
self *Node // metadata of the local node //当前节点
}
2、KAD的路由表协议事件:
// RPC packet types
const (
/* PING事件 */
pingPacket = iota + 1 // zero is 'reserved' //初始化为1 iota 是初始化零标识符
/* PONG事件 */
pongPacket
/* FIND NODE事件 */
findnodePacket
/* NEIGHBORS事件 */
neighborsPacket
)
3、处理Ping消息
//如果接受到的消息类型是ping,则调用该方法处理该消息
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
//校验该消息是否过期
if expired(req.Expiration) {
return errExpired
}
//发送一个pong包
t.send(from, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
// 阻塞 --- 等待对方回复
// 这里是根据回复来判断是否需要加入到本地路由表,
// 返回是否接收到
if !t.handleReply(fromID, pingPacket, req) {
// Note: we're ignoring the provided IP address right now
// 绑定,将该节点加入到路由表中,其中参数true表示已经接收到对方的ping消息
go t.bond(true, fromID, from, req.From.TCP)
}
return nil
}
4、FINDNODE事件
func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
......
/* 从路由表中查找最短距离几点数据 */
closest := t.closest(target, bucketSize).entries
p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
var sent bool
/* 这里省略了部分代码,因为UPD有长度限制,这里采用了CHUNK方式发送 */
/* 这里只展示发送流程,故省略chunk方式逻辑 */
for _, n := range closest {
......
t.send(from, neighborsPacket, &p)
......
}
return nil
}
5、刷新机制
/* 三个数据源进行刷新 */
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
//启动一个单线程去从数据库加载节点-- 并且要绑定节点
// Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
tab.loadSeedNodes(true)
// 以自己为目标节点,去网络中找到离自己最近的节点列表
// false,如果找不到最近的节点,是否重新触发刷新,重数据库中从新加载
// Run self lookup to discover new neighbor nodes.
tab.lookup(tab.self.ID, false)
/* 随机一个节点,然后查找其对应附近节点 */
// The Kademlia paper specifies that the bucket refresh should
// perform a lookup in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
// We perform a few lookups with a random target instead.
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}
6、XOR计算
/** 距离比较 **/
func distcmp(target, a, b common.Hash) int {
for i := range target {
da := a[i] ^ target[i]
db := b[i] ^ target[i]
if da > db {
return 1
} else if da < db {
return -1
}
}
return 0
}
7、路由表添加机制
// 添加一个节点到路由表中
func (tab *Table) add(n *Node) (contested *Node) {
//fmt.Println("add", n.addr().String(), n.ID.String(), n.sha.Hex())
if n.ID == tab.self.ID {
return
}
/* 找到第n层 */
b := tab.buckets[logdist(tab.self.sha, n.sha)]
switch {
case b.bump(n):
/* 已经存在 */
return nil
case len(b.entries) < bucketSize:
/* 不存在 但K桶中有空间存储 */
b.addFront(n)
tab.count++
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
return nil
default:
/* 这里以太坊实现中,增加了replacements列表,当K桶中已经满了后,先将节点放在replacements列表中 */
// b has no space left, add to replacement cache
// and revalidate the last entry.
b.replacements = append(b.replacements, n)
if len(b.replacements) > bucketSize {
//剔除掉最先进来的数据,整体向前移动一位
copy(b.replacements, b.replacements[1:])
//将末尾节点置空
b.replacements = b.replacements[:len(b.replacements)-1]
}
//返回最后一个节点
return b.entries[len(b.entries)-1]
}
}