HyperLedgerFabric源码解读(2)-chanState

主要是关于channel state相关内容

package gossip

type channelState struct {
    stopping int32                              // channel全部停止的标志值
    sync.RWMutex                                // 读写锁
    channels map[string]channel.GossipChannel  //  gossip channels: <channel-id, GossipChannel>
    g        *gossipServiceImpl                // gossip service
}
// 停止所有channel
// 1、判断stop标志是否满足
// 2、首先设置stop标志值 = 1: 使用atomic的storeInt32原子操作
//    接着迭代channels的map 逐一执行channel.stop
//    关闭所有channel操作在读写锁的环境内执行
func (cs *channelState) stop() {
    if cs.isStopping() {
        return
    }
    atomic.StoreInt32(&cs.stopping, int32(1))
    cs.Lock()
    defer cs.Unlock()
    for _, gc := range cs.channels {
        gc.Stop()
    }
}

// stopping 是否等于1 
func (cs *channelState) isStopping() bool {
    return atomic.LoadInt32(&cs.stopping) == int32(1)
}
// 根据gossip的receive message获取对应的channel
// 需要对receive message其中gossip message进行提取: 状态请求消息单独处理,主要由于该类消息内部提供了通道身份验证码和公钥ID
func (cs *channelState) lookupChannelForMsg(msg proto.ReceivedMessage) channel.GossipChannel {
    if msg.GetGossipMessage().IsStateInfoPullRequestMsg() {        // 状态信息获取请求消息
        sipr := msg.GetGossipMessage().GetStateInfoPullReq()
        mac := sipr.Channel_MAC                                    // channel_mac:身份验证码,代表发送消息的peer知道对应的channel名称
        pkiID := msg.GetConnectionInfo().ID                        // 根据消息获取对应的connection信息: connection_id
        return cs.getGossipChannelByMAC(mac, pkiID)               // 根据channel身份验证码 + 公钥ID 获取对应的gossip channel
    }
    return cs.lookupChannelForGossipMsg(msg.GetGossipMessage().GossipMessage) // 根据gossip消息获取对应的channel
}
// 获取gossip message对应的channel
// 主要关键点提取出Gossip Message里面的StateInfoMsg:该类信息内部包括了channel id,能够通过channel_id获取gossip channel
func (cs *channelState) lookupChannelForGossipMsg(msg *proto.GossipMessage) channel.GossipChannel {
    if !msg.IsStateInfoMsg() {  // 非状态信息消息
        // If we reached here then the message isn't:
        // 1) StateInfoPullRequest
        // 2) StateInfo
        // Hence, it was already sent to a peer (us) that has proved it knows the channel name, by
        // sending StateInfo messages in the past.
        // Therefore- we use the channel name from the message itself.
        // 由于已经发送信息给对等节点 那就说明通过已经发送的状态信息消息证明是知道channel名称的
        // 因此通过message本身就可以获取到对应的channel name
        return cs.getGossipChannelByChainID(msg.Channel)
    }

    // Else, it's a StateInfo message.
    stateInfMsg := msg.GetStateInfo() // 状态信息消息处理同状态请求消息
    return cs.getGossipChannelByMAC(stateInfMsg.Channel_MAC, stateInfMsg.PkiId)
}
// 遍历所有的channels 寻找对应的channel的MAC计算和消息的MAC相等
// 如果是的话,对等节点签名的消息里知道channel的名字 主要是因为在消息验证时 对应的公钥ID会被检查的
// 整个遍历过程至于读锁下 保证数据的不可变
func (cs *channelState) getGossipChannelByMAC(receivedMAC []byte, pkiID common.PKIidType) channel.GossipChannel {
    // Iterate over the channels, and try to find a channel that the computation
    // of the MAC is equal to the MAC on the message.
    // If it is, then the peer that signed the message knows the name of the channel
    // because its PKI-ID was checked when the message was verified.
    cs.RLock()
    defer cs.RUnlock()
    for chanName, gc := range cs.channels {                               // 遍历所有channel
        mac := channel.GenerateMAC(pkiID, common.ChainID(chanName))        // 根据通道channel的名称 + 公钥 生产对应的身份验证码
        if bytes.Equal(mac, receivedMAC) {                                 // 将新生成的通道验证码与消息本身的进行比较
            return gc
        }
    }
    return nil
}
// 根据channel_id来获取gossip channel
// 当前所有的channels是不是已停止
// 根据channel id获取对应的gossip channel: channelState中map[string]*GossipChannel: <channelID, GossipChannel>
func (cs *channelState) getGossipChannelByChainID(chainID common.ChainID) channel.GossipChannel {
    if cs.isStopping() {
        return nil
    }
    cs.RLock()
    defer cs.RUnlock()
    return cs.channels[string(chainID)]
}
// 加入channel
// 1、首先判断对应的channels是否全部关闭
// 2、首先判断对应的channel是否已存在:channels[string(chainID)]
// 3、若存在:则直接配置组织列表定义有资格进入channel的peer
//    若不存在:a、首先获取gossip service的connection公钥:PKIid
//              b、根据gossipServiceImpl + discovery构成gossipAdapterImpl
//              c、在新建gossipChannel:公钥PKIid、组织selfOrgnization、消息加密服务MCS、通道IDchanId、gossipAdapter、加入信息joinMsg,并加入到指定的channel的组织列表中
//              d、并将新建的gossipChannel添加到本地:channels
func (cs *channelState) joinChannel(joinMsg api.JoinChannelMessage,   // 加入channel消息
    chainID common.ChainID) {                                         // 指定的channel
    if cs.isStopping() {
        return
    }
    cs.Lock()
    defer cs.Unlock()
    if gc, exists := cs.channels[string(chainID)]; !exists {
        pkiID := cs.g.comm.GetPKIid()
        ga := &gossipAdapterImpl{gossipServiceImpl: cs.g, Discovery: cs.g.disc}
        gc := channel.NewGossipChannel(pkiID, cs.g.selfOrg, cs.g.mcs, chainID, ga, joinMsg)
        cs.channels[string(chainID)] = gc
    } else {
        gc.ConfigureChannel(joinMsg)
    }
}
// gossip适配器:具备gossip相关功能,以及节点发现
// 通过组合的方式使得当前定义的类型 能够完全具备内部引用类型的内容(filed和method)
// 例子:type TestA struct {  // 字段
//              name string
//              age int
//      }
//
//      func (a *TestA) testA(){  // 方法
//          fmt.Println("this is a TestA method!!!")
//      }
//
//      type combinationTestA struct { // 通过组合的方式 使得combinationTestA具备了TestA的内容
//          *TestA
//      }
//
// ================== 应用 =================
//      ta := TestA{
//          name: "Hello World",
//          age: 30,
//      }
//
//      cta := combinationTestA{&ta}
//      fmt.Println(cta.age)
//      fmt.Println(cta.name)
//      cta.testA()

type gossipAdapterImpl struct {
    *gossipServiceImpl
    discovery.Discovery
}
// 获取channel配置
func (ga *gossipAdapterImpl) GetConf() channel.Config {
    return channel.Config{
        ID:                          ga.conf.ID,                         // channelID = gossipID
        MaxBlockCountToStore:        ga.conf.MaxBlockCountToStore,       // 缓存区大小
        PublishStateInfoInterval:    ga.conf.PublishStateInfoInterval,   // 状态消息推送间隔(秒)
        PullInterval:                ga.conf.PullInterval,               // 消息获取间隔(秒)
        PullPeerNum:                 ga.conf.PullPeerNum,                // 消息获取节点数
        RequestStateInfoInterval:    ga.conf.RequestStateInfoInterval,   // 状态消息获取间隔(秒)
        BlockExpirationInterval:     ga.conf.PullInterval * 100,         // 获取有效区块间隔(毫秒)
        StateInfoCacheSweepInterval: ga.conf.PullInterval * 5,           // 状态信息缓冲清除间隔(毫秒)
    }
}
// Gossip消息签名: GossipMessage => SignedGossipMessage
// 1、定义一个signer:实现消息签名功能
// 2、构建一个SignedGossipMessage执行sign操作完成信息的签名:等到envelope
// 3、输出完整的签名gossip message(SignedGossipMessage:envelope + gossipmessage)
func (ga *gossipAdapterImpl) Sign(msg *proto.GossipMessage) (*proto.SignedGossipMessage, error) {
    signer := func(msg []byte) ([]byte, error) {   // 签名消息 通过消息安全服务来完成
        return ga.mcs.Sign(msg)
    }
    sMsg := &proto.SignedGossipMessage{            // 构建签名消息
        GossipMessage: msg,
    }
    e, err := sMsg.Sign(signer)                    // 将SignedGossipMessage与签名函数绑定
    if err != nil {
        return nil, err
    }
    return &proto.SignedGossipMessage{
        Envelope:      e,
        GossipMessage: msg,
    }, nil
}
// gossip传播消息(签名后的消息)
// Gossip gossips a message
// 将签名后的消息绑定对应的过滤策略形成gossip传播消息emittedGossipMessage
// 接着将emittedGossipMessage添加emitter batch中等到emit
func (ga *gossipAdapterImpl) Gossip(msg *proto.SignedGossipMessage) {
    ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{  // 将emitterGossipMessage添加到batch中
        SignedGossipMessage: msg,
        filter: func(_ common.PKIidType) bool {  // 根据公钥进行消息过滤策略:此处不做处理 直接全部允许
            return true
        },
    })
}
// 转发消息
// 向下一级发送消息:指定签名消息SignedGossipMessage以及过滤策略filter:connection_id不相同过滤
// Forward sends message to the next hops
func (ga *gossipAdapterImpl) Forward(msg proto.ReceivedMessage) {
    ga.gossipServiceImpl.emitter.Add(&emittedGossipMessage{
        SignedGossipMessage: msg.GetGossipMessage(),
        filter:              msg.GetConnectionInfo().ID.IsNotSameFilter,
    })
}
// 给指定peers发送信息
// 参数:SignedGossipMessage 签名消息
//       peers 接收消息的节点
func (ga *gossipAdapterImpl) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
    ga.gossipServiceImpl.comm.Send(msg, peers...)
}
// 验证状态消息StateInfoMessage: 若消息无效则返回error,否则返回nil
// ValidateStateInfoMessage returns error if a message isn't valid
// nil otherwise
func (ga *gossipAdapterImpl) ValidateStateInfoMessage(msg *proto.SignedGossipMessage) error {
    return ga.gossipServiceImpl.validateStateInfoMsg(msg)
}
// 返回一个确定对等节点的组织ID 根据公钥ID:PKIid
// GetOrgOfPeer returns the organization identifier of a certain peer
func (ga *gossipAdapterImpl) GetOrgOfPeer(PKIID common.PKIidType) api.OrgIdentityType {
    return ga.gossipServiceImpl.getOrgOfPeer(PKIID)
}
// 返回一个对等节点的ID  否则返回nil 代表没有发现
// GetIdentityByPKIID returns an identity of a peer with a certain
// pkiID, or nil if not found
func (ga *gossipAdapterImpl) GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType {
    identity, err := ga.idMapper.Get(pkiID)
    if err != nil {
        return nil
    }
    return identity
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,597评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,457评论 25 707
  • 用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你...
    hw1212阅读 12,691评论 2 59
  • 窗帘的缝隙里投进的光 照亮翻飞的记忆 从容不怨 我凝视光来的方向 仿若站在田野上 阳光刺痛了渴望 我透过镜中的双眼...
    Alberxyxuxuxu阅读 268评论 0 0
  • 亲爱的宝贝,早上你起床的时候都快九点半了,可能是昨天睡的太晚了,也可能寒假里你过的太懒散了,洗漱后你简单吃了早餐就...
    刘韩旭妈妈阅读 135评论 0 1