以太坊节点间通信是指本地节点和peer节点之间的按照p2p线上协议标准实现的数据收发过程。其中peer节点从发现协议维护的活跃节点列表中获取。
p2p消息传输协议详细介绍请看:https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol
ethereum线上传输协议详细介绍请看:https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
要说明p2p和eth线上协议,涉及到软件架构中的3层:
- service层: 提供PeerManager和ChainService
- protocol层: 实现p2p-protocol 和eth-protocol命令报文的收发
- network层: 接收tcp连接的原始数据并解析,分发给相应的协议
下面主要分析p2p-protocol和PeerManager,来说明这几层之间的交互流程。
1. 概述
本地节点需要和指定数量的peer维持tcp连接, 如果连接的节点数不足, 从节点发现协议维护的活跃节点列表中动态获取节点信息。
本地节点监听peer的tcp连接, 当有peer连接时,为每个peer都维护一个Peer数据结构,在这个Peer结构中实现了protocol层和network层:network层负责消息的收发, 并分发给protocol层的p2pProtocol和ethProtocol。
2. network层
网络层的详细功能将下图:
- PeerManager服务启动时,在p2p端口上启动tcp连接的监听;
- PeerManager服务启动时,如果配置有bootstrap, 会主动进行连接;
- 无论主动还是被动连接到一个peer, 都会建立一个Peer结构, 配置protocol层的协议(
p2pprotocol,ethprotocol
); - 在Peer结构初始化时,使能network层的数据收发功能;
- 本node和peer建立tcp连接后, 首先需要进行加密认证:
5.1 在初始化Peer结构的多路复用的会话分发器时, 作为连接的发起者(发起者知道对端的公钥), 会发送一个hello认证消息;
5.2 等发送协程起来后,hello认证消息就会被发送出去;
5.3 对端的Peer结构也是首次收到消息,即该hello认证消息, 在对端的分发器层需要专门处理该hello认证消息;
5.4 对端验证该消息的签名, 回复一个认证确认消息;
5.5 然后本端和对端都发一个hello认证消息,触发定时发送ping包,进行心跳保活; - 消息的发送和接收都是通过缓存队列异步执行的,方便控制发送速率和数据块大小。
- 首次和peer通信时,peer发过来的hello报文后, 该函数检查是否允许连接该peer是否超过配置最大的连接数, 超过了则不允许连接;是否是重复连接。
2.1 消息发送
- 每个Peer都有一个send_packet接口,用户发送已经组装好的消息报文;
- 发送的消息目前只支持p2pProtocol和ethProtocol协议层的格式;
- 消息送给消息复用分发器mux,组装成Frame格式
3.1 组装成Frame是为了RLP序列化编码,帧数据提供了报文的大小,报文的协议;
3.2 Frame有2种形式:单帧,多帧;
3.3 单帧的格式如下:header || header-mac || frame || mac
3.4 多帧的格式如下:
header || header-mac || frame-0 || [ header || header-mac || frame-n || ... || ] header || header-mac || frame-last || mac
消息也可以带有优先级,因此, 消息在组装Frame后,根据类型分别进入3中不同类型的队列:normal队列,chunked队列,priority队列;
从上面3个队列中根据pws(protocol-window-size)大小和队列中frame数据组装成数据流,方法如下:
5.1 normal队列和priority队列中存在Frame:分别获取pws/2字节的报文数据(以frame为单位)
5.2 chunked队列和priority队列中存在Frame:分别获取pws/2字节的报文数据(以frame为单位)
5.3 normal队列和chunked队列中存在Frame:分别获取pws/2字节的报文数据(以frame为单位)
5.4 其他情况,从有数据的队列中获取pws字节的frame;
5.5 所有获取的数据字节数大于pws才可以发送;对上面取出来的frame数据进行加密,在放入发送队列:message_queue;
线程_run_egress_message监听该队列, 发送最终的数据给peer;
2.2 消息接收
收到原始的tcp数据流后, 需要解析为消息, 并根据protocolID和cmdID分发给p2pProtocol和ethProtocol协议。
3. 协议层
protocol层处于service层和network层之间,实现了一系列的命令。
3.1 命令消息
在协议层实现p2p线上协议定义的命令。
协议层的实例比如ethProtocol协议实例,对于每个命令消息,在初始化时都会生成下面3个方法(X是命令名字):
protocol.create_X(*args, **kargs)
protocol._receive_X(data)
protocol.send_X(*args, **kargs)
其中send_X
是 protocol.send_packet(protocol.create_X(*args, **kargs))
的简写。
上图是ethProtocol协议实例举例,比如发送和接收block对应的接口分别是send_newblock()
,_receive_newblock()
处理, 最后调用注册的回调函数: receive_newblock_callbacks()
。
3.2 接收命令报文
接收从网络层的报文的入口是 protocol.receive_packet
。
在 protocol.receive_packet
中:
- 从peer的netwwork层接收报文;
- 根据命令的结构,对数据进行反序列化并保存为dict;
- 根据cmd_id得到cmd_name,执行cmd_name对应的receiveX(...)函数,即command.receive();command.receive()中默认执行的是注册的回调函数:
protocol.receive_X_callbacks
; - 依次执行注册的callbacks;
3.3 发送命令报文
直接调用send_X函数。
4. P2PProtocol协议层
P2PProtocol有4个命令:
-
hello: cmd_id = 0 握手报文
发送: send_hello(...)
接收: 在hello命令的receive中
- 先注册ethProtocol, 实现位置:
proto.peer.receive_hello(proto, **data)
- 启动定时发送ping的任务:
BaseProtocol.command.receive(self, proto, data)
- 先注册ethProtocol, 实现位置:
disconnect: cmd_id = 1 关闭连接报文
ping: cmd_id = 2 保活的心跳报文,定时发送ping给peer;
pong: cmd_id = 3 保活的心跳报文,收到ping后,回一个pong;
p2pprotocol还包括一个连接监视器:
self.monitor = ConnectionMonitor(self)
在ConnectionMonitor中, 设置了pong消息和hello消息的回调函数:
self.proto.receive_pong_callbacks.append(self.track_response)
self.proto.receive_hello_callbacks.append(lambda p, **kargs: self.start())
receive_pong_callbacks
: 计算ping和pong的时间间隔
receive_hello_callbacks
: 启动定时发送ping 的保活消息任务: 计算ping消息是否超时
5. 服务层: PeerManager
Peermanager负责和peer的连接,并维护peer结构.
5.1 peer连接
peer连接分2种:我去连peer, 等待peer来连我。
等待peer来连我(不知道peer的公钥)
在初始化时, 已经在p2p监听端口上启动了一个tcp服务, 监听tcp客户端的连接。
如果新的连接进来, 启动一个协程执行已注册的处理函数: _on_new_connection
, 流程如下:
peer = self._start_peer(connection, address)
- 创建一个Peer结构(继承自Greenlet)
peer = Peer(self, connection, remote_pubkey=remote_pubkey)
,详细过程见下面小节. - 设置peer挂掉时的回调函数
peer.link(on_peer_exit)
- 保存peer
self.peers.append(peer)
- 调度peer运行
peer.start()
. 最终通过_run启动3个协程, 处理network层的原始数据分发,详见下面小节.
主动连接peer(知道peer的公钥)
有2种情况需要主动连接peer:
- 在初始化时,如果配置了bootstrap,主动连接bootstrap;
- 监视协程_discovery_loop线程定时检查,检测到连接的peer数量少于指定数量时, 从节点发现协议kademlia维护的活跃节点列表中选取节点进行连接;
peer = self._start_peer(connection, address, remote_pubkey)
5.2 peer广播
接口:broadcast(protocol, command_name, ...)
这是一个通用函数, 可以广播指定协议的指定命令. 可以指定广播的peer数量, 和指定排除那些peer.
对于符合要求的peer, 根据command_name 找到对应的send_<command_name>函数,, 发送要求的数据.
6. Peer
peer实现network层和protocol层, 主要功能实际上在上文中已经详细介绍:发送消息, 接收数据并解析, 分发给相应的Protocol协议。
6.1 peer的初始化
- 创建一个加密的多路复用的command分发器
self.mux = MultiplexedSession(privkey, hello_packet, remote_pubkey=remote_pubkey)
,在MultiplexedSession
初始化中:
1.1hello_packet
报文是p2p协议的cmdID=0的协议报文
1.2 新建一个rlpx_session,用于加密会话数据
1.3 如果已知peer的公钥,和peer进行加密握手:_send_init_msg()
auth_msg = self.rlpx_session.create_auth_message(self._remote_pubkey)#创建认证消息 auth_msg_ct = self.rlpx_session.encrypt_auth_message(auth_msg) #加密认证消息 self.message_queue.put(auth_msg_ct) #发送加密后的消息
- 注册p2p协议, 启动和peer连接的服务
self.connect_service(self.peermanager)
2.1 实例化一个线上协议: P2PProtocol
2.2 在复用器中增加该协议:self.mux.add_protocol(protocol.protocol_id)
2.3 启动该协议(P2PProtocol):protocol.start()
。执行的是baseProtocol的start(), 注册自定义的收到命令消息后的回调函数
6.2 Peer在network层收发数据
在收到一个新的peer的连接到时候, 实例化一个Peer结构, 然后会触发peer.start()
, 创建3个协程分别处理收发报文:
_run_ingress_message
- 等待读事件:
self.safe_to_read.wait()
- 每次读取最多4k数据:
imsg = self.connection.recv(4096)
- 将消息加到分发器中:
self.mux.add_message(imsg)
3.1 对消息进行解码, 因为是流数据, 需要解析消息,并对消息进行解密
3.2add_message
有两个定义
- 第一次收到消息时处理函数是
_add_message_during_handshake
- 如果是连接的发起者, 收到的第一个消息是认证确认消息,如果是连接的接收者 , 收到的第一个消息是hello认证消息
- 解密消息,并验证签名(对端使用本节点的公钥进行签名)
- 如果是连接的发起者, 再发送一个hello认证消息给对端, 如果是连接的接收者, 验证成功后, 回复一个认证确认消息, 然后需要再发送一个hello认证消息给对端
- 以后的消息处理入口是:
_add_message_post_handshake
_run_decoded_packets
- 从packet_queue获取报文数据
- 解析消息,并对每个消息解析protocol, cmd_id
- 发给protocol层的receive_packet(pkt)
_run_egress_message
- 从message_queue中获取报文,并发送
- 刚启动该监听任务时, 如果是本连接的发起者, 那么队列中已经有一个hello的认证消息, 此时会马上将该消息发送出去.