前面的系列文章中我们介绍了Bitcoin网络中节点对区块的存取机制,本文开始我们将介绍Btcd节点如何组成P2P网络以及区块如何在P2P网络中传播。区块在网络上的传递过程涉及节点之间的连接管理、地址管理、Peer节点的管理和Peer之间同步区块的协议等问题,与之相关的代码在如下几个包或者文件中:
- btcd/wire: 实现了Bitcoin网络协议,定义了Peers之间的协议消息、消息格式及包的封装和解析等;
- btcd/peer: 实现了P2P网络中Peer之间维持连接及收发wire协议消息的机制;
- btcd/connmgr: 管理Peer节点之间的TCP连接,包括监听本地端口等待其他节点连接和主动连接Peer节点等等;
- btcd/addrmgr: 收集、存储Bitcoin网络上的节点的地址,随机从地址集中选择路由可达的地址建立Peer连接,这些地址包括IPv4、IPv6及洋葱地址(.onion address)等;
- btcd/server.go: Btcd节点启动后执行的主要入口,定义了server及serverPeer类型,负责初始化和启动connmgr、addrmgr,以及响应Peer的协议消息等。
这些模块之间的关系如下图所示:
接下来,我们将逐一阅读和分析各个包中的代码。我们首先来了解P2P网络是如何组网,然后再进一步了解Bitcoin网络协议的实现。btcd/peer是实现Bitcoin P2P网络的核心模块,我们先从它开始介绍。
Clone完btcd/peer的代码后,我们可以发现它包含如下一些文件:
- peer.go: 包含了几乎全部的Peer相关逻辑实现代码;
- mruinvmap.go: 实现了一个简易的缓存Inverntory的LRU Cache。请注意,虽然文件名由mru开头,但它实际上实现的是LRU Cache。Inventory是节点向Peer节点回复或者通知的己方已经知道的Tx或者Block,它通过inv协议消息以Inventory Vectors的形式发往Peer节点,我们将在介绍网络协议时进一步介绍它。之所以要使用缓存,是为了防止向Peer重复发送已经发送过的Inventory;
- mrunoncemap.go: 与mruinvmap.go类似,它实现了一个缓存nonce值的LRU Cache。这里的nonce值是一个64位随机整数值,用于填充Peer间握手交换Version信息的nonce字段,nonce字段用来判断欲连接的Peer是否就是自己:节点发出Version消息时,会填入一个随机的nonce值,连接不同Peer节点时,节点会缓存所有为Version消息生成的nonce值;如果节点收到了一个Version消息且其中的nonce值是自己缓存的nonce中的一个,那么可以判断这个Version消息由自己发送给自己了;
- log.go: 提供一些log方法;
- doc.go: 包btcd/peer的doc文件;
- example_test.go、export_test.go、mruinvmap_test.go、mrunoncemap_test.go、peer_test.go: 定义一些测试方法;
其中主要的类型为Peer、Config和MessageListeners,Peer类型定义了Peer相关的属性和方法,Config类型定义了与Peer相关的配置,MessageListeners定义了响应Peer消息的回调函数。它们定义的成员字段比较多,我们不打算一一介绍,将在分析具体代码时解释其字段的意义。我们先从创建Peer对象的newPeerBase()方法入手来分析Peer:
//btcd/peer/peer.go
// newPeerBase returns a new base bitcoin peer based on the inbound flag. This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers.
func newPeerBase(origCfg *Config, inbound bool) *Peer {
// Default to the max supported protocol version if not specified by the
// caller.
cfg := *origCfg // Copy to avoid mutating caller.
......
p := Peer{
inbound: inbound,
knownInventory: newMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
}
return &p
}
从上面创建Peer的代码中可以看出,Peer中的关键字段包括:
- inbound: 用于指示Peer是inbound还是outbound。如果当前节点主动连接Peer,则Peer为OutbandPeer;如果Peer主动连接当前节点,则Peer为InboundPeer;
- knownInventory: 已经发送给Peer的Inventory的缓存;
- stallControl: 带缓冲的stallControlMsg chan,在收、发消息的goroutine和超时控制goroutine之间通信,后面会进一步介绍;
- outputQueue: 带缓冲的outMsg chan,实现了一个发送队列;
- sendQueue: 缓冲大小为1的outMsg chan,用于将outputQueue中的outMsg按加入发送队列的顺序发送给Peer;
- sendDoneQueue: 带缓冲的channel,用于通知维护发送队列的goroutine上一个消息已经发送完成,应该取下一条消息发送;
- outputInvChan: 实现发送inv消息的发送队列,该队列以10s为周期向Peer发送inv消息;
- inQuit: 用于通知收消息的goroutine已经退出;
- outQuit: 用于通知发消息的goroutine已经退出,当收、发消息的goroutine均退出时,超时控制goroutine也将退出;
- quit:用于通知所有处理事务的goroutine退出;
- cfg: 与Peer相关的Config,其中比较重要是的Config中的MessageListeners,指明了处理从Peer收到的消息的响应函数;
- services: 用于记录Peer支持的服务,如: SFNodeNetwork表明Peer是一个全节点,SFNodeGetUTXO表明Peer支持getutxos和utxos命令,SFNodeBloom表明Peer支持Bloom过滤;
- protocolVersion:用于记录Peer所用的协议版本;
上述有些字段与Peer实现的消息收发机制有关系,读者可能会有些疑惑,我们将在下面详细介绍。同时,这里涉及到了Go中的channel和goroutine,它们是golang简化并发编程的关键工具,刚开始接触Go的读者可能觉得不好理解,也会对理解Peer的代码造成一些障碍。由于篇幅原因,本文不深入介绍Go的并发编程,读者可以翻阅相关书籍。为了方便大家理解代码,我们引用golang官方对它的并发机制的总结作一个启示:
Do not communicate by sharing memory; instead, share memory by communicating.
channel就是用来在goroutine之间进行通信的数据类型。接下来,我们来看Peer的start()方法以进一步了解它的实现:
// start begins processing input and output messages.
func (p *Peer) start() error {
log.Tracef("Starting peer %s", p)
negotiateErr := make(chan error)
go func() {
if p.inbound {
negotiateErr <- p.negotiateInboundProtocol()
} else {
negotiateErr <- p.negotiateOutboundProtocol()
}
}()
// Negotiate the protocol within the specified negotiateTimeout.
select {
case err := <-negotiateErr:
if err != nil {
return err
}
case <-time.After(negotiateTimeout):
return errors.New("protocol negotiation timeout")
}
log.Debugf("Connected to %s", p.Addr())
// The protocol has been negotiated successfully so start processing input
// and output messages.
go p.stallHandler()
go p.inHandler()
go p.queueHandler()
go p.outHandler()
go p.pingHandler()
// Send our verack message now that the IO processing machinery has started.
p.QueueMessage(wire.NewMsgVerAck(), nil)
return nil
}
上述代码主要包含:
- 起了一个goroutine来与Peer交换Version消息,调用goroutine与新的goroutine通过negotiateErr channel同步,调用goroutine阻塞等待Version握手完成;
- 如果Version握手失败或者超时,则返回错误,Peer关系建立失败;
- 如果握手成功,则启动5个新的goroutine来收发消息。其中,stallHandler()用于处理消息超时,inHandler()用于接收Peer消息,queueHandler()用于维护消息发送列队,outHandler用于向Peer发送消息,pingHandler()用于向Peer周期性地发送心跳;
- 最后,向Peer发送verack消息,双方完成握手。
Peer start()成功后,节点间的Peer关系便成功建立,可以进一步交换其他协议消息了,如果节点与不同的其他节点建立了Peer关系,其他节点又与新的节点建立Peer关系,所有节点会逐渐形成一张P2P网络。然而,节点在交换Version时从对方获取了哪些信息,为什么要等到Version交换完成后Peer关系才能正常建立?各个Handler又是如何实现收发消息的呢?我们将在下一篇文章《Btcd区块在P2P网络上的传播之Peer》中详细介绍。