/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
   identified by string numbers.
   The protocol is as follows:
   1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
   2) Each remote peer responds with a digest of its messages and returns that NONCE.
   3) The initiator checks the validity of the NONCEs received, aggregates the digests,
      and crafts a request containing specific item ids it wants to receive from each remote peer and then
      sends each request to its corresponding peer.
   4) Each peer sends back the response containing the items requested, if it still holds them and the NONCE.

   Other peer                                                        Initiator
       O      <-------- Hello <NONCE> -------------------------      O
      /|\     --------- Digest <[3,5,8, 10...], NONCE> -------->     /|\
       |      <-------- Request <[3,8], NONCE> -----------------      |
      / \     --------- Response <[item3, item8], NONCE>------->     / \
*/
- 先看下官方的解释好了。
- 还记得之前的RemoteStateRequest篇么?那个似乎也是去拉取block啊,这里似乎也是啊。下面我们具体看下,为什么要有这两种机制,它们各自面对的场景是什么。
Hello
发起点
// NewPullEngineWithFilter builds a PullEngine around the given adapter and
// starts its periodic pull loop.
//
// participant is embedded as the engine's PullAdapter, df is stored as the
// digest filter, and the three wait times are obtained from
// util.GetDurationOrDefault under the peer.gossip.* keys, with the def*
// constants passed as the fallback argument.
//
// A goroutine is spawned that sleeps for sleepTime and then calls
// initiatePull, repeating until toDie reports true. toDie is checked both
// before and after the sleep, so no pull is started once the engine has been
// told to stop while sleeping.
func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
	engine := &PullEngine{
		PullAdapter:        participant,
		stopFlag:           int32(0),
		state:              util.NewSet(),
		item2owners:        make(map[string][]string),
		peers2nonces:       make(map[string]uint64),
		nonces2peers:       make(map[uint64]string),
		acceptingDigests:   int32(0),
		acceptingResponses: int32(0),
		incomingNONCES:     util.NewSet(),
		outgoingNONCES:     util.NewSet(),
		digFilter:          df,
		digestWaitTime:     util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime),
		requestWaitTime:    util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime),
		responseWaitTime:   util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime),
	}

	pullLoop := func() {
		for {
			if engine.toDie() {
				return
			}
			time.Sleep(sleepTime)
			// The engine may have been stopped while we were asleep.
			if engine.toDie() {
				return
			}
			engine.initiatePull()
		}
	}
	go pullLoop()

	return engine
}
// initiatePull starts one pull round while holding engine.lock.
//
// It switches the engine to accepting digests, then for every peer returned by
// SelectPeers it creates a fresh nonce, records it in outgoingNONCES and in
// both peer<->nonce maps, and sends that peer a Hello. Finally it schedules
// processIncomingDigests to run once digestWaitTime has elapsed.
func (engine *PullEngine) initiatePull() {
	engine.lock.Lock()
	defer engine.lock.Unlock()

	engine.acceptDigests()

	for _, remote := range engine.SelectPeers() {
		helloNonce := engine.newNONCE()
		engine.outgoingNONCES.Add(helloNonce)
		// Keep the mapping in both directions so a later digest (by nonce) and
		// a later request (by peer) can each be correlated with this Hello.
		engine.peers2nonces[remote] = helloNonce
		engine.nonces2peers[helloNonce] = remote
		engine.Hello(remote, helloNonce)
	}

	time.AfterFunc(engine.digestWaitTime, engine.processIncomingDigests)
}
- acceptDigests表示正式开始一轮hello-digest流程(开始接收digest),整个initiatePull在engine.lock的保护下串行执行。
- 当PullEngine启动的时候每隔一定时间会随机选取节点发起Hello请求
- 这里需要注意的是,会记录Hello请求的Nonce和peer节点的关系。应该是后面会以这个来跟踪执行情况。
- 发出的Hello会记录到outgoingNONCES里面
- 超时的情况,之后再讲
请求
// Hello sends a GossipHello carrying nonce to the peer identified by dest.
//
// The message takes its channel, tag and message type from the mediator's
// config. It is wrapped with NoopSign before sending; if that fails the error
// is logged and nothing is sent.
func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
	hello := &proto.GossipHello{
		Nonce:    nonce,
		Metadata: nil,
		MsgType:  p.config.MsgType,
	}
	helloMsg := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Content: &proto.GossipMessage_Hello{Hello: hello},
	}

	p.logger.Debug("Sending", p.config.MsgType, "hello to", dest)

	signed, err := helloMsg.NoopSign()
	if err != nil {
		p.logger.Errorf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	p.Sndr.Send(signed, p.peersWithEndpoints(dest)...)
}
- 没什么好讲的,注意下nonce就好。下面看下对方收到hello怎么处理。
处理
// OnHello handles a Hello received from a remote initiator.
//
// The nonce is remembered in incomingNONCES and scheduled for removal after
// requestWaitTime. The engine then collects every item ID in its state that
// passes the digest filter built for this context and, if at least one ID
// remains, sends them back as a digest tagged with the same nonce.
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
	engine.incomingNONCES.Add(nonce)
	// Requests quoting this nonce are only honoured within requestWaitTime.
	time.AfterFunc(engine.requestWaitTime, func() {
		engine.incomingNONCES.Remove(nonce)
	})

	known := engine.state.ToArray()
	var digest []string
	accept := engine.digFilter(context)
	for _, entry := range known {
		if id := entry.(string); accept(id) {
			digest = append(digest, id)
		}
	}

	// Nothing to advertise: stay silent rather than send an empty digest.
	if len(digest) > 0 {
		engine.SendDigest(digest, nonce, context)
	}
}
- 这里也是一样会记录收到的hello请求的nonce到本地incomingNONCES
- 将本地的state里面的blocknums序号集组装成Digest返回,这里以blockpullengine为例,每次收到block消息时候,都会将block序号冗余到engine的state里面。
- 下面我们看下Digest的情况
Digest
请求
// SendDigest replies to a received Hello with the given digest.
//
// context must hold a proto.ReceivedMessage (the type assertion panics
// otherwise); the reply is sent through its Respond method. The nonce from the
// Hello is echoed inside the DataDigest, while the outer message Nonce is 0.
func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) {
	dataDig := &proto.DataDigest{
		MsgType: p.config.MsgType,
		Nonce:   nonce,
		Digests: util.StringsToBytes(digest),
	}
	digMsg := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataDig{DataDig: dataDig},
	}

	received := context.(proto.ReceivedMessage)
	remotePeer := received.GetConnectionInfo()
	// Formatting the digests is only worth doing when debug logging is on.
	if p.logger.IsEnabledFor(zapcore.DebugLevel) {
		p.logger.Debug("Sending", p.config.MsgType, "digest:", digMsg.GetDataDig().FormattedDigests(), "to", remotePeer)
	}
	received.Respond(digMsg)
}
- 不会再生成新的Nonce,这里会沿用之前的nonce。
处理
// OnDigest handles a digest returned by a peer in answer to our Hello.
//
// The digest is dropped unless the engine is currently accepting digests and
// nonce is one we sent out. Otherwise, under engine.lock, every advertised item
// that is not already in the local state gets the peer associated with nonce
// appended to its owner list in item2owners.
func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
	if !(engine.isAcceptingDigests() && engine.outgoingNONCES.Exists(nonce)) {
		return
	}

	engine.lock.Lock()
	defer engine.lock.Unlock()

	// The peer this nonce was issued to, as recorded when the Hello was sent.
	sender := engine.nonces2peers[nonce]
	for _, itemID := range digest {
		if engine.state.Exists(itemID) {
			continue
		}
		// append on a missing (nil) entry creates the owner list.
		engine.item2owners[itemID] = append(engine.item2owners[itemID], sender)
	}
}
- 基本就是统计本地没有的block序号,将序号所属的peer节点记录下来,还记得发hello的时候的nonces2peers么,当时记录了hello是发给哪个节点的。
- 这里需要注意的是item2owners,它最终会组成一组映射:block序号 -> 拥有该block的peer列表。
小结
好了,至此,整个流程处理了一半,发起点也收到了节点反馈的Digest数据。知道了自己和对方的差异的部分,那么什么时候开始正式拉取数据呢?下面我们继续往下深入。
Request
发起点
// Abridged repeat of initiatePull shown earlier in this document: the "..."
// stands for the elided body that sends the Hello messages. Only the timer
// that ends the digest-collection phase is kept here.
func (engine *PullEngine) initiatePull() {
...
// After digestWaitTime has elapsed, move on to processing whatever digests
// have been collected.
time.AfterFunc(engine.digestWaitTime, func() {
engine.processIncomingDigests()
})
}
- 还记得这部分代码么?就是hello的超时处理的部分,很明显了,Request的发起点就是等待digestWaitTime超时。
// processIncomingDigests closes the digest phase and sends out the requests.
//
// It stops accepting digests, then under engine.lock picks one random owner
// for every item in item2owners and groups the wanted items per chosen peer.
// After switching to accepting responses it sends each peer its request,
// quoting the nonce recorded for that peer, and schedules endPull to run once
// responseWaitTime has elapsed.
func (engine *PullEngine) processIncomingDigests() {
	engine.ignoreDigests()

	engine.lock.Lock()
	defer engine.lock.Unlock()

	itemsByPeer := make(map[string][]string)
	for itemID, owners := range engine.item2owners {
		// Pick a random owner so requests are spread across the peers.
		chosen := owners[util.RandomInt(len(owners))]
		itemsByPeer[chosen] = append(itemsByPeer[chosen], itemID)
	}

	engine.acceptResponses()

	for peer, wanted := range itemsByPeer {
		engine.SendReq(peer, wanted, engine.peers2nonces[peer])
	}

	time.AfterFunc(engine.responseWaitTime, engine.endPull)
}
- ignoreDigests表示hello-digest结束,开始request-response流程
- 遍历item2owners,对每个本地没有的block随机选一个拥有它的peer,按peer汇总成请求列表,准备去拥有方拉取
请求
// SendReq asks the peer identified by dest for the given items.
//
// The item IDs and the nonce go into a DataRequest; the outer message Nonce is
// 0. The message is wrapped with NoopSign before sending, and if that fails a
// warning is logged and nothing is sent.
func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
	dataReq := &proto.DataRequest{
		MsgType: p.config.MsgType,
		Nonce:   nonce,
		Digests: util.StringsToBytes(items),
	}
	req := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataReq{DataReq: dataReq},
	}

	// Formatting the digests is only worth doing when debug logging is on.
	if p.logger.IsEnabledFor(zapcore.DebugLevel) {
		p.logger.Debug("Sending", req.GetDataReq().FormattedDigests(), "to", dest)
	}

	signed, err := req.NoopSign()
	if err != nil {
		p.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
		return
	}
	p.Sndr.Send(signed, p.peersWithEndpoints(dest)...)
}
只需要注意nonce,其他没什么好讲的。说明整个过程双方都会密切关心这个流程的状态。
处理
// OnReq handles a request for items from a remote initiator.
//
// Requests whose nonce is not in incomingNONCES are ignored. Otherwise, under
// engine.lock, the requested IDs are narrowed to those present in the local
// state that also pass the digest filter built for this context. If any
// remain, SendRes is started in a new goroutine to deliver them.
func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
	if !engine.incomingNONCES.Exists(nonce) {
		return
	}

	engine.lock.Lock()
	defer engine.lock.Unlock()

	allowed := engine.digFilter(context)
	var items2Send []string
	for _, id := range items {
		if !engine.state.Exists(id) || !allowed(id) {
			continue
		}
		items2Send = append(items2Send, id)
	}

	if len(items2Send) > 0 {
		// Respond asynchronously so the engine lock is not held while sending.
		go engine.SendRes(items2Send, context, nonce)
	}
}
- 基本上就是拿到对方发来的请求,来满足对方的需求,马上来看看Response发送会组装些什么?
Response
请求
// SendRes answers a data request with the envelopes of the requested items.
//
// While holding the mediator's read lock it looks each item ID up in
// itemID2Msg, silently skipping IDs that are not stored, and replies with a
// DataUpdate that echoes nonce. context must hold a proto.ReceivedMessage (the
// type assertion panics otherwise); the reply goes through its Respond method.
// A reply is sent even when no requested item was found.
func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) {
	p.RLock()
	defer p.RUnlock()

	// Non-nil even when empty, like the literal it replaces.
	envelopes := make([]*proto.Envelope, 0, len(items))
	for _, id := range items {
		stored, found := p.itemID2Msg[id]
		if !found {
			continue
		}
		envelopes = append(envelopes, stored.Envelope)
	}

	update := &proto.DataUpdate{
		MsgType: p.config.MsgType,
		Nonce:   nonce,
		Data:    envelopes,
	}
	returnedUpdate := &proto.GossipMessage{
		Channel: p.config.Channel,
		Tag:     p.config.Tag,
		Nonce:   0,
		Content: &proto.GossipMessage_DataUpdate{DataUpdate: update},
	}

	received := context.(proto.ReceivedMessage)
	remotePeer := received.GetConnectionInfo()
	p.logger.Debug("Sending", len(returnedUpdate.GetDataUpdate().Data), p.config.MsgType, "items to", remotePeer)
	received.Respond(returnedUpdate)
}
- itemID2Msg,这里暂存了收到的block
- 所以这里就是根据收到block序号列表,然后组装items2return返回给对方。
处理
// Handle a DataUpdate (the response to a pull request): decode every pulled
// envelope, hand it to the consumer, remember it by item ID, and finally
// report the received IDs to the engine.
if res := msg.GetDataUpdate(); res != nil {
itemIDs = make([]string, len(res.Data))
items = make([]*proto.SignedGossipMessage, len(res.Data))
pullMsgType = ResponseMsgType
for i, pulledMsg := range res.Data {
// NOTE: this msg shadows the outer msg for the rest of the loop body.
msg, err := pulledMsg.ToGossipMessage()
if err != nil {
// One undecodable envelope aborts the whole handler: entries processed so
// far stay consumed and cached, but OnRes below is never reached.
p.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
return
}
// Pass the decoded message to the configured consumer callback.
p.MsgCons(msg)
itemIDs[i] = p.IdExtractor(msg)
items[i] = msg
// Store the message under its ID; the lock is held only for the map write.
p.Lock()
p.itemID2Msg[itemIDs[i]] = msg
p.Unlock()
}
// Tell the pull engine which item IDs arrived for this nonce.
p.engine.OnRes(itemIDs, res.Nonce)
}
- MsgCons(msg)这里会将拉取的block push到payloads里面,等待commit到本地,并同步最新的height给好朋友们。至于怎么到payloads的,细节这里就不提了,有兴趣的可以自己研究。
- 剩下的就是为了Pull机制来服务的,将收到的数据进行下次pull的准备。加入本地state和itemID2Msg,什么的,都跟前面的做一些呼应。
补充
下面我们看下gossip里面是怎么使用这个PullEngine的,总的来说分为两块,一是block,前面也提到,另一个就是cert。
BlockPuller
// createBlockPuller builds the pull mediator this channel uses for block
// messages.
//
// Items are identified by the decimal sequence number of the data message's
// payload. Consumed messages are passed to gc.DeMultiplex. Incoming digests are
// filtered so that only sequence numbers at or above the channel's current
// ledgerHeight are kept.
func (gc *gossipChannel) createBlockPuller() pull.Mediator {
	conf := pull.Config{
		MsgType:           proto.PullMsgType_BLOCK_MSG,
		Channel:           []byte(gc.chainID),
		ID:                gc.GetConf().ID,
		PeerCountToSelect: gc.GetConf().PullPeerNum,
		PullInterval:      gc.GetConf().PullInterval,
		Tag:               proto.GossipMessage_CHAN_AND_ORG,
	}

	// seqNumFromMsg extracts the item ID of a block message, or "" (with a
	// warning) when msg carries no data payload.
	seqNumFromMsg := func(msg *proto.SignedGossipMessage) string {
		if dataMsg := msg.GetDataMsg(); dataMsg != nil && dataMsg.Payload != nil {
			return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
		}
		gc.logger.Warning("Non-data block or with no payload")
		return ""
	}

	adapter := &pull.PullAdapter{
		Sndr:        gc,
		MemSvc:      gc.memFilter,
		IdExtractor: seqNumFromMsg,
		MsgCons: func(msg *proto.SignedGossipMessage) {
			gc.DeMultiplex(msg)
		},
	}

	// The ingress filter rewrites digestMsg in place and returns it. Entries
	// that do not parse as an unsigned integer are dropped with a warning.
	adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
		// Snapshot the height under the read lock, then filter without it.
		gc.RLock()
		height := gc.ledgerHeight
		gc.RUnlock()

		incoming := digestMsg.Digests
		digestMsg.Digests = nil
		for _, raw := range incoming {
			seqNum, err := strconv.ParseUint(string(raw), 10, 64)
			if err != nil {
				gc.logger.Warningf("Can't parse digest %s : %+v", raw, errors.WithStack(err))
				continue
			}
			if seqNum < height {
				continue
			}
			digestMsg.Digests = append(digestMsg.Digests, raw)
		}
		return digestMsg
	}

	return pull.NewPullMediator(conf, adapter)
}
- 这里决定了首先接收的是PullMsgType_BLOCK_MSG类型消息
- 然后会在DataMessage消息中提取SeqNum,也就是blocknum
- 过滤掉收到消息的height低于本地账本的部分,没有意义啦,既然本地已经有了。
- 这里的MsgCons也就是消息的consumer,最终会将收到的block提交到本地账本
- 之后就是走engine的标准流程了
CertPuller
// createCertStorePuller builds the pull mediator used to exchange peer
// identity (certificate) messages.
//
// Items are identified by the PKI-ID of the identity message. Consumed
// messages have their PKI-ID/certificate pair stored in g.idMapper. Outgoing
// digests are filtered by g.sameOrgOrOurOrgPullFilter.
func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
	conf := pull.Config{
		MsgType:           proto.PullMsgType_IDENTITY_MSG,
		Channel:           []byte(""),
		ID:                g.conf.InternalEndpoint,
		PeerCountToSelect: g.conf.PullPeerNum,
		PullInterval:      g.conf.PullInterval,
		Tag:               proto.GossipMessage_EMPTY,
	}

	// pkiIDFromMsg returns the PKI-ID of an identity message as a string, or ""
	// when msg is not an identity message or carries no PKI-ID.
	pkiIDFromMsg := func(msg *proto.SignedGossipMessage) string {
		identityMsg := msg.GetPeerIdentity()
		if identityMsg == nil || identityMsg.PkiId == nil {
			return ""
		}
		// A plain conversion; wrapping it in fmt.Sprintf("%s", ...) adds nothing.
		return string(identityMsg.PkiId)
	}

	// certConsumer stores the identity carried by msg in the identity mapper.
	certConsumer := func(msg *proto.SignedGossipMessage) {
		idMsg := msg.GetPeerIdentity()
		if idMsg == nil || idMsg.Cert == nil || idMsg.PkiId == nil {
			g.logger.Warning("Invalid PeerIdentity:", idMsg)
			return
		}
		err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
		if err != nil {
			g.logger.Warningf("Failed associating PKI-ID with certificate: %+v", errors.WithStack(err))
			// Do not fall through: the certificate was not stored, so claiming
			// that a new certificate was learned would be misleading.
			return
		}
		g.logger.Debug("Learned of a new certificate:", idMsg.Cert)
	}

	adapter := &pull.PullAdapter{
		Sndr:            g.comm,
		MemSvc:          g.disc,
		IdExtractor:     pkiIDFromMsg,
		MsgCons:         certConsumer,
		EgressDigFilter: g.sameOrgOrOurOrgPullFilter,
	}
	return pull.NewPullMediator(conf, adapter)
}
- 这里决定了接收的是PullMsgType_IDENTITY_MSG类型消息
- 然后会在PeerIdentity消息中提取PkiId
- certConsumer这里是消息的订阅者,最终会将收到的消息存入idMapper
- 之后就是走engine的标准流程了
总结
- 至此,基本上你能区分RemoteStateRequest和DataRequest了。DataRequest是用来给本地的block进行查漏补缺,而RemoteStateRequest是block同步的主力,主要是跟leader的最新的block保持一致。
- PullEngine不光是用来同步block,也用来同步peer节点的证书。分别叫BlockPullEngine和CertPullEngine。