Hyperledger-Fabric源码分析(Gossip-Discovery)

简单来说,Gossip的Discovery是用来维持成员节点的状态的,是死是活全由他来决定。下面我们具体分析下。

初始化

func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoService, disPol DisclosurePolicy) Discovery {
   d := &gossipDiscoveryImpl{
      self:             self,
      incTime:          uint64(time.Now().UnixNano()),
      seqNum:           uint64(0),
      deadLastTS:       make(map[string]*timestamp),
      aliveLastTS:      make(map[string]*timestamp),
      id2Member:        make(map[string]*NetworkMember),
      aliveMembership:  util.NewMembershipStore(),
      deadMembership:   util.NewMembershipStore(),
      crypt:            crypt,
      comm:             comm,
      lock:             &sync.RWMutex{},
      toDieChan:        make(chan struct{}, 1),
      toDieFlag:        int32(0),
      logger:           util.GetLogger(util.DiscoveryLogger, self.InternalEndpoint),
      disclosurePolicy: disPol,
      pubsub:           util.NewPubSub(),

      aliveTimeInterval:            getAliveTimeInterval(),
      aliveExpirationTimeout:       getAliveExpirationTimeout(),
      aliveExpirationCheckInterval: getAliveExpirationCheckInterval(),
      reconnectInterval:            getReconnectInterval(),
   }

   d.validateSelfConfig()
   d.msgStore = newAliveMsgStore(d)

   go d.periodicalSendAlive()
   go d.periodicalCheckAlive()
   go d.handleMessages()
   go d.periodicalReconnectToDead()
   go d.handlePresumedDeadPeers()

   return d
}

基本上搞清楚启动的时候会做什么,那么整个模块大体上你就知道怎么实现的了。

  • periodicalSendAlive
  • periodicalCheckAlive
  • handleMessages
  • periodicalReconnectToDead
  • handlePresumedDeadPeers

periodicalSendAlive

func (d *gossipDiscoveryImpl) periodicalSendAlive() {
   defer d.logger.Debug("Stopped")

   for !d.toDie() {
      d.logger.Debug("Sleeping", d.aliveTimeInterval)
      time.Sleep(d.aliveTimeInterval)
      msg, err := d.createSignedAliveMessage(true)
      if err != nil {
         d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
         return
      }
      d.lock.Lock()
      d.selfAliveMessage = msg
      d.lock.Unlock()
      d.comm.Gossip(msg)
   }
}
  • 第一印象就是每隔一段时间会将自己的Alive状态跟好朋友分享。

periodicalCheckAlive

func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
   defer d.logger.Debug("Stopped")

   for !d.toDie() {
      time.Sleep(d.aliveExpirationCheckInterval)
      dead := d.getDeadMembers()
      if len(dead) > 0 {
         d.logger.Debugf("Got %v dead members: %v", len(dead), dead)
         d.expireDeadMembers(dead)
      }
   }
}
  • 看aliveLastTS列表中的最后的访问时间截止到当前是否已经超时,如果是,挑出来这些超时的
  • 就是将aliveLastTS中过期的加到deadLastTS里面
  • 将aliveMembership中过期的加到deadMembership里面
  • 关闭这些过期节点的grpc通讯

periodicalReconnectToDead

func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
   defer d.logger.Debug("Stopped")

   for !d.toDie() {
      wg := &sync.WaitGroup{}

      for _, member := range d.copyLastSeen(d.deadLastTS) {
         wg.Add(1)
         go func(member NetworkMember) {
            defer wg.Done()
            if d.comm.Ping(&member) {
               d.logger.Debug(member, "is responding, sending membership request")
               d.sendMembershipRequest(&member, true)
            } else {
               d.logger.Debug(member, "is still dead")
            }
         }(member)
      }

      wg.Wait()
      d.logger.Debug("Sleeping", d.reconnectInterval)
      time.Sleep(d.reconnectInterval)
   }
}
  • 每隔一段时间遍历deadLastTS列表,如果能Ping通,给他们发GossipMessage_MemReq请求。该请求就是去拉取成员列表。

handlePresumedDeadPeers

func (d *gossipDiscoveryImpl) handlePresumedDeadPeers() {
   defer d.logger.Debug("Stopped")

   for !d.toDie() {
      select {
      case deadPeer := <-d.comm.PresumedDead():
         if d.isAlive(deadPeer) {
            d.expireDeadMembers([]common.PKIidType{deadPeer})
         }
      case s := <-d.toDieChan:
         d.toDieChan <- s
         return
      }
   }
}
  • 这里主要是处理底层comm模块的send时,如果报错,会将该节点归类到PresumedDead列表,这里会进行扫尾工作。
  • 如果这个节点当前是Alive的,那把Alive转为Dead

handleMessages

func (d *gossipDiscoveryImpl) handleMessages() {
   defer d.logger.Debug("Stopped")

   in := d.comm.Accept()
   for !d.toDie() {
      select {
      case s := <-d.toDieChan:
         d.toDieChan <- s
         return
      case m := <-in:
         d.handleMsgFromComm(m)
      }
   }
}
  • 当然了,这里是用来处理Discovery消息。我们分三种类型来分析下,主要是做什么。

MembershipRequest

if memReq := m.GetMemReq(); memReq != nil {
   selfInfoGossipMsg, err := memReq.SelfInformation.ToGossipMessage()
   if err != nil {
      d.logger.Warningf("Failed deserializing GossipMessage from envelope: %+v", errors.WithStack(err))
      return
   }

   if !d.crypt.ValidateAliveMsg(selfInfoGossipMsg) {
      return
   }

   if d.msgStore.CheckValid(selfInfoGossipMsg) {
      d.handleAliveMessage(selfInfoGossipMsg)
   }

   var internalEndpoint string
   if m.Envelope.SecretEnvelope != nil {
      internalEndpoint = m.Envelope.SecretEnvelope.InternalEndpoint()
   }

   // Sending a membership response to a peer may block this routine
   // in case the sending is deliberately slow (i.e attack).
   // will keep this async until I'll write a timeout detector in the comm layer
   go d.sendMemResponse(selfInfoGossipMsg.GetAliveMsg().Membership, internalEndpoint, m.Nonce)
   return
}
  • ValidateAliveMsg会根据对方发来的MembershipRequest中关于自身的AliveMsg做两件事,一是保存该节点identity到idMapper,二是用identity来验证消息签名。如果第一次交流,会直接保存下来,以后同节点发来的消息就用这个身份来验签,也是为了安全的考虑,避免中间人攻击。
  • CheckValid会校验该AliveMsg跟本地相比是否比较新,如果是,那么开始handleAliveMessage,这个后面再讲。
  • 最后再sendMemResponse

handleAliveMessage

func (d *gossipDiscoveryImpl) handleAliveMessage(m *proto.SignedGossipMessage) {
   d.logger.Debug("Entering", m)
   defer d.logger.Debug("Exiting")

   if d.isSentByMe(m) {
      return
   }

   pkiID := m.GetAliveMsg().Membership.PkiId

   ts := m.GetAliveMsg().Timestamp

   d.lock.RLock()
   _, known := d.id2Member[string(pkiID)]
   d.lock.RUnlock()

   if !known {
      d.learnNewMembers([]*proto.SignedGossipMessage{m}, []*proto.SignedGossipMessage{})
      return
   }

   d.lock.RLock()
   _, isAlive := d.aliveLastTS[string(pkiID)]
   lastDeadTS, isDead := d.deadLastTS[string(pkiID)]
   d.lock.RUnlock()

   if !isAlive && !isDead {
      d.logger.Panicf("Member %s is known but not found neither in alive nor in dead lastTS maps, isAlive=%v, isDead=%v", m.GetAliveMsg().Membership.Endpoint, isAlive, isDead)
      return
   }

   if isAlive && isDead {
      d.logger.Panicf("Member %s is both alive and dead at the same time", m.GetAliveMsg().Membership)
      return
   }

   if isDead {
      if before(lastDeadTS, ts) {
         // resurrect peer
         d.resurrectMember(m, *ts)
      } else if !same(lastDeadTS, ts) {
         d.logger.Debug(m.GetAliveMsg().Membership, "lastDeadTS:", lastDeadTS, "but got ts:", ts)
      }
      return
   }

   d.lock.RLock()
   lastAliveTS, isAlive := d.aliveLastTS[string(pkiID)]
   d.lock.RUnlock()

   if isAlive {
      if before(lastAliveTS, ts) {
         d.learnExistingMembers([]*proto.SignedGossipMessage{m})
      } else if !same(lastAliveTS, ts) {
         d.logger.Debug(m.GetAliveMsg().Membership, "lastAliveTS:", lastAliveTS, "but got ts:", ts)
      }

   }
   // else, ignore the message because it is too old
}
  • 如果id2Member没有,说明新来的Alive节点不认识,当然要learnNewMembers
  • 去alive和dead列表里面看看没有这个节点。同时有或同时没有,都不对。
    • 如果在dead列表里面,而且收到的alive消息的时间戳要比这里新,说明什么?说明这个节点活了,无非就是更新id2Member,然后从dead里面移除,然后put到alive列表啦
    • 如果alive列表里面有,那么说明有更新,需要learnExistingMembers

sendMemResponse

func (d *gossipDiscoveryImpl) sendMemResponse(targetMember *proto.Member, internalEndpoint string, nonce uint64) {
   d.logger.Debug("Entering", targetMember)

   targetPeer := &NetworkMember{
      Endpoint:         targetMember.Endpoint,
      Metadata:         targetMember.Metadata,
      PKIid:            targetMember.PkiId,
      InternalEndpoint: internalEndpoint,
   }

   var aliveMsg *proto.SignedGossipMessage
   var err error
   d.lock.RLock()
   aliveMsg = d.selfAliveMessage
   d.lock.RUnlock()
   if aliveMsg == nil {
      aliveMsg, err = d.createSignedAliveMessage(true)
      if err != nil {
         d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
         return
      }
   }
   memResp := d.createMembershipResponse(aliveMsg, targetPeer)
   if memResp == nil {
      errMsg := `Got a membership request from a peer that shouldn't have sent one: %v, closing connection to the peer as a result.`
      d.logger.Warningf(errMsg, targetMember)
      d.comm.CloseConn(targetPeer)
      return
   }

   defer d.logger.Debug("Exiting, replying with", memResp)

   msg, err := (&proto.GossipMessage{
      Tag:   proto.GossipMessage_EMPTY,
      Nonce: nonce,
      Content: &proto.GossipMessage_MemRes{
         MemRes: memResp,
      },
   }).NoopSign()
   if err != nil {
      err = errors.WithStack(err)
      d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   d.comm.SendToPeer(targetPeer, msg)
}

主要做三件事情。

  • 将自己包装成AliveMsg
  • 打包自己所知的Alive和Dead的成员列表
  • 一起打包成MembershipResponse发给对方。

AliveMsg

if m.IsAliveMsg() {
   if !d.msgStore.CheckValid(m) || !d.crypt.ValidateAliveMsg(m) {
      return
   }
   // If the message was sent by me, ignore it and don't forward it further
   if d.isSentByMe(m) {
      return
   }

   d.msgStore.Add(m)
   d.handleAliveMessage(m)
   d.comm.Forward(msg)
   return
}
  • 看着很简单,首先是要add到本地的alivemsgstore。
  • handleAliveMessage前面讲过了,开始处理alivemsg
  • 好东西要跟好朋友分享,处理完就转发给其他人,更有利与状态同步

MembershipResponse

if memResp := m.GetMemRes(); memResp != nil {
   d.pubsub.Publish(fmt.Sprintf("%d", m.Nonce), m.Nonce)
   for _, env := range memResp.Alive {
      am, err := env.ToGossipMessage()
      if err != nil {
         d.logger.Warningf("Membership response contains an invalid message from an online peer:%+v", errors.WithStack(err))
         return
      }
      if !am.IsAliveMsg() {
         d.logger.Warning("Expected alive message, got", am, "instead")
         return
      }

      if d.msgStore.CheckValid(am) && d.crypt.ValidateAliveMsg(am) {
         d.handleAliveMessage(am)
      }
   }

   for _, env := range memResp.Dead {
      dm, err := env.ToGossipMessage()
      if err != nil {
         d.logger.Warningf("Membership response contains an invalid message from an offline peer %+v", errors.WithStack(err))
         return
      }

      // Newer alive message exists or the message isn't authentic
      if !d.msgStore.CheckValid(dm) || !d.crypt.ValidateAliveMsg(dm) {
         continue
      }

      newDeadMembers := []*proto.SignedGossipMessage{}
      d.lock.RLock()
      if _, known := d.id2Member[string(dm.GetAliveMsg().Membership.PkiId)]; !known {
         newDeadMembers = append(newDeadMembers, dm)
      }
      d.lock.RUnlock()
      d.learnNewMembers([]*proto.SignedGossipMessage{}, newDeadMembers)
   }
}
  • 这里就是处理对方发来的alive和dead列表了。
  • alive的话,当然是handleAliveMessage
  • dead的话,如果id2Member没有,需要去learnNewMembers

小结

  • 基本就是在alive和dead列表中流转,一方面自己有超时的策略来轮询alive列表,二是定时轮询dead列表来尽可能复活节点,三是定时对外公布自己的状态。

发起点

那么问题来了,什么情况会发起MembershipRequest呢?基本上发起点都集中在peer节点启动的时候。

InitiateSync

func (d *gossipDiscoveryImpl) InitiateSync(peerNum int) {
   if d.toDie() {
      return
   }
   var peers2SendTo []*NetworkMember
   m, err := d.createMembershipRequest(true)
   if err != nil {
      d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
      return
   }
   memReq, err := m.NoopSign()
   if err != nil {
      d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
      return
   }
   d.lock.RLock()

   n := d.aliveMembership.Size()
   k := peerNum
   if k > n {
      k = n
   }

   aliveMembersAsSlice := d.aliveMembership.ToSlice()
   for _, i := range util.GetRandomIndices(k, n-1) {
      pulledPeer := aliveMembersAsSlice[i].GetAliveMsg().Membership
      var internalEndpoint string
      if aliveMembersAsSlice[i].Envelope.SecretEnvelope != nil {
         internalEndpoint = aliveMembersAsSlice[i].Envelope.SecretEnvelope.InternalEndpoint()
      }
      netMember := &NetworkMember{
         Endpoint:         pulledPeer.Endpoint,
         Metadata:         pulledPeer.Metadata,
         PKIid:            pulledPeer.PkiId,
         InternalEndpoint: internalEndpoint,
      }
      peers2SendTo = append(peers2SendTo, netMember)
   }

   d.lock.RUnlock()

   for _, netMember := range peers2SendTo {
      d.comm.SendToPeer(netMember, memReq)
   }
}
  • 这里逻辑很简单,当peer节点启动的时候,会调用这里,然后随机选取n个节点发送MembershipRequest去拉取成员列表

Connect

func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
   for _, endpoint := range []string{member.InternalEndpoint, member.Endpoint} {
      if d.isMyOwnEndpoint(endpoint) {
         d.logger.Debug("Skipping connecting to myself")
         return
      }
   }

   d.logger.Debug("Entering", member)
   defer d.logger.Debug("Exiting")
   go func() {
      for i := 0; i < maxConnectionAttempts && !d.toDie(); i++ {
         id, err := id()
         if err != nil {
            if d.toDie() {
               return
            }
            d.logger.Warningf("Could not connect to %v : %v", member, err)
            time.Sleep(d.reconnectInterval)
            continue
         }
         peer := &NetworkMember{
            InternalEndpoint: member.InternalEndpoint,
            Endpoint:         member.Endpoint,
            PKIid:            id.ID,
         }
         m, err := d.createMembershipRequest(id.SelfOrg)
         if err != nil {
            d.logger.Warningf("Failed creating membership request: %+v", errors.WithStack(err))
            continue
         }
         req, err := m.NoopSign()
         if err != nil {
            d.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
            continue
         }
         req.Nonce = util.RandomUInt64()
         req, err = req.NoopSign()
         if err != nil {
            d.logger.Warningf("Failed adding NONCE to SignedGossipMessage %+v", errors.WithStack(err))
            continue
         }
         go d.sendUntilAcked(peer, req)
         return
      }

   }()
}
  • 在peer启动的时候会去连接peer.gossip.bootstrap来发起MembershipRequest,去拉取成员列表。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容