1.数据结构
//eth/handler.go
type ProtocolManager struct {
networkId uint64
fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txCh chan core.TxPreEvent
txSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
}
//eth/peer.go
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
closed bool
}
type peer struct {
id string
*p2p.Peer
rw p2p.MsgReadWriter
version int // Protocol version negotiated
forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
head common.Hash
td *big.Int
lock sync.RWMutex
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
}
ProtocolManager主要成员包括:
- peertSet{}类型成员用来缓存相邻个体列表,peer{}表示网络中的一个远端个体。
- 通过各种通道(chan)和事件订阅(subscription)的方式,接收和发送包括交易和区块在内的数据更新。当然在应用中,订阅也往往利用通道来实现事件通知。
- ProtocolManager用到的这些通道的另一端,可能是其他的个体peer,也可能是系统内单例的数据源比如txPool,或者是事件订阅的管理者比如event.Mux。
- Fetcher类型成员累积所有其他个体发送来的有关新数据的宣布消息,并在自身对照后,安排相应的获取请求。
- Downloader类型成员负责所有向相邻个体主动发起的同步流程。
2.启动
在geth启动(见https://www.jianshu.com/p/f04fd9095e74)过程中,在方法eth.New中,有
//eth/backend.go
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
此处就是p2p的初始化
在eth.New(返回*Ethereum)后,ethereum启动时会随之启动p2p
//eth/backend.go
// Start implements node.Service, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start(srvr *p2p.Server) error {
// Start the bloom bits servicing goroutines
s.startBloomHandlers()
// Start the RPC service
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
// Figure out a max peers count based on the server limits
maxPeers := srvr.MaxPeers
if s.config.LightServ > 0 {
if s.config.LightPeers >= srvr.MaxPeers {
return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
}
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)
if s.lesServer != nil {
s.lesServer.Start(srvr)
}
return nil
}
ProtocolManager的start函数代码如下
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
pm.txCh = make(chan core.TxPreEvent, txChanSize)
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
}
以上这四段相对独立的业务流程的逻辑分别是:
-
广播新出现的交易对象
txBroadcastLoop()会在txCh通道的收端持续等待,一旦接收到有关新交易的事件,会立即调用BroadcastTx()函数广播给那些尚无该交易对象的相邻个体。 -
广播新挖掘出的区块
minedBroadcastLoop()持续等待本个体的新挖掘出区块事件,然后立即广播给需要的相邻个体。当不再订阅新挖掘区块事件时,这个函数才会结束等待并返回。很有意思的是,在收到新挖掘出区块事件后,minedBroadcastLoop()会连续调用两次BroadcastBlock(),两次调用仅仅一个bool型参数@propagate不一样,当该参数为true时,会将整个新区块依次发给相邻区块中的一小部分;而当其为false时,仅仅将新区块的Hash值和Number发送给所有相邻列表。 -
定时与相邻个体进行区块全链的强制同步
syncer()首先启动fetcher成员,然后进入一个无限循环,每次循环中都会向相邻peer列表中“最优”的那个peer作一次区块全链同步。发起上述同步的理由分两种:如果有新登记(加入)的相邻个体,则在整个peer列表数目大于5时,发起之;如果没有新peer到达,则以10s为间隔定时的发起之。这里所谓"最优"指的是peer中所维护区块链的TotalDifficulty(td)最高,由于Td是全链中从创世块到最新头块的Difficulty值总和,所以Td值最高就意味着它的区块链是最新的,跟这样的peer作区块全链同步,显然改动量是最小的,此即"最优"。 -
将新出现的交易对象均匀的同步给相邻个体
txsyncLoop()主体也是一个无限循环,它的逻辑稍微复杂一些:首先有一个数据类型txsync{p, txs},包含peer和tx列表;通道txsyncCh用来接收txsync{}对象;txsyncLoop()每次循环时,如果从通道txsyncCh中收到新数据,则将它存入一个本地map[]结构,k为peer.ID,v为txsync{},并将这组tx对象发送给这个peer;每次向peer发送tx对象的上限数目100*1024,如果txsync{}对象中有剩余tx,则该txsync{}对象继续存入map[]并更新tx数目;如果本次循环没有新到达txsync{},则从map[]结构中随机找出一个txsync对象,将其中的tx组发送给相应的peer,重复以上循环。
以上四段流程就是ProtocolManager向相邻peer主动发起的通信过程。尽管上述各函数细节从文字阅读起来容易模糊,不过最重要的内容还是值得留意下的:本个体(peer)向其他peer主动发起的通信中,按照数据类型可分两类:交易tx和区块block;而按照通信方式划分,亦可分为广播新的单个数据和同步一组同类型数据,这样简单的两两配对,便可组成上述四段流程。