ethereum p2p Kademlia的实现之一
ethereum p2p Kademlia的实现之二
1.初始化,seednode的添加
//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error)
=>
tab.loadSeedNodes(false)
// loadSeedNodes collects seed candidates from the node database together
// with the nursery nodes and inserts every one of them into the table.
// When bond is true the candidates are first passed through bondall and
// only the nodes it returns are inserted.
func (tab *Table) loadSeedNodes(bond bool) {
	candidates := append(tab.db.querySeeds(seedCount, seedMaxAge), tab.nursery...)
	// Callers choose whether to bond; the initial load from newTable passes false.
	if bond {
		candidates = tab.bondall(candidates)
	}
	for _, node := range candidates {
		// Private copy so the lazily evaluated closure below always refers
		// to this iteration's node.
		node := node
		age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(node.ID)) }}
		log.Debug("Found seed node in database", "id", node.ID, "addr", node.addr(), "age", age)
		tab.add(node)
	}
}
先从数据库中查询符合条件的节点,再连同bootnodes(即nursery)一起添加到k桶中
// 首先确定需要将node放入k桶的哪一行,如果该行还有剩余空间,则直接放入
// 如果bumpOrAdd返回false(节点没有进入该行),则把该节点加入这一行的replacements(替补列表)中
// add places the node into the bucket selected by its hash. If bumpOrAdd
// returns false, the node is handed to that bucket's replacement list
// instead. The table mutex is held for the whole operation.
func (tab *Table) add(new *Node) {
	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	dest := tab.bucket(new.sha)
	if tab.bumpOrAdd(dest, new) {
		return
	}
	// Node is not in table. Add it to the replacement list.
	tab.addReplacement(dest, new)
}
// 该方法用于确定将node放入k桶的哪一行
// bucket returns the bucket for the given node ID hash.
//
// The index is derived from the value logdist yields for our own hash and
// sha: anything up to bucketMinDistance shares buckets[0], larger values
// map to index d-bucketMinDistance-1.
func (tab *Table) bucket(sha common.Hash) *bucket {
	dist := logdist(tab.self.sha, sha)
	if dist > bucketMinDistance {
		return tab.buckets[dist-bucketMinDistance-1]
	}
	return tab.buckets[0]
}
可见每一行中replacements的作用
2.K桶的维护(检查,刷新等操作)
调用过程
//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error)
=>
func (tab *Table) loop()
下面是对loop()方法的分析
// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// It runs until a value arrives on tab.closeReq. While running it reacts to
// three time sources (a revalidate timer, a refresh ticker and a copyNodes
// ticker), to explicit refresh requests on tab.refreshReq, and to the
// completion signals of the background goroutines it starts.
func (tab *Table) loop() {
var (
// One-shot timer; it is re-armed only after a revalidation run reports back.
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
// A non-nil refreshDone means a doRefresh goroutine is currently in flight.
refreshDone = make(chan struct{}) // where doRefresh reports completion
// Starts out holding tab.initDone, so that channel is closed when the
// initial refresh finishes.
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C:
// Periodic refresh: reseed, then start a refresh unless one is already running.
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
// Explicit request: remember the caller's channel and start a refresh
// unless one is already running; the caller is released on completion.
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
// Refresh finished: release every waiter and mark that no refresh is running.
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
// Schedule the next revalidation only after the previous one has reported back.
revalidate.Reset(tab.nextRevalidateTime())
case <-copyNodes.C:
go tab.copyBondedNodes()
case <-tab.closeReq:
break loop
}
}
// Shutdown sequence: close the transport (if any), wait for an in-flight
// refresh, release remaining waiters, close the database, then signal
// completion by closing tab.closed.
if tab.net != nil {
tab.net.close()
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
tab.db.close()
close(tab.closed)
}
这个函数主要包含三个定时器
- revalidate = time.NewTimer(tab.nextRevalidateTime())
- refresh = time.NewTicker(refreshInterval)
- copyNodes = time.NewTicker(copyNodesInterval)
分别定时触发doRevalidate,doRefresh,copyBondedNodes这三个函数
2.1doRefresh
// doRefresh performs lookups to keep buckets full: seed nodes are loaded
// first (which covers an empty table after initial bootstrap or after
// faulty peers were discarded), then a lookup for our own ID runs, followed
// by three lookups for random targets. done is closed when the refresh has
// finished.
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)

	// Re-insert nodes stored in the database. This should yield a few
	// previously seen nodes that are (hopefully) still alive.
	tab.loadSeedNodes(true)

	// Look up our own ID to discover new neighbor nodes.
	tab.lookup(tab.self.ID, false)

	// The Kademlia paper asks for a lookup in the least recently used
	// bucket when refreshing. That is not practical here: the findnode
	// target is a 512bit value (not hash-sized), and producing a sha3
	// preimage that lands in a chosen bucket is not easily possible.
	// A handful of lookups with random targets is used instead.
	for remaining := 3; remaining > 0; remaining-- {
		var randomTarget NodeID
		crand.Read(randomTarget[:])
		tab.lookup(randomTarget, false)
	}
}
主要有三处调用,其中tab.loadSeedNodes在之前已经分析,另外两处都调用了lookup方法,只是参数不同
// Run self lookup to discover new neighbor nodes.
tab.lookup(tab.self.ID, false)
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
下面分析lookup方法
- lookup方法
//lookup方法的目的是找到接近targetID的节点
//参数targetID不一定是一个真实存在的节点id
// lookup starts from the table entries closest to targetID, sends findnode
// requests to them (at most alpha in flight at a time), bonds with the
// nodes they return and merges those into a distance-ordered result set of
// at most bucketSize entries. It returns the entries of that set.
//
// If refreshIfEmpty is true and the table yields no starting nodes, lookup
// waits for one table refresh and then tries once more.
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
var (
// Distances are measured against the Keccak256 hash of the target ID.
target = crypto.Keccak256Hash(targetID[:])
// IDs that have already been sent a findnode request.
asked = make(map[NodeID]bool)
// IDs from replies that have already been pushed into result.
seen = make(map[NodeID]bool)
// Buffered to alpha, the cap on concurrently pending queries.
reply = make(chan []*Node, alpha)
// Number of findnode goroutines whose reply has not been consumed yet.
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
for {
tab.mutex.Lock()
// generate initial result set
####
从ntab中获得接近target的节点,存入result中,最多bucketSize个
####
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
// Only one refresh attempt: the next pass breaks out even if still empty.
refreshIfEmpty = false
}
####
向results节点(接近target的节点)发出findnode消息
对返回的节点进行bond(ping pong)
####
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
// Each query runs in its own goroutine and always sends exactly one
// value on reply, including when findnode returned an error.
go func() {
// Find potential neighbors to bond with
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil {
// Bump the failure counter to detect and evacuate non-bonded entries
fails := tab.db.findFails(n.ID) + 1
tab.db.updateFindFails(n.ID, fails)
log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
}
reply <- tab.bondall(r)
}()
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
// Skip nil entries and nodes already merged from an earlier reply.
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
// One reply consumed, so one query slot is free again.
pendingQueries--
}
return result.entries
}
####
从ntab中获得接近target的节点,最多bucketSize个
####
// closest walks every entry of every bucket and pushes it into a
// nodesByDistance set for target that is capped at nresults entries, then
// returns that set.
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
	// This is a very wasteful way to find the closest nodes but
	// obviously correct. I believe that tree-based buckets would make
	// this easier to implement efficiently.
	nearest := &nodesByDistance{target: target}
	for i := range tab.buckets {
		for _, node := range tab.buckets[i].entries {
			nearest.push(node, nresults)
		}
	}
	return nearest
}
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
// entries holds the nodes; push inserts at the position found by a
// distcmp-based search against target.
entries []*Node
// target is the hash that entries are compared against.
target common.Hash
}
// push inserts the given node at the position determined by a distcmp
// search against the target, keeping the list at no more than maxElems
// entries. When the list is already full, either the last entry is pushed
// out to make room or, if n sorts after every existing entry, n itself is
// dropped.
func (h *nodesByDistance) push(n *Node, maxElems int) {
	// First position whose entry compares as farther from the target than n.
	pos := sort.Search(len(h.entries), func(i int) bool {
		return distcmp(h.target, h.entries[i].sha, n.sha) > 0
	})
	// Grow by one slot while there is room; the appended value is a
	// placeholder that the shift below may overwrite.
	if len(h.entries) < maxElems {
		h.entries = append(h.entries, n)
	}
	if pos == len(h.entries) {
		// farther away than all nodes we already have and no room was
		// left for it, so there is nothing to insert.
		return
	}
	// slide existing entries down to make room; this overwrites the
	// entry that was just appended (or drops the last one when full).
	copy(h.entries[pos+1:], h.entries[pos:])
	h.entries[pos] = n
}
可知lookup的作用如下
- 在ntab中找到接近target的节点
- 向这些节点发出findnode消息
- 更新ntab
然后可知doRefresh作用如下
- 在ntab中找出接近本地节点的节点
- 对接近的节点发出findnode命令,对返回的节点完成udp握手后,放入ntab中
- 再用随机生成的target重复几次上述查找,使查找覆盖随机的距离范围