以太坊源码阅读-网络处理-P2P

Kademlia协议
https://zhuanlan.zhihu.com/p/38425656 给出了专业的解读。

discover/discv5节点发现
database.go实现了节点的持久化,使用了单独的leveldb实例。
// newNodeDB creates a new node database for storing and retrieving infos about
// known peers in the network. If no path is given, an in-memory, temporary
// database is constructed.
func newNodeDB(path string, version int, self NodeID) (*nodeDB, error) {
if path == "" {
return newMemoryNodeDB(self)
}
return newPersistentNodeDB(path, version, self)
}

// newMemoryNodeDB creates a new in-memory node database without a persistent
// backend.
func newMemoryNodeDB(self NodeID) (*nodeDB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return &nodeDB{
lvl: db,
self: self,
quit: make(chan struct{}),
}, nil
}

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentNodeDB(path string, version int, self NodeID) (nodeDB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(
errors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
// The nodes contained in the cache correspond to a certain protocol version.
// Flush all nodes if the version doesn't match.
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(version))]

blob, err := db.Get(nodeDBVersionKey, nil)
switch err {
case leveldb.ErrNotFound:
    // Version not found (i.e. empty cache), insert it
    if err := db.Put(nodeDBVersionKey, currentVer, nil); err != nil {
        db.Close()
        return nil, err
    }
case nil:
    // Version present, flush if different
    if !bytes.Equal(blob, currentVer) {
        db.Close()
        if err = os.RemoveAll(path); err != nil {
            return nil, err
        }
        return newPersistentNodeDB(path, version, self)
    }
}
return &nodeDB{
    lvl:  db,
    self: self,
    quit: make(chan struct{}),
}, nil

}

node的存储、查询和删除
// node retrieves a node with a given id from the database.
func (db *nodeDB) node(id NodeID) *Node {
blob, err := db.lvl.Get(makeKey(id, nodeDBDiscoverRoot), nil)
if err != nil {
return nil
}
node := new(Node)
if err := rlp.DecodeBytes(blob, node); err != nil {
log.Error("Failed to decode node RLP", "err", err)
return nil
}
node.sha = crypto.Keccak256Hash(node.ID[:])
return node
}

// updateNode inserts - potentially overwriting - a node into the peer database.
func (db *nodeDB) updateNode(node *Node) error {
blob, err := rlp.EncodeToBytes(node)
if err != nil {
return err
}
return db.lvl.Put(makeKey(node.ID, nodeDBDiscoverRoot), blob, nil)
}

// deleteNode deletes all information/keys associated with a node.
func (db *nodeDB) deleteNode(id NodeID) error {
deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
for deleter.Next() {
if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
return err
}
}
return nil
}

node的数据结构
type Node struct {
IP net.IP // len 4 for IPv4 or 16 for IPv6
UDP, TCP uint16 // port numbers
ID NodeID // the node's public key
// This is a cached copy of sha3(ID) which is used for node
// distance calculations. This is part of Node in order to make it
// possible to write tests that need a node at a certain distance.
// In those tests, the content of sha will not actually correspond
// with ID.
sha common.Hash
// whether this node is currently being pinged in order to replace
// it in a bucket
contested bool
}

// ensureExpirer is a small helper method ensuring that the data expiration
// mechanism is running. If the expiration goroutine is already running, this
// method simply returns.
//
// The goal is to start the data evacuation only after the network successfully
// bootstrapped itself (to prevent dumping potentially useful seed nodes). Since
// it would require significant overhead to exactly trace the first successful
// convergence, it's simpler to "ensure" the correct state when an appropriate
// condition occurs (i.e. a successful bonding), and discard further events.
func (db *nodeDB) ensureExpirer() {
db.runner.Do(func() { go db.expirer() })
}

// expirer should be started in a go routine, and is responsible for looping ad
// infinitum and dropping stale data from the database.
func (db *nodeDB) expirer() {
tick := time.NewTicker(nodeDBCleanupCycle)
defer tick.Stop()
for {
select {
case <-tick.C:
if err := db.expireNodes(); err != nil {
log.Error("Failed to expire nodedb items", "err", err)
}
case <-db.quit:
return
}
}
}

// expireNodes iterates over the database and deletes all nodes that have not
// been seen (i.e. received a pong from) for some allotted time.
//这个方法遍历所有的节点,如果某个节点最后接收消息超过指定值,那么就删除这个节点。
func (db *nodeDB) expireNodes() error {
threshold := time.Now().Add(-nodeDBNodeExpiration)
// Find discovered nodes that are older than the allowance
it := db.lvl.NewIterator(nil, nil)
defer it.Release()
for it.Next() {
// Skip the item if not a discovery node
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
// Skip the node if not expired yet (and not self)
if !bytes.Equal(id[:], db.self[:]) {
if seen := db.lastPong(id); seen.After(threshold) {
continue
}
}
// Otherwise delete all associated information
db.deleteNode(id)
}
return nil
}

some state-update method
// lastPingReceived retrieves the time of the last ping packet sent by the remote node.
func (db *nodeDB) lastPingReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPing)), 0)
}

// updateLastPing updates the last time remote node pinged us.
func (db *nodeDB) updateLastPingReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPing), instance.Unix())
}

// lastPongReceived retrieves the time of the last successful pong from remote node.
func (db *nodeDB) lastPongReceived(id NodeID) time.Time {
return time.Unix(db.fetchInt64(makeKey(id, nodeDBDiscoverPong)), 0)
}

// hasBond reports whether the given node is considered bonded.
func (db *nodeDB) hasBond(id NodeID) bool {
return time.Since(db.lastPongReceived(id)) < nodeDBNodeExpiration
}

// updateLastPongReceived updates the last pong time of a node.
func (db *nodeDB) updateLastPongReceived(id NodeID, instance time.Time) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverPong), instance.Unix())
}

// findFails retrieves the number of findnode failures since bonding.
func (db *nodeDB) findFails(id NodeID) int {
return int(db.fetchInt64(makeKey(id, nodeDBDiscoverFindFails)))
}

// updateFindFails updates the number of findnode failures since bonding.
func (db *nodeDB) updateFindFails(id NodeID, fails int) error {
return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails))
}

// querySeeds retrieves random nodes to be used as potential seed nodes
// for bootstrapping.
func (db nodeDB) querySeeds(n int, maxAge time.Duration) []Node {
var (
now = time.Now()
nodes = make([]*Node, 0, n)
it = db.lvl.NewIterator(nil, nil)
id NodeID
)
defer it.Release()

seek:
for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ {
// Seek to a random entry. The first byte is incremented by a
// random amount each time in order to increase the likelihood
// of hitting all existing nodes in very small databases.
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
it.Seek(makeKey(id, nodeDBDiscoverRoot))

    n := nextNode(it)
    if n == nil {
        id[0] = 0
        continue seek // iterator exhausted
    }
    if n.ID == db.self {
        continue seek
    }
    if now.Sub(db.lastPongReceived(n.ID)) > maxAge {
        continue seek
    }
    for i := range nodes {
        if nodes[i].ID == n.ID {
            continue seek // duplicate
        }
    }
    nodes = append(nodes, n)
}
return nodes

}

// reads the next node record from the iterator, skipping over other
// database entries.
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
id, field := splitKey(it.Key())
if field != nodeDBDiscoverRoot {
continue
}
var n Node
if err := rlp.DecodeBytes(it.Value(), &n); err != nil {
log.Warn("Failed to decode node RLP", "id", id, "err", err)
continue
}
return &n
}
return nil
}

table.go kademlia协议实现
基本数据结构
const (
alpha = 3 // Kademlia concurrency factor
bucketSize = 16 // Kademlia bucket size
hashBits = len(common.Hash{}) * 8
nBuckets = hashBits + 1 // Number of buckets

maxBondingPingPongs = 16
maxFindnodeFailures = 5

autoRefreshInterval = 1 * time.Hour
seedCount           = 30
seedMaxAge          = 5 * 24 * time.Hour

)

type Table struct {
mutex sync.Mutex // protects buckets, their content, and nursery
buckets [nBuckets]bucket // index of known nodes by distance
nursery []
Node // bootstrap nodes
db *nodeDB // database of known nodes

refreshReq chan chan struct{}
closeReq   chan struct{}
closed     chan struct{}

bondmu    sync.Mutex
bonding   map[NodeID]*bondproc
bondslots chan struct{} // limits total number of active bonding processes

nodeAddedHook func(*Node) // for testing

net  transport
self *Node // metadata of the local node

}

func newTable(t transport, ourID NodeID, ourAddr net.UDPAddr, nodeDBPath string, bootnodes []Node) (*Table, error) {
// If no node database was given, use an in-memory one
db, err := newNodeDB(nodeDBPath, nodeDBVersion, ourID)
if err != nil {
return nil, err
}
tab := &Table{
net: t,
db: db,
self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)),
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
tab.buckets[i] = &bucket{
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
}
}
tab.seedRand()
tab.loadSeedNodes()
// Start the background expiration goroutine after loading seeds so that the search for
// seed nodes also considers older nodes that would otherwise be removed by the
// expiration.
tab.db.ensureExpirer()
go tab.loop()
return tab, nil
}

go tab.loop(),这个函数主要完成以下的工作:
1.每半小时进行一次刷新工作(autoRefreshInterval)
2.如果接收到refreshReq/revalidate/copyNode等请求。那么进行刷新/重新生效/复制等工作。
3.如果接收到关闭消息。那么进行关闭。
// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
revalidateDone = make(chan struct{})
refreshDone = make(chan struct{}) // where doRefresh reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
case <-revalidate.C:
go tab.doRevalidate(revalidateDone)
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
case <-copyNodes.C:
go tab.copyLiveNodes()
case <-tab.closeReq:
break loop
}
}
if tab.net != nil {
tab.net.close()
}
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
tab.db.close()
close(tab.closed)
}

// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// Load nodes from the database and insert
// them. This should yield a few previously seen nodes that are
// (hopefully) still alive.
tab.loadSeedNodes()
// Run self lookup to discover new neighbor nodes.
tab.lookup(tab.self.ID, false)
// The Kademlia paper specifies that the bucket refresh should
// perform a lookup in the least recently used bucket. We cannot
// adhere to this because the findnode target is a 512bit value
// (not hash-sized) and it is not easily possible to generate a
// sha3 preimage that falls into a chosen bucket.
// We perform a few lookups with a random target instead.
//我们不可以坚持这个,因为findnode目标是512位值(不是哈希大小)并且
//不容易生成sha3-preimage落入选定的存储桶中,所以这里lookup 3个随机的
//目标
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}

lookup函数从table中最近的16个节点开始查找并结果排序,取钱16个节点。
func (tab Table) lookup(targetID NodeID, refreshIfEmpty bool) []Node {
var (
target = crypto.Keccak256Hash(targetID[:])
asked = make(map[NodeID]bool)
seen = make(map[NodeID]bool)
reply = make(chan []*Node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self.ID] = true
for {
tab.mutex.Lock()
// generate initial result set
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
}
for {
// ask the alpha closest nodes that we haven't asked yet
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingQueries++
go tab.findnode(n, targetID, reply)
}
}
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
// wait for the next reply
for _, n := range <-reply {
if n != nil && !seen[n.ID] {
seen[n.ID] = true
result.push(n, bucketSize)
}
}
pendingQueries--
}
return result.entries
}

findnode基于一个查询失败记录来查询节点,如果找到,将nodes添加到table中,同时向reply channel发送nodes。
func (tab *Table) findnode(n Node, targetID NodeID, reply chan<- []Node) {
fails := tab.db.findFails(n.ID)
r, err := tab.net.findnode(n.ID, n.addr(), targetID)
if err != nil || len(r) == 0 {
fails++
tab.db.updateFindFails(n.ID, fails)
log.Trace("Findnode failed", "id", n.ID, "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID, "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
tab.db.updateFindFails(n.ID, fails-1)
}

// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
// just remove those again during revalidation.
for _, n := range r {
    tab.add(n)
}
reply <- r

}

add查询节点是否在table中,如果没有的话,分成两种情况,第一是bucket有空间,则直接添加;第二是bucket没有空间,则先添加进replacement list,替换掉bucket的活跃节点中最久没有受到ping包的节点。
func (tab *Table) add(n *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()

b := tab.bucket(n.sha)
if !tab.bumpOrAdd(b, n) {
    // Node is not in table. Add it to the replacement list.
    tab.addReplacement(b, n)
}

}

result.push方法,这个方法会根据 所有的节点对于target的距离进行排序。 按照从近到远的方式决定新节点的插入顺序。(队列中最大会包含16个元素)。 这样会导致队列里面的元素和target的距离越来越近。距离相对远的会被踢出队列。
// nodesByDistance is a list of nodes, ordered by
// distance to target.
type nodesByDistance struct {
entries []*Node
target common.Hash
}

// push adds the given node to the list, keeping the total size below maxElems.
func (h *nodesByDistance) push(n *Node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return distcmp(h.target, h.entries[i].sha, n.sha) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
// slide existing entries down to make room
// this will overwrite the entry we just appended.
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容