Hyperledger-Fabric源码分析(Gossip-PullEngine)

/* PullEngine is an object that performs pull-based gossip, and maintains an internal state of items
   identified by string numbers.
   The protocol is as follows:
   1) The Initiator sends a Hello message with a specific NONCE to a set of remote peers.
   2) Each remote peer responds with a digest of its messages and returns that NONCE.
   3) The initiator checks the validity of the NONCEs received, aggregates the digests,
      and crafts a request containing specific item ids it wants to receive from each remote peer and then
      sends each request to its corresponding peer.
   4) Each peer sends back the response containing the items requested, if it still holds them and the NONCE.

    Other peer                          Initiator
    O <-------- Hello <NONCE> -------------------------  O
   /|\    --------- Digest <[3,5,8, 10...], NONCE> -------->     /|\
    | <-------- Request <[3,8], NONCE> -----------------      |
   / \    --------- Response <[item3, item8], NONCE>------->     / \

*/
  • 先看下官方的解释好了。
  • 还记得之前的RemoteStateRequest篇么?那个似乎也是去拉取block啊,这里似乎也是啊,下面我们具体看下,为什么要有这两种机制,他们所面对的场景是什么。

Hello

发起点

func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, df DigestFilter) *PullEngine {
    engine := &PullEngine{
        PullAdapter:        participant,
        stopFlag:           int32(0),
        state:              util.NewSet(),
        item2owners:        make(map[string][]string),
        peers2nonces:       make(map[string]uint64),
        nonces2peers:       make(map[uint64]string),
        acceptingDigests:   int32(0),
        acceptingResponses: int32(0),
        incomingNONCES:     util.NewSet(),
        outgoingNONCES:     util.NewSet(),
        digFilter:          df,
        digestWaitTime:     util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime),
        requestWaitTime:    util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime),
        responseWaitTime:   util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime),
    }

    go func() {
        for !engine.toDie() {
            time.Sleep(sleepTime)
            if engine.toDie() {
                return
            }
            engine.initiatePull()
        }
    }()

    return engine
}

func (engine *PullEngine) initiatePull() {
   engine.lock.Lock()
   defer engine.lock.Unlock()

   engine.acceptDigests()
   for _, peer := range engine.SelectPeers() {
      nonce := engine.newNONCE()
      engine.outgoingNONCES.Add(nonce)
      engine.nonces2peers[nonce] = peer
      engine.peers2nonces[peer] = nonce
      engine.Hello(peer, nonce)
   }

   time.AfterFunc(engine.digestWaitTime, func() {
      engine.processIncomingDigests()
   })
}
  • acceptDigests表示正式开始一次基于hello-digest流程,串行执行。
  • 当PullEngine启动的时候每隔一定时间会随机选取节点发起Hello请求
  • 这里需要注意的是,会记录Hello请求的Nonce和peer节点的关系。应该是后面会以这个来跟踪执行情况。
  • 发出的Hello会记录到outgoingNONCES里面
  • 超时的情况,之后再讲

请求

func (p *pullMediatorImpl) Hello(dest string, nonce uint64) {
   helloMsg := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Content: &proto.GossipMessage_Hello{
         Hello: &proto.GossipHello{
            Nonce:    nonce,
            Metadata: nil,
            MsgType:  p.config.MsgType,
         },
      },
   }

   p.logger.Debug("Sending", p.config.MsgType, "hello to", dest)
   sMsg, err := helloMsg.NoopSign()
   if err != nil {
      p.logger.Errorf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...)
}
  • 没什么好讲的,注意下nonce就好。下面看下对方收到hello怎么处理。

处理

func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
   engine.incomingNONCES.Add(nonce)

   time.AfterFunc(engine.requestWaitTime, func() {
      engine.incomingNONCES.Remove(nonce)
   })

   a := engine.state.ToArray()
   var digest []string
   filter := engine.digFilter(context)
   for _, item := range a {
      dig := item.(string)
      if !filter(dig) {
         continue
      }
      digest = append(digest, dig)
   }
   if len(digest) == 0 {
      return
   }
   engine.SendDigest(digest, nonce, context)
}
  • 这里也是一样会记录收到的hello请求的nonce到本地incomingNONCES
  • 将本地的state里面的blocknums序号集组装成Digest返回,这里以blockpullengine为例,每次收到block消息时候,都会将block序号冗余到engine的state里面。
  • 下面我们看下Digest的情况

Digest

请求

func (p *pullMediatorImpl) SendDigest(digest []string, nonce uint64, context interface{}) {
   digMsg := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataDig{
         DataDig: &proto.DataDigest{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Digests: util.StringsToBytes(digest),
         },
      },
   }
   remotePeer := context.(proto.ReceivedMessage).GetConnectionInfo()
   if p.logger.IsEnabledFor(zapcore.DebugLevel) {
      p.logger.Debug("Sending", p.config.MsgType, "digest:", digMsg.GetDataDig().FormattedDigests(), "to", remotePeer)
   }

   context.(proto.ReceivedMessage).Respond(digMsg)
}
  • 不会再生成新的Nonce,这里会沿用之前的nonce。

处理

func (engine *PullEngine) OnDigest(digest []string, nonce uint64, context interface{}) {
   if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
      return
   }

   engine.lock.Lock()
   defer engine.lock.Unlock()

   for _, n := range digest {
      if engine.state.Exists(n) {
         continue
      }

      if _, exists := engine.item2owners[n]; !exists {
         engine.item2owners[n] = make([]string, 0)
      }

      engine.item2owners[n] = append(engine.item2owners[n], engine.nonces2peers[nonce])
   }
}
  • 基本就是统计本地没有的block序号,将序号所属的peer节点记录下来,还记得发hello的时候的nonces2peers么,当时记录了hello是发给哪个节点的。
  • 这里需要注意的是item2owners,这里会最终组成一组映射,拥有某个block的peer列表

小结

好了,至此,整个流程处理了一半,发起点也收到了节点反馈的Digest数据。知道了自己和对方的差异的部分,那么什么时候开始正式拉取数据呢?下面我们继续往下深入。

Request

发起点

func (engine *PullEngine) initiatePull() {
   ...

   time.AfterFunc(engine.digestWaitTime, func() {
      engine.processIncomingDigests()
   })
}
  • 还记得这部分代码么?就是hello的超时处理的部分,很明显了,Request的发起点就是等待digestWaitTime超时。
func (engine *PullEngine) processIncomingDigests() {
   engine.ignoreDigests()

   engine.lock.Lock()
   defer engine.lock.Unlock()

   requestMapping := make(map[string][]string)
   for n, sources := range engine.item2owners {
      // select a random source
      source := sources[util.RandomInt(len(sources))]
      if _, exists := requestMapping[source]; !exists {
         requestMapping[source] = make([]string, 0)
      }
      // append the number to that source
      requestMapping[source] = append(requestMapping[source], n)
   }

   engine.acceptResponses()

   for dest, seqsToReq := range requestMapping {
      engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
   }

   time.AfterFunc(engine.responseWaitTime, engine.endPull)
}
  • ignoreDigests表示hello-digest结束,开始request-response流程
  • 统计item2owners,计算本地没有的block部分,准备去拥有方去拉取

请求

func (p *pullMediatorImpl) SendReq(dest string, items []string, nonce uint64) {
   req := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataReq{
         DataReq: &proto.DataRequest{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Digests: util.StringsToBytes(items),
         },
      },
   }
   if p.logger.IsEnabledFor(zapcore.DebugLevel) {
      p.logger.Debug("Sending", req.GetDataReq().FormattedDigests(), "to", dest)
   }
   sMsg, err := req.NoopSign()
   if err != nil {
      p.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   p.Sndr.Send(sMsg, p.peersWithEndpoints(dest)...)
}

只需要注意nonce,其他没什么好讲的。说明整个过程双方都会密切关心这个流程的状态。

处理

func (engine *PullEngine) OnReq(items []string, nonce uint64, context interface{}) {
   if !engine.incomingNONCES.Exists(nonce) {
      return
   }
   engine.lock.Lock()
   defer engine.lock.Unlock()

   filter := engine.digFilter(context)
   var items2Send []string
   for _, item := range items {
      if engine.state.Exists(item) && filter(item) {
         items2Send = append(items2Send, item)
      }
   }

   if len(items2Send) == 0 {
      return
   }

   go engine.SendRes(items2Send, context, nonce)
}
  • 基本上就是拿到对方发来的请求,来满足对方的需求,马上来看看Response发送会组装些什么?

Response

请求

func (p *pullMediatorImpl) SendRes(items []string, context interface{}, nonce uint64) {
   items2return := []*proto.Envelope{}
   p.RLock()
   defer p.RUnlock()
   for _, item := range items {
      if msg, exists := p.itemID2Msg[item]; exists {
         items2return = append(items2return, msg.Envelope)
      }
   }
   returnedUpdate := &proto.GossipMessage{
      Channel: p.config.Channel,
      Tag:     p.config.Tag,
      Nonce:   0,
      Content: &proto.GossipMessage_DataUpdate{
         DataUpdate: &proto.DataUpdate{
            MsgType: p.config.MsgType,
            Nonce:   nonce,
            Data:    items2return,
         },
      },
   }
   remotePeer := context.(proto.ReceivedMessage).GetConnectionInfo()
   p.logger.Debug("Sending", len(returnedUpdate.GetDataUpdate().Data), p.config.MsgType, "items to", remotePeer)
   context.(proto.ReceivedMessage).Respond(returnedUpdate)
}
  • itemID2Msg,这里暂存了收到的block
  • 所以这里就是根据收到block序号列表,然后组装items2return返回给对方。

处理

if res := msg.GetDataUpdate(); res != nil {
   itemIDs = make([]string, len(res.Data))
   items = make([]*proto.SignedGossipMessage, len(res.Data))
   pullMsgType = ResponseMsgType
   for i, pulledMsg := range res.Data {
      msg, err := pulledMsg.ToGossipMessage()
      if err != nil {
         p.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
         return
      }
      p.MsgCons(msg)
      itemIDs[i] = p.IdExtractor(msg)
      items[i] = msg
      p.Lock()
      p.itemID2Msg[itemIDs[i]] = msg
      p.Unlock()
   }
   p.engine.OnRes(itemIDs, res.Nonce)
}
  • MsgCons(msg)这里会将拉取的block push到payloads里面,等待commit到本地,并同步最新的height给好朋友们。至于怎么到payloads的,细节这里就不提了,有兴趣的可以自己研究。
  • 剩下的就是为了Pull机制来服务的,将收到的数据进行下次pull的准备。加入本地state和itemID2Msg,什么的,都跟前面的做一些呼应。

补充

下面我们看下gossip里面是怎么使用这个PullEngine的,总的来说分为两块,一是block,前面也提到,另一个就是cert。

BlockPuller

func (gc *gossipChannel) createBlockPuller() pull.Mediator {
   conf := pull.Config{
      MsgType:           proto.PullMsgType_BLOCK_MSG,
      Channel:           []byte(gc.chainID),
      ID:                gc.GetConf().ID,
      PeerCountToSelect: gc.GetConf().PullPeerNum,
      PullInterval:      gc.GetConf().PullInterval,
      Tag:               proto.GossipMessage_CHAN_AND_ORG,
   }
   seqNumFromMsg := func(msg *proto.SignedGossipMessage) string {
      dataMsg := msg.GetDataMsg()
      if dataMsg == nil || dataMsg.Payload == nil {
         gc.logger.Warning("Non-data block or with no payload")
         return ""
      }
      return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
   }
   adapter := &pull.PullAdapter{
      Sndr:        gc,
      MemSvc:      gc.memFilter,
      IdExtractor: seqNumFromMsg,
      MsgCons: func(msg *proto.SignedGossipMessage) {
         gc.DeMultiplex(msg)
      },
   }

   adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
      gc.RLock()
      height := gc.ledgerHeight
      gc.RUnlock()
      digests := digestMsg.Digests
      digestMsg.Digests = nil
      for i := range digests {
         seqNum, err := strconv.ParseUint(string(digests[i]), 10, 64)
         if err != nil {
            gc.logger.Warningf("Can't parse digest %s : %+v", digests[i], errors.WithStack(err))
            continue
         }
         if seqNum >= height {
            digestMsg.Digests = append(digestMsg.Digests, digests[i])
         }

      }
      return digestMsg
   }

   return pull.NewPullMediator(conf, adapter)
}
  • 这里决定了首先接收的是PullMsgType_BLOCK_MSG类型消息
  • 然后会在DataMessage消息中提取SeqNum,也就是blocknum
  • 过滤掉收到消息的height低于本地账本的部分,没有意义啦,既然本地已经有了。
  • 这里的MsgCons也就是消息的comsumer,最终会将收到的block提交到本地账本
  • 之后就是走engine的标准流程了

CertPuller

func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
   conf := pull.Config{
      MsgType:           proto.PullMsgType_IDENTITY_MSG,
      Channel:           []byte(""),
      ID:                g.conf.InternalEndpoint,
      PeerCountToSelect: g.conf.PullPeerNum,
      PullInterval:      g.conf.PullInterval,
      Tag:               proto.GossipMessage_EMPTY,
   }
   pkiIDFromMsg := func(msg *proto.SignedGossipMessage) string {
      identityMsg := msg.GetPeerIdentity()
      if identityMsg == nil || identityMsg.PkiId == nil {
         return ""
      }
      return fmt.Sprintf("%s", string(identityMsg.PkiId))
   }
   certConsumer := func(msg *proto.SignedGossipMessage) {
      idMsg := msg.GetPeerIdentity()
      if idMsg == nil || idMsg.Cert == nil || idMsg.PkiId == nil {
         g.logger.Warning("Invalid PeerIdentity:", idMsg)
         return
      }
      err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
      if err != nil {
         g.logger.Warningf("Failed associating PKI-ID with certificate: %+v", errors.WithStack(err))
      }
      g.logger.Debug("Learned of a new certificate:", idMsg.Cert)
   }
   adapter := &pull.PullAdapter{
      Sndr:            g.comm,
      MemSvc:          g.disc,
      IdExtractor:     pkiIDFromMsg,
      MsgCons:         certConsumer,
      EgressDigFilter: g.sameOrgOrOurOrgPullFilter,
   }
   return pull.NewPullMediator(conf, adapter)
}
  • 这里决定了接收的是PullMsgType_IDENTITY_MSG类型消息
  • 然后会在PeerIdentity消息中提取PkiId
  • certConsumer这里是消息的订阅者,最终会将收到的消息存入idMapper
  • 之后就是走engine的标准流程了

总结

  • 至此,基本上你能区别RemoteStateRequest和DataRequest的区别。DataRequest是用来给本地的block进行查漏补缺,而RemoteStateRequest是block同步的主力,主要是跟leader的最新的block保持一致。
  • PullEngine不光是用来同步block,也用来同步peer节点的证书。分别叫BlockPullEngine和CertPullEngine。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容