在 <以太坊节点间通信和数据交互分析> 一文中,介绍了service层,protocol层和network层,其中service层还有一个很重要的ChainService。
ChainService服务提供链操作相关的辅助功能:
- 广播交易
- 管理交易池
- 广播block
- 发送和处理block相关的命令, 比如请求/发送指定block,请求/发送链状态等等。
它的protocol层是ETHProtocol协议。ethProtocol封装了和链相关的数据交互命令,极大方便了节点之间的链数据通信,ETHProtocol的启动时会对DAO分叉进行检查,确保是和最新的分叉后的peer进行连接, 并且在和peer握手后自动执行block同步的动作,详细流程个如下:
1. ETHProtocol协议的启动
节点加载ETHProtocol协议时,
- 通知本节点的状态给peer,
- 触发peer进行block的同步检查,如果节点的block高度比peer高, 启动block的同步.
节点加载ETHProtocol协议首先会完成DAO分叉的检查:
- 在添加ETHProtocol协议到peer的protocol层的时候, 调用
on_wire_protocol_start()
,在该函数中除了注册各个命令的接收回调函数之外, 还会发送本节点的状态:
proto.send_status( chain_difficulty=self.chain.get_score(head), chain_head_hash=head.hash, genesis_hash=self.chain.genesis.hash)
- 对端收到该status消息后,除了检查信息, 还会发起一个DAO挑战:
self.dao_challenges[proto] =
(DAOChallenger(self, proto), chain_head_hash, chain_difficulty)
2.1. 在DAOChallenger中启动一个协程: gevent.spawn(self.run)
2.2. 在run中, 首先向本端请求DAO分叉的block头: self.proto.send_getblockheaders(self.config['DAO_FORK_BLKNUM'], 1, 0)
2.3. 本端收到请求消息后, 发现是DAO挑战, 发送DAO分叉的block头:
headers.append(build_dao_header(self.config['eth']['block']))
proto.send_blockheaders(*headers)
2.4. 对端收到该block头,发现是DAO挑战, 调用DAOChallenger
的函数处理该block头:
self.dao_challenges[proto][0].receive_blockheaders(proto, blockheaders)
2.5. receive_blockheaders中将block头传递给上面的run协程中, 检查收到的DAO block头的hash和extra_data,
2.6. 将检查结果传给chainservice: self.chainservice.on_dao_challenge_answer(self.proto, result)
, 当然, 如果消息接收超时, result=false
2.7. on_dao_challenge_answer
中处理:
result=false: DAO挑战失败, 说明是没有进行DAO分叉的节点,停止和peer的连接;
result=true: DAO挑战成功, 说明是没有进行DAO分叉的节点,进行请求chain数据的操作:
# request chain
self.synchronizer.receive_status(proto, chain_head_hash, chain_difficulty)
# send transactions
transactions = self.transaction_queue.peek()
if transactions:
log.debug("sending transactions", remote_id=proto)
proto.send_transactions(*transactions)
synchronizer.receive_status 只有在连接新的peer的时候才同步
检查最开始收到的head_hash是否在本地的chain上, 或者在本地的缓存block_queue中, 如果已经在的话, 不再同步
如果本地节点已经启动同步任务synctask,也不再需要同步
-
如果收到的难度值比本地的难度值要大,需要创建一个同步任务:
self.synctask = SyncTask(self, proto, blockhash, chain_difficulty)
SyncTask的主要任务就是
fetch_hashchain()
,
SyncTask.fetch_hashchain()
- 检查blockhash是否在链上, 如果存在的话,说明对方的链不是最新的链, 不需要进行同步.
- 遍历所有注册的protocols(即所有peer的ethprotocol)
2.1. 发送请求getblockheaders, 获取的block数是32
2.2. 对端收到blockhash后, 获取指定数量的block头
2.3. 对端将block头发送回来:proto.send_blockheaders(*headers)
2.4. 本端收到后, 在回调函数on_receive_blockheaders()
中送给同步器:self.synchronizer.receive_blockheaders(proto, blockheaders)
2.5. 同步器中送给syncTask:self.synctask.receive_blockheaders(proto, blockheaders)
2.6. syncTask根据收到的blockheaders,获取block:self.fetch_blocks(blockheaders_chain)
2.7. 根据block头的hash,去获取block数据:proto.send_getblockbodies(*blockhashes_batch)
2.8. 对端获得blockhash, 获取block,然后发送:proto.send_blockbodies(*found)
2.9. 收到blockbody,送给同步器,再送给syncTask:self.synchronizer.receive_blockbodies(proto, bodies)
,self.synctask.receive_blockbodies(proto, bodies)
2.10. syncTask根据收到的blockbodies,生成临时的未验证的block:TransientBlock(h, body.transactions, body.uncles)
.TransientBlock
里面的交易数据是RLP编码
2.11. 将block添加到链上:self.chainservice.add_block(t_block, proto)
, 详见下一小节.
2. ChainService.add_block(t_block,proto)
该函数将一个block送给核心层,添加到链上。
- 先将TransientBlock加到缓存队列 block_queue, 启动一个异步处理的协程:
_add_blocks()
, 并加锁执行, 当协程执行成功后新起一个协程继续添加; -
_add_blocks
协程中, 从block_queue取出block - 本block已经在链上: 丢弃该block, 继续取下一个;
- 本block的父块还没有在链上, 丢弃该block, 继续取下一个;
- 本block的父块在链上, 本block没有在链上, 将该block的TransientBlock类型转换为Block类型(取Block的成员变量时会进行RLP解码);
- 将block加到真正的链上:
self.chain.add_block(block)
; - 将交易从交易池中删除;