死磕hyperledger fabric源码|Deliver区块分发

死磕hyperledger fabric源码|Deliver区块分发

文章及代码:https://github.com/blockchainGuide/

分支:v1.1.0

00a1ba6f145942b3fdb1a7b63964967e

概述

Orderer排序服务器提供了区块分发服务接口,接收客户端提交的区块请求消息(Envelope类型,通道头部类型是DELIVER_SEEK_INFOCONFIG_UPDATE等),根据该消息封装的区块搜索信息对象(SeekInfo类型),包括查找最旧区块SeekOldest类型、查找最新区块SeekNewest类型、查找指定位置区块SeekSpecified类型等,构造对应请求范围的范围查询结果迭代器,读取Orderer节点指定通道账本上的区块数据,同时,建立消息处理循环,基于该结果迭代器依次读取请求的区块数据结果,发送给组织的Leader主节点等请求节点。

Orderer节点启动时在本地gRPC服务器上注册了Orderer排序服务器,并创建了Deliver服务处理句柄。当客户端发起Deliver服务请求时,Orderer排序服务器就调用Deliver()方法处理消息请求。

Diliver消息服务处理

入口在orderer/common/server/server.go/Deliver()方法中:

func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
    ...
    policyChecker := func(env *cb.Envelope, channelID string) error { // 定义策略检查器
        chain, ok := s.GetChain(channelID) // 获取指定通道的链支持对象
        if !ok {
            return errors.Errorf("channel %s not found", channelID)
        }
        // 创建消息过滤器
        sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain)
        return sf.Apply(env) // 过滤消息
    }
    server := &deliverMsgTracer{
        DeliverSupport: &deliverHandlerSupport{AtomicBroadcast_DeliverServer: srv},
        msgTracer: msgTracer{
            debug:    s.debug,
            function: "Deliver",
        },
    }
    // Deliver服务消息处理
    return s.dh.Handle(deliver.NewDeliverServer(server, policyChecker, s.sendProducer(srv)))
}

大概做了以下几件事:

  • 定义策略检查器:用于检查接收的区块请求消息必须满足指定通道上的访问控制权限策略的要求
  • 获取指定通道的链支持对象
  • 创建消息过滤器,过滤消息
  • Deliver服务消息处理区块请求

我们来看是如何处理的,进入到s.dh.Handle:

/common/deliver/deliver.go/Handle

func (ds *deliverHandler) Handle(srv *DeliverServer) error {
...
    // 等待消息请求并进行处理
    for {
        ...
        envelope, err := srv.Recv() // 等待接收客户端发送的区块消息请求
    ...
        // 从Orderer节点本地指定通道的区块账本中获取指定区块,并向客户端发送请求
        if err := ds.deliverBlocks(srv, envelope); err != nil {
            return err
        }
...
    }
}

不言而喻,直接进入到deliverBlocks,这部分的内容是最核心的,逐步分析如下:

①:解析PayLoad,检查header和ChannelHeader的合法性

payload, err := utils.UnmarshalPayload(envelope.Payload) // 解析消息负载
...
if payload.Header == nil {}
// 解析通道头部
    chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
err = ds.validateChannelHeader(srv, chdr) // 验证通道头部合法性

②:从chains字典中获取指定通道(chainID)的链支持对象chain,并检查该对象是否存在错误信息

chain, ok := ds.sm.GetChain(chdr.ChannelId) // 获取指定通道的链支持对象

③:创建访问控制对象,并检查消息签名是否符合指定的通道读权限策略**

accessControl, err := newSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
...
err := accessControl.evaluate()

④:解析区块搜索信息SeekInfo结构对象

seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {}

⑤:检查起始位置与结束位置的合法性

if seekInfo.Start == nil || seekInfo.Stop == nil {}

⑥:创建区块账本迭代器并获取起始区块号,同时设置起始位置

cursor, number := chain.Reader().Iterator(seekInfo.Start)

Iterator根据startPosition.Type起始位置对象的类型计算起始区块号startingBlockNumbe,类型如下:

  • SeekPosition_Oldest:搜索最旧的区块,将起始区块号startingBlockNumber设置为 0;

  • SeekPosition_Newest:搜索最新的区块,将起始区块号startingBlockNumber设置为当前通道账本的最新区块号info.Height-1,即账本高度减1;

  • SeekPosition_Specified:搜索指定位置的区块,将起始区块号startingBlockNumber设置为指定起始位置的区块号start.Specified.Number

Iterator 方法的大致功能如下: common/ledger/blockledger/file/impl.go/Iterator

func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
    var startingBlockNumber uint64
    switch start := startPosition.Type.(type) { // 分析起始位置类型
    case *ab.SeekPosition_Oldest: // 搜索最旧区块,区块号为0
        startingBlockNumber = 0
    case *ab.SeekPosition_Newest: // 搜索最新区块
        info, err := fl.blockStore.GetBlockchainInfo() // 获取区块链信息
        if err != nil {
            logger.Panic(err)
        }
        newestBlockNumber := info.Height - 1 // 最新区块号
        startingBlockNumber = newestBlockNumber
    case *ab.SeekPosition_Specified: // 搜索指定位置区块
        startingBlockNumber = start.Specified.Number
        height := fl.Height()
        if startingBlockNumber > height { // 若超过高度,则报错
            return &blockledger.NotFoundErrorIterator{}, 0
        }
    default:
        return &blockledger.NotFoundErrorIterator{}, 0
    }
    // 构造区块迭代器
    iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
    if err != nil {
        return &blockledger.NotFoundErrorIterator{}, 0
    }
    // 构造账本区块迭代器
    return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}

⑦:循环读取区块数据,从本地区块账本中获取指定区块号范围内的区块数据,并依次顺序发送给请求客户端

7.1 未找到数据返回

if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
            if number > chain.Reader().Height()-1 {
                return sendStatusReply(srv, cb.Status_NOT_FOUND)
            }
        }

7.2 获取下一个数据

block, status := nextBlock(cursor, erroredChan) // 从本地账本获取下一个区块
if status != cb.Status_SUCCESS {...}

7.3 再次检查是否满足访问控制策略要求

if err := accessControl.evaluate(); err != nil {}

7.4 发送区块数据

if err := sendBlockReply(srv, block); err != nil { }

7.5 循环结束,发送成功状态

if err := sendStatusReply(srv, cb.Status_SUCCESS);

Deliver服务客户端

Leader主节点为例,分析Deliver服务客户端从Orderer节点请求获取区块的流程。

初始化Deliver服务实例

入口:gossip/service/gossip_service.go/InitializeChannel

func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string, support Support) {
    ...
    g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
    if g.deliveryService[chainID] == nil { // 检查是否已经存在Deliver服务实例
        var err error
        g.deliveryService[chainID], err = g.deliveryFactory.Service(g, endpoints, g.mcs) // 检查是否已经存在Deliver服务实例
        ...
        // peer.gossip.useLeaderElection与peer.gossip.orgLeader是互斥的两个配置参数,
        // 如果将两个都设置为true且没有被定义,则会引起Peer节点错误
        // 启用Leader主节点动态选举机制
        leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
        // 静态设置为组织Leader主节点
        isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")
        ...
        if leaderElection { // 启用了动态Leader主节点选举机制
            logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
            g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
        } else if isStaticOrgLeader {
            // 若静态指定了Leader主节点,则连接 Orderer节点请求区块数据
            // 启动指定通道上的Deliver服务实例请求获取区块数据
            g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func() {})
        } ....
}

首先检查是否已经存在Deliver实例,然后根据Leader主节点动态选举机制还是静态指定了Leader主节点分别进入不同的分支,如果是静态指定了Leader主节点,则连接 Orderer节点请求区块数据,启动指定通道上的Deliver服务实例请求获取区块数据。接下来关注启动Deliver服务实例。

启动Deliver服务实例

主要做了以下事:

①:获取绑定指定通道的区块提供者

if _, exist := d.blockProviders[chainID];

②:不存在区块提供者

client := d.newClient(chainID, ledgerInfo)
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
    requester := &blocksRequester{ //定义区块请求者blocksRequester结构对象
        tls:     comm.TLSEnabled(),
        chainID: chainID,
    }
    //定义broadcastSetup()方法
    broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
        return requester.RequestBlocks(ledgerInfoProvider) // 请求区块数据
    }
    ...
    //创建connProducer对象
    connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
    //// 创建broadcastClient客户端
    bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
    requester.client = bClient // 设置到区块请求者对象的客户端
    return bClient
}

2.1 创建Deliver服务实例上的 broadcastClient客户端

client := d.newClient(chainID, ledgerInfo)

2.2 创建指定通道关联的区块提供者

d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)

2.3 启动goroutine开始从Orderer节点请求获取区块,并发送到组织内其他Peer节点

go func() {
            d.blockProviders[chainID].DeliverBlocks() // 请求获取区块数据
            finalizer()
        }()

接下来就是调用区块提供者对象的DeliverBlocks()方法,向Orderer节点发送消息请求的区块数据。

请求获取区块数据

入口在:core/deliverservice/blocksprovider/blocksprovider.go/DeliverBlocks(),具体分析如下:

①:接收消息

msg, err := b.client.Recv() 

② :根据消息类型进行处理

大致有以下几种消息类型:

  • DeliverResponse_Status:用于描述Deliver服务请求执行状态。
  • DeliverResponse_Block:包含请求获取的区块数据。

2.1 DeliverResponse_Status分支

如果DeliverBlocks()方法接收到Status_SUCCESS状态,则说明本次区块请求处理成功,表示已经接收完毕区块请求消息指定范围内的区块数据。除此以外的其他状态消息都是非成功的执行状态消息,包括Status_BAD_REQUEST、Status_FORBIDDEN等

if t.Status == common.Status_SUCCESS {}
if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {}
if t.Status == common.Status_BAD_REQUEST {
                b.client.Disconnect(false)
            } else {
                b.client.Disconnect(true)
            }

2.2 DeliverResponse_Block分支

2.2.1 获取区块号

seqNum := t.Block.Header.Number

2.2.2获取经过序列化的区块字节数组

marshaledBlock, err := proto.Marshal(t.Block)

2.2.3验证区块

err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), seqNum, marshaledBlock);

2.2.4获取通道Peer节点数量

numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))

2.2.5创建消息负载和Gossip消息

payload := createPayload(seqNum, marshaledBlock) 
gossipMsg := createGossipMsg(b.chainID, payload)

2.2.6添加消息负载到本地消息负载缓冲区,等待提交账本

err := b.gossip.AddPayload(b.chainID, payload)

2.2.7通过Gossip消息协议发送区块消息到组织内的其他节点

基于Gossip消息协议将DataMsg类型数据消息(只含有区块数据)分发到组织内的其他Peer节点上,并保存到该节点的消息存储器上。

b.gossip.Gossip(gossipMsg)

参考

https://github.com/blockchainGuide/ (文章图片代码资料)

微信公众号:区块链技术栈

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容