目录
- 一、P2P协议
- 1.1 简介
- 1.2 P2P网络模型
- 1.3 分布式哈希表(DHT)
- 1.4 kademlia算法
- 二、Reed项目网络层
- 2.1 kademlia
- 2.2 节点发现层(UDP)
- 2.2.1 kademlia实现
- 2.2.2 核心逻辑
- 2.2.3 节点状态机
- 2.3 网络通讯层(TCP)
- 2.4 演示
引言
Reed和其他公链一样是一个非认证准入的项目,也就是说任何节点随时都可以加入到网络,而节点之间的通讯是通过P2P协议来完成的。整个网络层又可以分为两大层:使用UDP协议的节点发现层和使用TCP协议的通讯层。这篇内容首先简单介绍下P2P协议,然后总体介绍Reed的网络层框架,再分别结合代码详细介绍节点发现以及数据传输的实现。
一、P2P协议
1.1 简介
P2P(Peer-to-Peer)早在WWW诞生前就已经出现(大约70年代就已有应用),P2P网络是一种位于应用层的分布式网络,网络的参与者称为对等节点(Peer),节点共享出他们所拥有的硬件资源(处理能力、存储能力、网络连接等),这些共享资源提供的服务被其他节点访问,在P2P网络里,节点可以既可以是资源的提供者,也可以是资源的获取者。
P2P的应用非常广泛,我们所熟知的包括迅雷、BT下载、电驴、PPS、ICQ、QQ、Skype都属于P2P的应用。
1.2 P2P网络模型
集中目录式网络模型
是最早出现的P2P网络模型,它采用中央目录服务器来管理P2P网络节点,仍然具有中心化的特点。但是和C/S结构不同的是:它只是把节点目录存放在服务器,而C/S结构所有数据都存放在服务器。
早期出现的一个叫做Napster的MP3下载软件就是使用的这种网络模型:用户首先访问目录服务器,从服务器获得自己需要下载的文件的节点地址,然后直接访问该节点下载文件。对比C/S结构好处是,大大减少了目录服务器的压力。-
全分布式(纯P2P)网络模型
这种网络模型下每个节点是完全对等的,即是服务端也是客户端,也就是没有所谓的服务器。从网络拓扑上区分,又可以分为非结构化模型和结构化模型。这两种模型在资源定位的操作上是不一样的。- 非结构化模型在资源的组织管理上非常的随意,当需要进行资源的定位时一般使用盲目的搜索算法,例如泛洪(flooding)搜索算法。由于在网络中都不知道资源的位置,所以当一个查询发起时,节点会广播给他的邻居节点,他的邻居节点又会广播,直到找到指定资源或TTL值为0。TTL(Time To Life)是为了限制搜索的范围而设置的一个数值,每到一个节点,TTL减1,以防止搜索永远循环下去。可以想象此算法随着跳数的增加,访问的节点呈指数级增长,所以具有覆盖节点范围大、节点加入或退出影响小等优点。缺点是网络宽带消耗严重,随着节点数增加,搜索时网络流量会急剧增加,导致一些节点速度慢甚至失效。
- 结构化模型与非结构化的区别是每个节点维护的邻居是根据一个全局规则固定的,资源存放在哪个节点也是有规则的,搜索资源是,根据规则就可以用较少的跳数定位到指定资源。目前结构化模型主流都是采用分布式哈希表(DHT)技术。DHT相关内容在下面详细介绍。
分层式网络模型
此网络模型吸取了集中式目录网络模型和纯P2P网络模型的优点,在设计和处理能力上都进行了优化,按节点的能力(计算能力、内存大小、网络等)区分超级节点和普通节点两大类。在资源共享方面,所有节点的地位相同,区别在于:发现算法只在超级节点之间进行,超级节点再将查询请求转发给适当的普通节点。这样,在超级节点之间就构成一个高速转发层,超级节点和普通节点构成若干层次。
1.3 分布式哈希表(DHT)
分布式哈希表(Distributed Hash Table)是纯P2P结构化模型下的一种分布式存储方法,每一个节点负责一个小范围的节点路由和资源数据,所有节点组合起来就是一个可提供快速搜索的网络。
DHT的原理是:每个参与者称为节点,存储的数据项称为对象,每个节点和对象都有一个全网唯一的ID标识,当需要查找某个对象时首先计算出它的ID,然后找出拥有此对象的节点(并非只有一个)。
根据现实的不同,DHT可以分以下三类
- Multi-hop DHT
即多跳的DHT,在一个N个节点的网络中,路由跳数为O(logN),且每个节点的路由表保存有0(logN)个邻居节点。下面我们介绍到的kademlia就是属于此类。 - One-hop DHT
One-hop算法的每个节点需要保存一份网络上所有节点O(N)的路由表,查询对象时,根据路由可以达到一跳的效果。代价是在节点加入和退出频繁时需要不停的维护路由表。 - O(1) DHT
表示路由跳数为常数,One-hop DHT是一个特殊的O(1)DHT,这样来理解就好了。
1.4 kademlia算法
kademlia是DHT网络Multi-hop分类下的一种算法实现,美国纽约大学的Petar Maymounkov和David Mazieres于2002年发表了一篇论文[1],论文主要介绍一种基于异或(XOR)算法来定位节点。
在kademlia算法中,信息以<key,value>的方式存储,可以使用160bit长度的值表示节点的ID和key,当需要发布或查找<key,value>信息时,使用两者间距离(Distance)来查找,这个距离并非物理上的概念,而是通过XOR(异或算法)计算得到,kademlia通过独特的XOR为距离度量基础,创新了一种能够大大提供查询速度的拓扑结构,计算距离公式为:d(x,y) = x⊕y。
XOR(异或)是一种位运算,两者运算,对应为相同时为0,不同时为1,例如:0111⊕1001=1110,异或在密码学领域也有广泛的应用。
如何计算距离我们知道了,接着就是如何来保存节点的路由表。在kademlia算法中,把路由表分成160层,序号从0到159,根据其他节点与自身距离值来确定应该把节点放在那一层,例如:我们的ID为0111,需要查找的目标ID为1001,则d(0111,1001)=0111⊕1001=14,表示目标ID应该存储在我们路由表#14层,而这里的层在kademlia算法称为k桶(k-buckets),下面可以更直观的表示k桶
上图每一行就是一个k桶,每个k桶存放的节点信息按照距离划分,那么每个k桶存放多少个节点信息合适呢,如果按照上图所说义,从上到下k桶的覆盖范围呈指数增长,#0号k桶存放1个,#1号k桶存2个,#2号k桶存放4个,可以看出是一个类似金字塔的形状,越往下数量越大,最底下的#159号桶可以存放2^159个节点,但在实际应用中基本不会达到这么大的数字。在论文中给出每个k桶存放节点数量的一种方法是:低序号的k桶可在上面的基础上乘于3,高序号的k桶给定一个常数数值k(例如k=20)。
接下来我们介绍k桶的维护,也就是何时在什么条件下添加移除节点。当接收到来自任何一个节点的信息,无论是请求还是响应,都尝试更新所在k桶:如果该节点已经存在于对应的k桶,则将该节点移到列表最后面(每个k桶结构可以看做一个按更新时间升序排序的列表)。如果该节点不存在对应k桶,并且该k桶内的节点数少于数值k,则直接插入到列表尾端;如果节点已满(>=k),则对若干个节点进行ping
操作,如果无响应则移除,然后插入新的节点;如果有回应就丢弃新节点。这样的策略有效的保证无效的节点会被移除,活动的节点会被保留。
最后也是最重要的节点的查找或者说定位,查找节点使用操作find_node
表示,它和上面说到的ping
一样是属于kademlia网络的RPC操作。kademlia使用递归算法来进行节点的查找,假设我们现在需要查找一个节点,先结算我们之间的距离序号,从该k桶挑选安排α个最接近的节点(如果该k桶的节点数量不足α个,则从附近k桶挑选,直至满足α),对着α节点进行find_node
操作,等待返回结果,如果超时没响应,则丢弃。接着对返回的节点也进行find_node
操作,一直递归下去。可以看出,其实α就是操作的并发数。
以上是论文里对kademlia算法的描述,论文中很多数值也是没有标准,实际应用中应该根据需要来做调整,论文地址已放在文末。Reed项目P2P模块中kademlia算法与论文所描述大致相同。下面会结合源代码,来对kademlia算法以及整个P2P模块进行介绍。
二、Reed项目网络层
unknown
到known
通过一个状态机来流转,状态机就像一个漏斗,试图将所有状态都转换为known
,这一层与节点间通讯使用UDP协议完成。中间一层是网络通讯层(network),这部分主要职责是通过下层提供的节点(node),保证有足够的活动的对等节点(peer),由于底层提供的node只是一些相关的信息,还不能直接用于TCP通讯,网络通讯层需要进行dial、handshake等操作,将node转换成peer状态(如图所示),以便上层调用。最后一层是同步管理层(sync),这部分主要职责是负责区块、交易的同步和广播。
2.1 节点发现层(UDP协议)
这一层核心代码在https://github.com/ReedWe/reed/tree/master/p2p/discover目录下
- kademlia.go: DHT网络kademlia算法功能相关实现部分
- node.go:已知节点的保存及相关操作
- seeds.go:节点种子文件相关操作
- transition.go:节点状态机
- udp.go:节点发现的核心逻辑部分,以及UDP协议通讯相关功能
- udp_kad.go:kademlia算法相关的通讯部分
- udp_listener.go:UDP协议端口监控实现
2.1.1 kademlia实现
kademlia算法这部分结合上面的介绍很好理解,这里挑两个比较重要的函数介绍:
func (t *Table) Add(n *Node) {
t.mutex.Lock()
defer t.mutex.Unlock()
if n == t.OurNode {
return
}
for _, b := range t.Bucket {
if contains(b, n.ID) {
log.Logger.Debug("node exists in table")
return
}
}
dist := logarithmDist(t.OurNode.ID, n.ID)
if len(t.Bucket[dist]) < kBucketSize {
t.Bucket[dist] = append(t.Bucket[dist], &bNode{node: n, lastConnAt: time.Now().UTC()})
}
log.Logger.WithFields(logrus.Fields{"ID": n.ID.ToString(), "IP": n.IP.String()}).Info("added node")
// TODO when len(kBucket) >= kBucketSize
// do something...
}
经过状态机层层转换最终达到known状态的节点,会被添加到kademlia表,函数通过sync.Mutex保证线程安全,如果被添加的节点是自己,直接终止。然后遍历kademlia表查看是否已存在,如果存在也直接终止,继续往下执行logarithmDist()
,此函数主要是计算两个节点的距离,下面有详细的介绍。以距离值为索引,标记节点存放的位置,如果所在k桶未满,将节点放到列表末尾;这里留下了一个TODO:k桶已满的情况还没有处理,按照论文中介绍,如果出现这种情况,首先需要对最近很少看到的节点(lastConnAt值较小)进行ping
操作,如果有响应,则将其移到k桶末尾并丢弃新的节点,如果没有响应,将其移出k桶并将新节点放到末尾。
// logarithmDist return distance between a and b
// return log2(a^b)
// k-bucket distance description
// 0 [2^0,2^1) 存放距离为1,且前255bit相同,第256bit开始不同(即前255bit为0)
// 1 [2^1,2^2) 存放距离为2~3,且前254bit相同,第255bit开始不同
// 2 [2^2,2^3) 存放距离为4~7,且前253bit相同,第254bit开始不同
// ...
// 提示:
// ID长度为32Byte,256bit。
// 上面循环每一位,进行异或(^)操作,结果0表示相同,1表示不同
// 所以“前导0个数为255”表示有255个bit是相同的
func logarithmDist(a, b NodeID) int {
for i := range a {
x := a[i] ^ b[i]
if x != 0 {
lz := i*8 + lzcount[x] // 256bit leading zero counts
return IDBits - 1 - lz
}
}
return 0
}
因为NodeID
是一个长度32的字节数组(即256位),这个NodeID
可以看成由256个0或1组成,根据上面代码的注释,我们可以将运算结果转化为:前多少位相同的说法,例如上面注释举例的如果距离为1,可以说两个ID前255位都相同,即前255位都为0。这样是为了方便在代码中进行运算,32个字节这个长度不可直接进行运算,我们把他按字节来拆开分别运算(14行),然后单个字节进行异或运算得到x(15行),如果x为0表示整一个字节都是相同的(8位全为0),如果x不为0,计算x的前导0数量,并加上前面字节的前导0数量(i*8)。
lzcount[x]
是一个256位的数组,里面保存了一个字节的所有值的前导0数量,例如:数值12的进制为00001100,所以前导0数量为4个,32个字节也是同样道理。
2.1.2 核心逻辑
这部分建议结合上面的架构图来看,这一层的入口处在udp.go文件的NewDiscover()
函数
func NewDiscover() (*UDP, error) {
// our node
o, err := getOurNode()
if err != nil {
return nil, err
}
// kademlia table
t, err := NewTable(o)
if err != nil {
return nil, err
}
// udp listener
l, err := NewUDPListener(o.IP, o.UDPPort)
if err != nil {
return nil, err
}
udp := &UDP{
OurNode: o,
Table: t,
conn: l.conn,
listener: l,
nodes: map[NodeID]*Node{},
timeoutEvents: map[timeoutEvent]*time.Timer{},
readCh: make(chan ingressPacket, 100),
timeoutCh: make(chan timeoutEvent),
queryCh: make(chan findNodeQuery),
quitCh: make(chan struct{}),
}
udp.BaseService = *common.NewBaseService(nil, "udp", udp)
return udp, nil
}
根据注释提示很清楚知道先是获得自己的节点信息,然后新建一个kademlia表,再新建一个UDP的监听器,跟着初始化UDP结构,最后将这一层以服务的方式启动,然后我们看到服务启动的代码:
func (u *UDP) OnStart() error {
go u.loop()
go u.readLoop()
u.refresh()
return nil
}
服务启动会分别另起两个个goroutine,一个执行loop()
函数,一个执行readLoop()
函数,最后再当前goroutine执行一遍刷新操作。loop()
函数内部只有一个for循环,内部有6个分支分别对应不同的chan:
func (u *UDP) loop() {
var refreshTableTicker = time.NewTicker(refreshTableInterval)
var refreshBucketTimer = time.NewTimer(refreshBucketInterval)
for {
select {
case pkt := <-u.readCh:
log.Logger.WithFields(logrus.Fields{"remoteIP": pkt.remoteAddr.IP, "Port": pkt.remoteAddr.Port, "event": pkt.event, "remoteID": pkt.remoteID.ToString()}).Info("->read a UDP message")
// TODO check packet
n := u.internNode(&pkt)
u.processPacket(n, pkt.event, &pkt)
case toe := <-u.timeoutCh:
log.Logger.WithFields(logrus.Fields{"remoteID": toe.node.ID.ToString(), "IP": toe.node.IP, "PORT": toe.node.UDPPort, "event": toe.event}).Info("UDP timeout event")
if u.timeoutEvents[toe] == nil {
break
}
delete(u.timeoutEvents, toe)
u.processPacket(toe.node, toe.event, nil)
case f := <-u.queryCh:
fmt.Println("<-queryCh")
if !f.maybeExecute(u) {
// delay execute
f.remote.pushToDefer(&f)
}
case <-refreshTableTicker.C:
// TODO if the prev refresh not done?
log.Logger.Info("time to refresh table")
u.refresh()
case <-refreshBucketTimer.C:
log.Logger.Info("time to refresh k-bucket")
targetNode := u.Table.chooseRandomNode()
if targetNode != nil {
go u.lookup(targetNode.ID)
} else {
log.Logger.Info("no target to lookup")
}
refreshBucketTimer.Reset(refreshBucketInterval)
case <-u.quitCh:
log.Logger.Info("udp.loop() quit")
return
default:
}
}
}
第1个分支是接收所有来自其他节点的UDP信息,该通道是通过UDP监听器写入的,也就是上面OnStart函数里面调用的readLoop()
,这个函数后面再详细说明。回到第1个分支,首先根据节点发送的包信息解析出一个临时节点:根据当前节点ID,如果节点已存在本地列表,直接返回,否则构建一个新的节点,这个新构建的节点初始状态为unknown
,接着就是根据节点的状态,执行对一个的操作,这个过程会交给状态机来执行,这部分在下面详细说明。第2个分支是处理那些超时没响应的请求,例如:向对方发送ping、findNode等操作都会设置超时时间,如果超时,节点状态也会流转。第3个分支处理节点查找的请求,maybeExecute()
尝试直接执行查找操作,如果需要查找的目标是本节点,直接从本地kademlia表返回最近的几个节点(目前常量kBucketSize=16),节点状态为unknown
、节点不可执行查询操作(!canQuery)或当前有正在执行的操作(pendingQuery!=nil)都返回false,这种情况需要将本次的查找操作加入到队列pendingQuery,等到节点满足条件后台,再执行查找操作。第4个分支是定时刷新整个kademlia表数据。 第5个分支是定时刷新k桶节点。第6个分支读取的是退出当前循环的通知,例如当我们的节点退出时,会依次执行OnStop函数通知quit通道来结束循环。
接着再回到OnStart里面的readLoop()
函数:
func (u *UDP) readLoop() {
defer u.conn.Close()
buf := make([]byte, 512)
for {
n, from, err := u.conn.ReadFromUDP(buf)
if err != nil {
log.Logger.Errorf("failed to read remoteAddr UDP:%s", err.Error())
}
log.Logger.WithFields(logrus.Fields{"remoteAddr": from}).Debug("read remoteAddr UDP")
u.handlePacket(from, buf[:n])
}
}
关键第5行,u.conn接收的是一个UDP监听器实例UDPConn,这里接收所有UDP请求,然后执行handlePacket()
函数解包,通过通道readCh到达上面loop()
函数的第1个分支。
接着再回到OnStart里面的refresh()
函数:
func (u *UDP) refresh() {
// lookup nodes
seeds := getSeeds()
log.Logger.Infof("node seed count:%d", len(seeds))
for _, seedNode := range seeds {
if seedNode.IP.Equal(u.OurNode.IP) && seedNode.UDPPort == u.OurNode.UDPPort {
continue
}
u.nodes[seedNode.ID] = seedNode
if seedNode.state == unknown {
transform(u, seedNode, verifyInit)
}
// Force-add the seed node so Lookup does something.
// It will be deleted again if verification fails.
u.Table.Add(seedNode)
}
// TODO get nodes from db?
go u.lookup(u.OurNode.ID)
}
节点启动的时候会先执行一遍refresh()
,启动完成后,通过一个timer来定时执行。首先从节点种子文件取出种子节点,u.nodes
保存了所有见过的节点,如果节点状态为unknown
,直接交给状态机来执行,然后可以马上加入到表(第15行),这里还不知道节点是否可用,为什么可以直接加入到表呢?因为在状态机执行过程中,如果节点不符合要求,最终会被移除。最后,新起一个goroutine执行lookup()
节点查找函数,入参是本节点ID。这里明明是刷新表的操作,为什么最终却去执行节点查找呢?等我们介绍完lookup()
函数的实现就知道了。
func (u *UDP) lookup(target NodeID) {
log.Logger.Infof("lookup target=%s", target.ToString())
replyCh := make(chan *findNodeRespReply, alpha)
asked := make(map[NodeID]bool)
pendingNodes := make(map[NodeID]*Node)
nd := nodesByDistance{target: target}
nd.push(u.OurNode)
for {
for i := 0; i < len(nd.entries) && len(pendingNodes) < alpha; i++ {
n := nd.entries[i]
if !asked[n.ID] {
asked[n.ID] = true
pendingNodes[n.ID] = n
u.queryCh <- findNodeQuery{remote: n, target: target, reply: replyCh}
}
}
// no more node
if len(pendingNodes) == 0 {
log.Logger.Info("no more node in pendingNodes,stop lookup")
break
}
select {
case r, ok := <-replyCh:
log.Logger.Debugf("lookup case:replyCh")
if ok && r != nil {
for _, n := range r.nodes {
if n != nil {
log.Logger.WithFields(logrus.Fields{"remoteIP": n.IP, "port": n.UDPPort, "remoteID": n.ID.ToString(), "state": n.state}).Info("--node")
nd.push(n)
}
}
delete(pendingNodes, r.remoteID)
} else {
log.Logger.Debug("reply chan closed")
}
case <-time.After(responseTimeout):
log.Logger.Infof("lookup timeout,pendingNodes count %d", len(pendingNodes))
for _, v := range pendingNodes {
if v.pendingQuery == nil {
continue
}
// forget all pending requests
close(v.pendingQuery.reply)
// if reply is nil,don't write in replyChan
// see func processFindNodeResp()
v.pendingQuery.reply = nil
}
// start new one
pendingNodes = make(map[NodeID]*Node)
case <-u.quitCh:
return
}
}
}
lookup()
职责是查找给定节点(入参target),这里面大致逻辑其实在[1.4 kademlia算法]里面也有介绍,这里实现上大致相同。分别声明几个变量:节点响应通道、记录已访问过节点Map、正在执行查找操作Map。nodesByDistance
是一个存放于目标节点(target)距离从短到远排列的结构。先把我们自己的节点push进去,这时候entries里面只有我们自己一个节点,来到for
循环(第10行),取出alpha
个节点,也就是执行的并发数,然后向取出的节点发送一个findNode
的请求,由于这时候entries只有我们自己的节点,所以就是对自己执行查找操作,具体发送请求的细节通过queryCh通道发送出去。继续往下来到第25行,如果被请求的节点有响应,会进入这里(具体的细节可以跟着源代码查看),接收的是一个节点列表,将他们全部push到entries列表,并被请求节点ID从pendingQuery队列移除。再下面是一个控制影响超时的分支,最后是一个退出循环信号,退出的时机和上面look()
函数说的一样。
到这里,第一轮循环结束,这时entries节点已经变成OurNode + 第一轮响应的多个节点,来到第10行,由于OurNode第一轮已被访问,所以这里排除,对剩余的节点执行相同操作。一直按这种逻辑递归下去,直到已没有可以执行的节点为止。了解这里面的逻辑后,再回头看上面的问题:为什么刷新表的操作最终会执行节点查找?刷新表其实就是将所有的节点更新一遍,在refresh()
里面调用lookup()
的入参是ourNodeID,也就是查找的是我们自己的节点,按照这逻辑,到第一轮循环时,相当于从我们的节点列表中,取出离我们节点最近的若干个节点进行访问,也就是以我们节点为中心,慢慢开始扩散,再回想kademlia表结构存储的逻辑:离我们节点越近会存储得越多,越远存储得越少。这样就相当于每次更新,大家都尽可能的存储离自己最近的节点,所有节点组合起来,就形成了一个kademlia网络。
目前为止,也只是介绍了整个节点发现层的很小一部分,这里目的也就是做个引导,展示的代码也都是功能的入口,更详细还是需要阅读源码。目前为止,还是有留下了不少TODO,这些点都是认为起码是不影响V1版本的功能,后面有时间再慢慢迭代吧。
2.1.3 节点状态机
状态机在这一层是十分重要的一部分,从上面介绍中不难看出,节点间最终核心的交互都是由状态机来完成,Reed的状态机参照以太坊,规定节点有7中状态:unknown、verifyInit、verifyWait、remoteVerifyWait、known、contested、unresponsive。为了方便理解,我们先看这一张状态机的流转图。
从上图可以发现,无论出于什么状态,最终都是可以流转到known,这很重要,因为节点必须出于known状态时,才能对它执行节点查找的操作,而处于其他状态的节点我们都可以认为暂时是不稳定的。状态机相关的代码在文件transition.go,由于代码太多就不贴了,内部实现是定义了7个变量对应7中状态,在init函数内部(init是比main函数还要早执行的函数)定义好了每种状态对应的操作,这里面操作有两种:enter和handle对应两种执行时机。enter表示进入到此状态之前需要执行一次此函数,handle表示处于当前状态时,任何操作都经由此函数完成。
结合上图简单介绍一下流转的流程,当一个从未见过的节点访问我们,此时他的状态是unknown,然后我们会对它进行一次ping
操作,此时状态变为verifyWait,如果对方响应我们(给我们发送一个pong
),就可以将状态直接变成known,如果超时就回到unknown。如果是我们主动发起的请求,一开始的状态会是verifyInit(例如刷新的时候,从种子列表的节点),而verifyInit状态的enter函数会执行一个发送ping
的操作,如果对方响应pong
,状态流转为remoteVerifyWait,这个状态表示等待对方认证我们,想象假设对方也没见过我们,他一样会走一遍unknown->verifyWait->known的过程。而此状态的enter函数会启动一个超时定时器,一旦timeout,就会变成known(见上图黑线),这里可以理解是为了延时变为known,为什么需要延时?因为虽然我们是已经认证了对方,但还需要等待对方也认证我们,这个延时就是等待认证的时间。最后,如果一个known状态的节点在执行findNode
操作连续超时次数达到5次,那么会被设置为contested,可以理解为处于待观察,会同时发送ping
操作,如果还是timeout,就会变成unresponsive表示节点已无响应,等到对方重新连接后,转换会known。
2.3 网络通讯层(TCP)
这一层源代码地址https://github.com/ReedWe/reed/tree/master/p2p,可以结合开头给出的架构图一起看(中间那一层)。
- connection.go:一个对等节点的TCP连接实例,与对等节点的通讯方法(read/write)都在这里吗。
- handler.go:这里是一个接口,表示接收到数据包需要做出哪些响应,Reed的网络层设计的时候考虑与业务功能分离,所以具体的实现交给上层。
- listener.go:TCP监听器,其他节点请求的入口
- net.go:这一层的核心,也是入口,包含控制服务器启动、停止、定时刷新对等节点、dial、handshake等功能。
- node_info.go:节点相关信息结构,用于与其他节点交互,对等节点间连接时会把此结构告诉对方。
- peer.go:对等节点结构信息,保存在本地使用。
- peer_dialing.go:保存了正处于dialing状态的对等节点。
- peer_map.go:保存了当前处于活动状态的节点。
- server.go:P2P层的服务实例,包括服务启动、停止操作,严格来说不属于这一层了。
我们依然是从这一层的入口开始介绍,位于在net.go文件:
func NewNetWork(ourNode *discover.Node, t *discover.Table, acceptCh <-chan net.Conn, handlerServ Handler) (*Network, error) {
n := &Network{
pm: NewPeerMap(),
dialing: NewPeerDialing(),
table: t,
ourNodeInfo: NewOurNodeInfo(ourNode.ID, ourNode.IP, ourNode.TCPPort),
handlerServ: handlerServ,
acceptCh: acceptCh,
disConnCh: make(chan string),
quitCh: make(chan struct{}),
}
n.BaseService = *common.NewBaseService(nil, "network", n)
return n, nil
}
func (n *Network) OnStart() error {
go n.loop()
go n.loopFillPeer()
return nil
}
NewNetWork()
是创建一个网络通讯层的实例,入参依次是:我们自己的节点信息、kademlia表、接收TCP监听器连接的通道、handle的具体实现。这里和UDP那一层几乎相同,就不再一一解析了,看到下面的启动函数OnStart里面有两个函数,相信都能猜到第一个是用来接收请求,第二个是定时更新对等节点,以保证活性。老规矩进入loop()
详细介绍:
func (n *Network) loop() {
for {
select {
case c, ok := <-n.acceptCh:
if !ok {
return
}
log.Logger.WithFields(logrus.Fields{"remote addr": c.RemoteAddr().String()}).Info("->accept peer")
if err := n.addPeerFromAccept(c); err != nil {
log.Logger.Error(err)
}
case addr := <-n.disConnCh:
log.Logger.WithField("peerAddr", addr).Info("remove disconnection peer")
peer := n.pm.get(addr)
if peer != nil {
n.releasePeer(peer)
}
case <-n.quitCh:
log.Logger.Info("[loop] quit")
return
}
}
}
依然是读取通读的信息,第一个分支处理的是那些尝试与我们建立通道的节点,第二个分支处理端口连接的分支,最后的分支是退出循环通知。回到第一个分支处,从acceptCH通道取出来的类型是个net.conn,接着交给addPeerFromAccept()
函数来处理:
func (n *Network) addPeerFromAccept(conn net.Conn) error {
if n.pm.peerCount() >= connectionSize {
_ = conn.Close()
return errors.Wrap(addPeerFromAcceptErr, "enough peers already exist")
}
return n.connectPeer(conn)
}
func (n *Network) connectPeer(rawConn net.Conn) error {
nodeInfo, err := n.handshake(rawConn)
if err != nil {
return err
}
peer := NewPeer(n.ourNodeInfo, nodeInfo, n.disConnCh, rawConn, n.handlerServ)
if err = peer.Start(); err != nil {
return err
}
n.pm.add(peer)
log.Logger.WithField("peerAddr", peer.nodeInfo.RemoteAddr).Info("Add a new peer and started.")
return nil
}
addPeerFromAccept()
函数首先判断当前对等节点连接数是否已满,我们设置的是30个,连接数是需要资源的,不是越多越好,因为TCP是基于字节流,通讯是连接会一直打开。如果未满则调用connectPeer()
函数,此函数主要职责有二:1、与对方进行握手,2、将对等节点启动并加入到本地列表。握手函数这里不详细展开,简单点讲就是向对方发送一个握手请求,然后对方返回他的节点信息(上面提到的node_info),这样握手就完成了。启动对等节点就是将当前打开的连接(rawConn)挂靠到peer结构下,并开启一个新的goroutine来与对方通讯,这一段是在connection.go文件实现的,也就是说当前有多个peer实例就会对应多少个connection实例,他们之间是1:1关系。
func (c *Conn) readGoroutine() {
input := bufio.NewScanner(c.rawConn)
for input.Scan() {
if c.specialMsg(input.Bytes()) {
continue
}
writeMsg := c.handlerServ.Receive(input.Bytes())
if writeMsg != nil {
if err := c.Write(writeMsg); err != nil {
log.Logger.Errorf("connection.read failed to write:%v", err)
}
}
}
if input.Err() != nil {
log.Logger.WithField("remoteAddr", c.rawConn.RemoteAddr().String()).Errorf("readGoroutine error:%v", input.Err())
} else {
log.Logger.WithField("remoteAddr", c.rawConn.RemoteAddr().String()).Errorf("readGoroutine has closed")
}
if c.getState() == connectSt {
// disconnection by the other side
c.setState(remoteDisConnSt)
c.disConnCh <- c.peerAddr
}
return
}
上面是一个连接通讯的函数,specialMsg()
表示特殊的消息,例如handshake,c.handlerServ是上面提到的handle接口,也就是说连接会将读取到的字节交给传入的handel实现类去执行,这里并不关心他是怎么执行的,最终会返回在一个字节数组,该字节数组需要返回给对方。如果连接中断,会跳出for循环,下面判断了连接状态,如果是对方导致的中断,需要将节点信息写入通道disConnCh,表示对方断开了连接,释放掉peer(releasePeer()
函数)。
到此,整体的建立连接以及通信流程已介绍完,接下来再回到一开始的OnStart的第二个执行函数loopFillPeer()
:
func (n *Network) loopFillPeer() {
timer := time.NewTimer(peerUpdateInterval)
for {
select {
case <-timer.C:
n.fillPeer()
timer.Reset(peerUpdateInterval)
case <-n.quitCh:
log.Logger.Info("[loopFillPeer] quit")
return
}
}
}
func (n *Network) fillPeer() {
log.Logger.Debug("time to fill Peer")
nodes := n.table.GetWithExclude(activelyPeerCount, n.pm.IDs())
if len(nodes) == 0 {
log.Logger.Debug("no available node to dial")
return
}
var wg sync.WaitGroup
var waitForProcess []string
for _, node := range nodes {
addr := toAddress(node.IP, node.TCPPort)
if n.dialing.exist(addr) {
log.Logger.WithField("peerAddr", addr).Info("peer is dialing")
continue
} else {
waitForProcess = append(waitForProcess, addr)
}
}
wg.Add(len(waitForProcess))
for _, addr := range waitForProcess {
go n.dialPeerAndAdd(addr, &wg)
}
wg.Wait()
}
loopFillPeer()
控制更新对等节点的定时执行,我们设置前后间隔为10秒,当一次执行完成后,时间才会重置,fillPeer()
函数首先从kademlia表获取指定数量的节点(排除已处于活动状态),然后逐个判断节点是否正在进行dial(第27行),如果不是,加入到待处理列表,最后并发执行对节点的dial和handshake操作(第37行)。
func (n *Network) dialPeerAndAdd(addr string, wg *sync.WaitGroup) {
defer func() {
n.dialing.remove(addr)
wg.Done()
}()
n.dialing.add(addr)
rawConn, err := n.dial(addr)
if err != nil {
log.Logger.WithField("peerAddr", addr).Errorf("failt to dial peer:%v", err)
return
}
if err = n.connectPeer(rawConn); err != nil {
log.Logger.WithField("peerAddr", addr).Errorf("failed to connect peer: %v", err)
return
}
}
这里逻辑相对简单,首先对节点执行dial尝试取得连接,如果成功,则执行connectPeer()
函数,这个函数前面已经介绍过了,defer声明的函数表示延时执行,在函数开始的时候将节点加入到dialing列表来标记它正在dial,在结束的时候(无论如何都会)再将它从列表移除。
2.4 演示
下面演示一下节点发现到建立TCP连接的过程:我们准备了三个节点程序,为了方便阅读ID分别为1000...00,2000...00,3000...00,端口分别是:30391、30392、30393,注意UDP和TCP使用相同的端口,按顺序从1、2、3开始启动,其中第1个节点是种子节点,等到3个节点都启动后,我们看看节点1的日志输出:
上图红框表示通过UDP连接状态流转到known,加入了kademlia表,绿框表示已经建立了TCP连接,可以看到节点1与其他两个已成功建立连接。
上图依然是节点1的日志输出,这次我们把节点2关闭了,这时候可以看到节点2的连接被移除,接着把节点2peer实例也一起移除,而在节点发现层,在刷新k桶的时候会把状态流转到unresponsive,直到节点2重新上线。
到这里,项目的整个网络层就介绍完了,至于数据同步管理(sync manager)这一层,目前还没开始开发,等开发完后再补回来。目前整个项目的进度大概是60%左右,网络层也只是实现了最小功能,还有部分需要但还没做的例如:通讯加密、节点存储优化、通讯数据的压塑、引入flowrate来检测流量、对节点的评分机制等。
博客地址已经迁移到:http://www.chenyuanjian.work,后续就直接在新地址更新了。
References
[1] Kademlia: A Peer-to-peer Information System Based on the XOR Metric http://www.scs.stanford.edu/~dm/home/papers/kpos.pdf