Hyperledger-Fabric源码分析(Gossip-StateInfo)

StateInfo是用来传播peer的状态信息给其他成员。

struct

type StateInfo struct {
   Timestamp *PeerTime 
   PkiId     []byte   
   // channel_MAC is an authentication code that proves
   // that the peer that sent this message knows
   // the name of the channel.
   Channel_MAC          []byte      
   Properties           *Properties 
}

type Properties struct {
    LedgerHeight         uint64       
    LeftChannel          bool         
    Chaincodes           []*Chaincode 
}

初始化

stateInfMsg := &proto.StateInfo{
        Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
        PkiId:       gc.pkiID,
        Timestamp: &proto.PeerTime{
            IncNum: gc.incTime,
            SeqNum: uint64(time.Now().UnixNano()),
        },
        Properties: &proto.Properties{
            LeftChannel:  leftChannel,
            LedgerHeight: ledgerHeight,
            Chaincodes:   chaincodes,
        },
    }
    m := &proto.GossipMessage{
        Nonce: 0,
        Tag:   proto.GossipMessage_CHAN_OR_ORG,
        Content: &proto.GossipMessage_StateInfo{
            StateInfo: stateInfMsg,
        },
    }
  • 可以看到stateinfo的组成
    • pkiid,你可以理解为peer的标识id,内部其实就是mspid+cert算出来的一个摘要。
    • Channel_MAC,pkiid+chainid生成MAC,背后就是sha256计算hash
    • 时间戳
    • 属性代表着三个触发stateinfo消息的地方
      • 该节点退出通道
      • 有新的block写入,更新peer的账本height
      • chaincode更新
        • 这个看起来不太好理解。我跟进了下,chaincode部署成功会触发这里。换句话说如果cc部署成功,是通过这个消息让成员知道的。

触发点

commitblock

func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData util.PvtDataCollections) error {

    // Commit block with available private transactions
    if err := s.ledger.StoreBlock(block, pvtData); err != nil {
        logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
        return err
    }

    // Update ledger height
    s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
    logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
        s.chainID, block.Header.Number, len(block.Data.Data))

    return nil
}
  • 前面就是提交block到账本了
  • 后来开始UpdateLedgerHeight,开始处理账本新的height

UpdateLedgerHeight

func (gc *gossipChannel) UpdateLedgerHeight(height uint64) {
    gc.Lock()
    defer gc.Unlock()

    var chaincodes []*proto.Chaincode
    var leftChannel bool
    if prevMsg := gc.stateInfoMsg; prevMsg != nil {
        leftChannel = prevMsg.GetStateInfo().Properties.LeftChannel
        chaincodes = prevMsg.GetStateInfo().Properties.Chaincodes
    }
    gc.updateProperties(height, chaincodes, leftChannel)
}
  • 因为只是更新height,所以其他的状态沿用之前的stateInfoMsg
  • 下面开始要广播前的最后准备工作了

updateStateInfo

func (gc *gossipChannel) updateStateInfo(msg *proto.SignedGossipMessage) {
   gc.stateInfoMsgStore.Add(msg)
   gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
   gc.stateInfoMsg = msg
   atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}
  • stateInfoMsgStore用来保存收到的成员发来的所有的stateinfo消息,包括自己的。
  • 更新自己的height
  • 每给其他成员分享一次stateinfo的时候,都会自己保留一份,以备不时之需。这种情况正好用上。
  • 启动shouldGossipStateInfo开关

发送点

func (gc *gossipChannel) publishStateInfo() {
   if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
      return
   }
   gc.RLock()
   stateInfoMsg := gc.stateInfoMsg
   gc.RUnlock()
   gc.Gossip(stateInfoMsg)
   if len(gc.GetMembership()) > 0 {
      atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
   }
}
  • 可以看到最后会在这里将消息Gossip出去,里面是用emitter模块去处理。emitter你暂时不用关心,里面根据不同的消息类型来决定点对点发送还是群发,不过在这里你只用知道发出去就好了。有时间我会专门讲这个模块。
  • 那么发送是怎么触发的呢?

时机

func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
   chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
   gc := &gossipChannel{
      incTime:                   uint64(time.Now().UnixNano()),
      selfOrg:                   org,
      pkiID:                     pkiID,
      mcs:                       mcs,
      Adapter:                   adapter,
      logger:                    util.GetLogger(util.ChannelLogger, adapter.GetConf().ID),
      stopChan:                  make(chan struct{}, 1),
      shouldGossipStateInfo:     int32(0),
      stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval),
      stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval),
      orgs:                      []api.OrgIdentityType{},
      chainID:                   chainID,
   }

   ...

   // Periodically publish state info
   go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
   ...
}
  • 在初始化GossipChannel的时候,会定时来监听是否有新的stateinfo消息需要发布。
  • GossipChannel看名字你也大概能猜到干嘛的,这是专门给同channel的成员进行gossip服务的。

接受点

中间省略了很多地方,这个消息的专题都是这种风格,尽量不要被其他的细节给干扰。总之,消息已经被peer收到,下面我们看下收到后,怎么处理。

GossipService

GossipService是统管gossip服务的,所有的动作都由这里发起,消息处理也不例外

消息验证

func (g *gossipServiceImpl) validateStateInfoMsg(msg *proto.SignedGossipMessage) error {
    verifier := func(identity []byte, signature, message []byte) error {
        pkiID := g.idMapper.GetPKIidOfCert(api.PeerIdentityType(identity))
        if pkiID == nil {
            return errors.New("PKI-ID not found in identity mapper")
        }
        return g.idMapper.Verify(pkiID, signature, message)
    }
    identity, err := g.idMapper.Get(msg.GetStateInfo().PkiId)
    if err != nil {
        return errors.WithStack(err)
    }
    return msg.Verify(identity, verifier)
}
  • 这里主要做两个事情
  • 一,判断当前消息的Pkiid是否认识,这是消息接收的基础。因为Gossip有机制能同步成员列表,如果有不认识节点,那么就问题大了。
  • 二,对方消息是私钥进行数字签名的,这里用本地的公钥进行签名校验。这也是安全的基础。

消息处理

func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
    ...

    if msg.IsChannelRestricted() {
        if gc := g.chanState.lookupChannelForMsg(m); gc == nil {
            // If we're not in the channel, we should still forward to peers of our org
            // in case it's a StateInfo message
            if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
                if g.stateInfoMsgStore.Add(msg) {
                    g.emitter.Add(&emittedGossipMessage{
                        SignedGossipMessage: msg,
                        filter:              m.GetConnectionInfo().ID.IsNotSameFilter,
                    })
                }
            }
            if !g.toDie() {
                g.logger.Debug("No such channel", msg.Channel, "discarding message", msg)
            }
        } else {
            ...
            gc.HandleMessage(m)
        }
        return
    }

    ...
}
  • 前面校验部分过了后,基本上消息没有大的问题
  • 下面开始正式处理了,这里需要一些背景知识。首先一个Peer加入一个channel, 就会有一个GossipChannel相伴。所以如果查不到这个gc,那么说明Peer不在这个channel里面。
  • 这里是描述了两个场景
    • 首先该Peer不在这个channel里面,但属于同一个Org,也就是组织,那么处于优化的目的,可以尽快将消息扩散出去,以尽快让同channel的节点处理。再次遇到emitter,忽略它。
    • 如果同属一个channel,那么开始交给所属的GossipChannel来处理。

GossipChannel

消息校验

func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
    ...
    if m.IsStateInfoMsg() {
        si := m.GetStateInfo()
        expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
        if !bytes.Equal(expectedMAC, si.Channel_MAC) {
            gc.logger.Warning("Message contains wrong channel MAC(", si.Channel_MAC, "), expected", expectedMAC)
            return false
        }
        return true
    }
    ...
}

  • 通用校验,我们这里就不费功夫了,主要看stateinfo的。

  • 可以看到这里主要做MAC校验,这个校验感觉没什么用,并不能保证其完整性。

消息处理

if m.IsDataMsg() || m.IsStateInfoMsg() {
   added := false

   if m.IsDataMsg() {
      ...
   } else { // StateInfoMsg verification should be handled in a layer above
      //  since we don't have access to the id mapper here
      added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
   }

   if added {
      // Forward the message
      gc.Forward(msg)
      // DeMultiplex to local subscribers
      gc.DeMultiplex(m)

      if m.IsDataMsg() {
         gc.blocksPuller.Add(msg.GetGossipMessage())
      }
   }
  • 当然了这里会直接加到gc的stateInfoMsgStore里面存起来。当然了Add也不是那么简单。简单看看。

    func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
       if !cache.MessageStore.CheckValid(msg) {
          return false
       }
       if !cache.verify(msg) {
          return false
       }
       added := cache.MessageStore.Add(msg)
       if added {
          pkiID := msg.GetStateInfo().PkiId
          cache.MembershipStore.Put(pkiID, msg)
       }
       return added
    }
    
    • CheckValid会将msg跟本地保存做比较,如果是全新的或比已有的新,会判定有效
    • verify主要是校验消息发起人是否同属一个Channel,还有就是这个节点是否有读取成员状态的权力。
    • 按pkiID的维度冗余一遍
  • 如果Add成功,第一件事就是让其他人知道。通过emitter转发出去

  • DeMultiplex是本地的一个多路分发的模块,里面可以增加订阅器,来订阅感兴趣的消息类型,进而处理。不过幸运的是,stateinfo没有人订阅,所以这里不扩散了。

使用

前面讲了这么多StateInfo消息,那么问题来了,这消息收下来到底干嘛用呢?举两个例子就清楚了。当然里面的Properties还有别的用处,这里先不展开。

func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
   peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
   if len(peerIdentity) == 0 {
      gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
      return false
   }
   msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
   if msg == nil {
      return false
   }
   return true
}
  • 这里如果能从stateInfoMsgStore里面找到,说明这个member同属一个通道。
// If we don't have a StateInfo message from the peer,
// no way of validating its eligibility in the channel.
if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {
   gc.logger.Debug("Don't have StateInfo message of peer", msg.GetConnectionInfo())
   return
}
if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
   gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
   return
}
  • 这里也是一样,上面判定同属一个通道,下面判定是否同属一个通道下的同一个组织。

总结

  • StateInfo主要给别人分享是否有新的cc部署完成,是否退出通道,是否有新的block写入。
  • 不管如何,第一时间转发消息给成员,利于消息扩散。
  • 然后自循环,对外发布状态更新。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容