ethereum p2p Kademlia的实现之三

ethereum p2p Kademlia的实现之一
ethereum p2p Kademlia的实现之二

1.初始化,seednode的添加

//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
=>
tab.loadSeedNodes(false)

func (tab *Table) loadSeedNodes(bond bool) {
    seeds := tab.db.querySeeds(seedCount, seedMaxAge)
    seeds = append(seeds, tab.nursery...)
    //bond为false
    if bond {
        seeds = tab.bondall(seeds)
    }
    for i := range seeds {
        seed := seeds[i]
        age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.bondTime(seed.ID)) }}
        log.Debug("Found seed node in database", "id", seed.ID, "addr", seed.addr(), "age", age)
        tab.add(seed)
    }
}

先从数据库中查得符合条件的节点,将bootsnode(nursery)一起添加到k桶中

// 首先获得需要将node放入k桶的哪一行,如果改行还有剩余空间,放入
// 如果没有剩余空间,从这一行的replacements中选出中选出活跃时间最早(最小)的一个节点,替换掉
func (tab *Table) add(new *Node) {
    tab.mutex.Lock()
    defer tab.mutex.Unlock()

    b := tab.bucket(new.sha)
    if !tab.bumpOrAdd(b, new) {
        // Node is not in table. Add it to the replacement list.
        tab.addReplacement(b, new)
    }
}

// 该方法用于确定将node放入k桶的哪一行
// bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(sha common.Hash) *bucket {
    d := logdist(tab.self.sha, sha)
    if d <= bucketMinDistance {
        return tab.buckets[0]
    }
    return tab.buckets[d-bucketMinDistance-1]
}

可见每一行中replacements的作用

2.K桶的维护(检查,刷新等操作)

调用过程

//p2p/discover/table.go
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) 
=>
func (tab *Table) loop()

下面是对loop()方法的分析

// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
    var (
        revalidate     = time.NewTimer(tab.nextRevalidateTime())
        refresh        = time.NewTicker(refreshInterval)
        copyNodes      = time.NewTicker(copyNodesInterval)
        revalidateDone = make(chan struct{})
        refreshDone    = make(chan struct{})           // where doRefresh reports completion
        waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
    )
    defer refresh.Stop()
    defer revalidate.Stop()
    defer copyNodes.Stop()

    // Start initial refresh.
    go tab.doRefresh(refreshDone)

loop:
    for {
        select {
        case <-refresh.C:
            tab.seedRand()
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case req := <-tab.refreshReq:
            waiting = append(waiting, req)
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
        case <-refreshDone:
            for _, ch := range waiting {
                close(ch)
            }
            waiting, refreshDone = nil, nil
        case <-revalidate.C:
            go tab.doRevalidate(revalidateDone)
        case <-revalidateDone:
            revalidate.Reset(tab.nextRevalidateTime())
        case <-copyNodes.C:
            go tab.copyBondedNodes()
        case <-tab.closeReq:
            break loop
        }
    }

    if tab.net != nil {
        tab.net.close()
    }
    if refreshDone != nil {
        <-refreshDone
    }
    for _, ch := range waiting {
        close(ch)
    }
    tab.db.close()
    close(tab.closed)
}

这个函数主要包含三个定时器

  • revalidate = time.NewTimer(tab.nextRevalidateTime())
  • refresh = time.NewTicker(refreshInterval)
  • copyNodes = time.NewTicker(copyNodesInterval)

分别定时执行doRefresh,doRevalidate,copyBondedNodes等三个函数

2.1doRefresh

// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
    defer close(done)
    // Load nodes from the database and insert
    // them. This should yield a few previously seen nodes that are
    // (hopefully) still alive.
    tab.loadSeedNodes(true)
    // Run self lookup to discover new neighbor nodes.
    tab.lookup(tab.self.ID, false)
    // The Kademlia paper specifies that the bucket refresh should
    // perform a lookup in the least recently used bucket. We cannot
    // adhere to this because the findnode target is a 512bit value
    // (not hash-sized) and it is not easily possible to generate a
    // sha3 preimage that falls into a chosen bucket.
    // We perform a few lookups with a random target instead.
    for i := 0; i < 3; i++ {
        var target NodeID
        crand.Read(target[:])
        tab.lookup(target, false)
    }
}

主要调用三个方法,其中tab.loadSeedNodes,在之前已经分析,两外都调用了lookup方法,只是参数不同

// Run self lookup to discover new neighbor nodes.
    tab.lookup(tab.self.ID, false)

var target NodeID
        crand.Read(target[:])
        tab.lookup(target, false)

下面分析lookup方法

  • lookup方法
//loopup方法的目的是找到接近targetID的节点
//参数targetID不一定是一个真实存在的节点id
func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node {
    var (
        target         = crypto.Keccak256Hash(targetID[:])
        asked          = make(map[NodeID]bool)
        seen           = make(map[NodeID]bool)
        reply          = make(chan []*Node, alpha)
        pendingQueries = 0
        result         *nodesByDistance
    )
    // don't query further if we hit ourself.
    // unlikely to happen often in practice.
    asked[tab.self.ID] = true

    for {
        tab.mutex.Lock()
        // generate initial result set
####
从ntab中获得接近target的节点,存入result中,最多bucketSize个
####
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The result set is empty, all nodes were dropped, refresh.
        // We actually wait for the refresh to complete here. The very
        // first query will hit this case and run the bootstrapping
        // logic.
        <-tab.refresh()
        refreshIfEmpty = false
    }
####
向results节点(接近target的节点)发出findnode消息
对返回的节点进行bond(ping pong)
####
    for {
        // ask the alpha closest nodes that we haven't asked yet
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID] {
                asked[n.ID] = true
                pendingQueries++
                go func() {
                    // Find potential neighbors to bond with
                    r, err := tab.net.findnode(n.ID, n.addr(), targetID)
                    if err != nil {
                        // Bump the failure counter to detect and evacuate non-bonded entries
                        fails := tab.db.findFails(n.ID) + 1
                        tab.db.updateFindFails(n.ID, fails)
                        log.Trace("Bumping findnode failure counter", "id", n.ID, "failcount", fails)

                        if fails >= maxFindnodeFailures {
                            log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
                            tab.delete(n)
                        }
                    }
                    reply <- tab.bondall(r)
                }()
            }
        }
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        // wait for the next reply
        for _, n := range <-reply {
            if n != nil && !seen[n.ID] {
                seen[n.ID] = true
                result.push(n, bucketSize)
            }
        }
        pendingQueries--
    }
    return result.entries
}


####
从ntab中获得接近target的节点,最多bucketSize个
####
func (tab *Table) closest(target common.Hash, nresults int) *nodesByDistance {
    // This is a very wasteful way to find the closest nodes but
    // obviously correct. I believe that tree-based buckets would make
    // this easier to implement efficiently.
    close := &nodesByDistance{target: target}
    for _, b := range tab.buckets {
        for _, n := range b.entries {
            close.push(n, nresults)
        }
    }
    return close
}


// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
    entries []*Node
    target  common.Hash
}

// push adds the given node to the list, keeping the total size below maxElems.
func (h *nodesByDistance) push(n *Node, maxElems int) {
    ix := sort.Search(len(h.entries), func(i int) bool {
        return distcmp(h.target, h.entries[i].sha, n.sha) > 0
    })
    if len(h.entries) < maxElems {
        h.entries = append(h.entries, n)
    }
    if ix == len(h.entries) {
        // farther away than all nodes we already have.
        // if there was room for it, the node is now the last element.
    } else {
        // slide existing entries down to make room
        // this will overwrite the entry we just appended.
        copy(h.entries[ix+1:], h.entries[ix:])
        h.entries[ix] = n
    }
}

可知lookup的作用如下

  • 在ntab中找到接近target的节点
  • 像这些节点发出findnode消息
  • 更新ntab

然后可知doRefresh作用如下

  • 在ntab中找出接近本地节点的节点
  • 对接近的节点发出findnode命令,对返回的节点完成udp握手后,放入ntab中
  • 把距离范围限定在一个随机范围内
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容

  • 官方文档 初始化 Initialization是为准备使用类,结构体或者枚举实例的一个过程。这个过程涉及了在实例里...
    hrscy阅读 1,134评论 0 1
  • 点击查看原文 Web SDK 开发手册 SDK 概述 网易云信 SDK 为 Web 应用提供一个完善的 IM 系统...
    layjoy阅读 13,745评论 0 15
  • 执法。
    云记者阅读 255评论 0 0
  • 赵子昱坐在公交站台的椅子上,任凭一辆辆公交车到来再开走,身边的路人也换了一拨又一拨。他抬起头看了看星空,心想这星星...
    我是十里阅读 529评论 0 0
  • 创业要成功,另一半真的很重要。
    senhoa阅读 192评论 0 0