死磕以太坊源码分析之区块和交易广播

死磕以太坊源码分析之区块和交易广播

ProtocolManager详解

ProtocolManager,从字面上看是协议管理器,负责着p2p通信协议的管理。它连接了p2p的逻辑层peer与顶层peer之间的调用,从顶层将协议传递至逻辑层,再从逻辑层得到message传递到顶层。

image-20201202142450663
  1. fastSync规定了同步的模式 ;
  2. acceptTxs是节点是否接受交易的阀门,只有当pm.acceptTxs == 1时,节点才会接受交易。这个操作只会在同步结束后再开始,即同步的时候节点是不会接受交易的;
  3. SubProtocols中是以太坊的通讯协议,通常只有一个值,即eth63
  4. downloader是一个下载器,用于主动从远程节点中获取hashesblocks
  5. fetcher则被动的收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。

ProtocolManager.Start()启动了四条go程,分别是交易订阅广播协程(txBroadcastLoop)、挖矿订阅协程(minedBroadcastLoop)、节点定期同步协程(syncer)和交易同步协程(txsyncLoop)

image-20201202143101376
  1. ==txBroadcastLoop==:广播新出现的交易对象。txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。
  2. ==minedBroadcastLoop==:广播新挖掘出的区块。minedBroadcastLoop()持续等待本节点的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。
  3. ==syncer==:定时的和网络其他节点同步,并处理网络节点的相关通知。定时与相邻个体进行区块全链的强制同步。syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。
  4. ==txsyncLoop==:把新的交易均匀的同步给网路节点

广播的情形

  1. minedBroadcastLoop()监听到新区块事件后,把新区块和区块hash分别广播出去;
  2. 从远程节点同步完成后,将CurrentBlock广播出去,此时广播的是区块hash
  3. txBlockcastLoop()监听到区块池的新增交易事件时会广播交易;

广播区块及区块哈希

广播区块的入口在pm.minedBroadcastLoop(),进入到BroadcastBlock,这里的参数为bool值,如果传入的为true,则将区块block和总难度td发送给一部分节点,节点数为根号n;如果传入的为false,则将区块的hash发送给所有的节点。需要注意的是两个广播函数都执行

进入到true分支:代表只传播区块给一部分节点

①:首先计算一个临时的TD

if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } 

②:发送块到peers的子集

对节点数进行开方,16开方得4,然后取前4个节点。

transferLen := int(math.Sqrt(float64(len(peers))))
        if transferLen < minBroadcastPeers {
            transferLen = minBroadcastPeers
        }
        if transferLen > len(peers) {
            transferLen = len(peers)
        }
        transfer := peers[:transferLen]
        for _, peer := range transfer {
            peer.AsyncSendNewBlock(block, td) // 块传播
        }

执行完之后直接return出去,再次执行此函数,此时不会走ture分支,直接判断判断本地是否有区块,如果有则发送区区块哈希给剩下的节点,如果没有,则不做发送哈希的操作。

如果本地存在这个要广播的区块(很可能就是出块节点,或者接受块的节点已经插入到区块链中),那就还要像其他没有被广播到区块的节点发送区块哈希。

image-20201125150810051

如果本地不存在这个要广播的区块哈希(应该是还没接收到区块或者区块哈希的节点),那它只要向它的节点列表里发送区块即可。

image-20201125151546035

接下来就是重点分析AsyncSendNewBlockAsyncSendNewBlockHash两个函数了。

AsyncSendNewBlock

发送块到需要广播的节点的广播队列中

select {
    case p.queuedProps <- &propEvent{block: block, td: td}:
        p.knownBlocks.Add(block.Hash())
        for p.knownBlocks.Cardinality() >= maxKnownBlocks {
            p.knownBlocks.Pop()
        }

这里的queuedProps是用来存放要广播的块的队列,同时,要把广播的块标记为已知,还不能超过1024(maxKnownBlocks)个。超过就会弹出队列第一个propEvent() 。接下来就是处理队列中的块了。

eth/peer.go中,有个专门处理广播的循环brodcast

func (p *peer) broadcast() {
    for {
        select {
        case txs := <-p.queuedTxs:
            if err := p.SendTransactions(txs); err != nil {
                return
            }
            p.Log().Trace("Broadcast transactions", "count", len(txs))

        case prop := <-p.queuedProps:
            if err := p.SendNewBlock(prop.block, prop.td); err != nil {
                return
            }
            p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)

        case block := <-p.queuedAnns:
            if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
                return
            }
            p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())

        case <-p.term:
            return
        }
    }
}

广播新块到远程节点

p.SendNewBlock(prop.block, prop.td);

远程节点收到块后同样也会标记哈希存入队列,并且不会超过最大,同时发送一个NewBlockMsgmsgcode0x07,同时数据会被RLP编码。

p.knownBlocks.Add(block.Hash())
    for p.knownBlocks.Cardinality() >= maxKnownBlocks {
        p.knownBlocks.Pop()
    }
    return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
    size, r, err := rlp.EncodeToReader(data)
    if err != nil {
        return err
    }
    return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}

到此广播区块的过程结束,交由远程节点去处理NewBlockMsg消息。


AsyncSendNewBlockHash

广播哈希的过程跟广播区块的过程非常的类似,最终是由远程节点去处理NewBlockHashesMsg消息。

广播区块的过程完毕之后,会直接进入下一个阶段,调用fetcher模块去同步这些广播的区块,接下的文章会讲到。


广播交易

广播交易的入口在pm.txBroadcastLoop(),直接进入到pm.BroadcastTxs(event.Txs),大概做了以下几件事:

①:将交易广播给一批没有这个交易的节点

for _, tx := range txs {
        peers := pm.peers.PeersWithoutTx(tx.Hash())
        for _, peer := range peers {
            txset[peer] = append(txset[peer], tx)
        }
        log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
    }

②:异步发送交易给这些节点

for peer, txs := range txset {
        peer.AsyncSendTransactions(txs)
    }

接着进入到AsyncSendTransactions:

将所有交易标记为已知交易,同时还要保证没有超过最大的已知交易(32768笔)

case p.queuedTxs <- txs:
        for _, tx := range txs {
            p.knownTxs.Add(tx.Hash())
        }
        for p.knownTxs.Cardinality() >= maxKnownTxs {
            p.knownTxs.Pop()
        }
    case txs := <-p.queuedTxs:
            if err := p.SendTransactions(txs); err != nil {
                return
            }
func (p *peer) SendTransactions(txs types.Transactions) error {
...
  return p2p.Send(p.rw, TxMsg, txs)
}

发送交易最终会发送一个TxMsg消息,接收到这个消息的节点会通过pm.txpool.AddRemotes(txs)处理交易。


消息处理(handleMsg)

handleMsg从对方连接中读取消息,根据消息码的不同进行处理,从而将广播和同步之间来回的消息进行处理。

func (pm *ProtocolManager) handleMsg(p *peer) error {
    msg, err := p.rw.ReadMsg()
    if err != nil {
        return err
    }
    if msg.Size > ProtocolMaxMsgSize {
        return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
    }
    defer msg.Discard()

    switch {
    case msg.Code == StatusMsg: ......
    case msg.Code == GetBlockHeadersMsg: ......
    case msg.Code == BlockHeadersMsg: ......
    case msg.Code == GetBlockBodiesMsg: ......
    case msg.Code == BlockBodiesMsg: ......
    case p.version >= eth63 && msg.Code == GetNodeDataMsg: ......
    case p.version >= eth63 && msg.Code == NodeDataMsg: ......
    case p.version >= eth63 && msg.Code == GetReceiptsMsg: ......
    case p.version >= eth63 && msg.Code == ReceiptsMsg: ......
    case msg.Code == NewBlockHashesMsg: ......
    case msg.Code == NewBlockMsg: ......
    case msg.Code == TxMsg: ......
    default: return errResp(ErrInvalidMsgCode, "%v", msg.Code)
    }
    return nil
}

参考

https://mindcarver.cn 最新发布

https://github.com/blockchainGuide/ 资料更新

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容