死磕hyperledger fabric源码|交易广播

死磕hyperledger fabric源码|交易广播

文章及代码:https://github.com/blockchainGuide/

分支:v1.1.0

40fe3d4a84cc46e22caf5e06071c3aa7

前言

Hyperledger Fabric提供了Broadcast(srv ab.AtomicBroadcast_BroadcastServer)交易广播服务接口,接收客户端提交的签名交易消息请求,交由共识组件链对象对交易进行排序与执行通道管理,按照交易出块规则切割打包,构造新区块并提交账本。同时,通过Deliver()区块分发服务接口,将区块数据发送给通道组织内发起请求的Leader主节点,再基于Gossip消息协议广播到组织内的其他节点上,从而实现广播交易消息的目的。

Broadcast服务消息处理

Orderer节点启动时已经在本地的gRPC服务器上注册了Orderer排序服务器,并创建了Broadcast服务处理句柄。当客户端调用Broadcast()服务接口发起服务请求时,Orderer排序服务器会调用Broadcast()→s.bh.Handle()方法处理请求,流程如下:

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
...
    return s.bh.Handle(&broadcastMsgTracer{
    ...
    })
}
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
  ...
}

主要就是这个Handle的处理,分析如下:

①:等待接收处理消息

msg, err := srv.Recv()

②:解析获取通道头部chdr、配置交易消息标志位isConfig、通道链支持对象(通道消息处理器)

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)

③:检查共识组件链对象是否准备好接收新的交易消息

if err = processor.WaitReady(); err != nil {}

④:分类处理消息

处理普通消息

4.1 解析获取通道的最新配置序号

configSeq, err := processor.ProcessNormalMsg(msg)
/orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    configSeq = s.support.Sequence()
    err = s.filters.Apply(env)
    return
}

configSeq是最新配置序号,默认初始值为0,新建应用通道后该配置序号自增为1,通过比较该序号就能判断当前通道配置版本是否发生了更新,从而确定当前交易消息是否需要重新过滤与重新排序。

接着就是使用自带的默认通道消息过滤器过滤消息,有以下过滤条件:

  • 验证不能为空
  • 拒绝过期的签名者身份证书
  • 消息最大字节数过滤器(98MB)
  • 消息签名验证过滤器

4.2 构造新的普通交易消息并发送到共识组件链对象请求处理

err = processor.Order(msg, configSeq) 

这里我们只关注kafka的共识组件处理。

首先序列化消息,然后将该消息发送到Kafka集群的指定分区上请求排序,再转发给Kafka共识组件链对象请求打包出块。

/orderer/consensus/kafka/chain.go
func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
    marshaledEnv, err := utils.Marshal(env)
    if err != nil {
        return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
    }
    if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
        return fmt.Errorf("cannot enqueue")
    }
    return nil
}

我们来看看enqueue方法是如何做的:

func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
    logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
    select {
    case <-chain.startChan: // // 共识组件在启动阶段启动完成
        select {
        case <-chain.haltChan: //  已经关闭chain.startChan通道
        ...
            }
            //// 创建Kafka生产者消息
            message := newProducerMessage(chain.channel, payload)
            //// 发送消息到Kafka集群请求排序
            if _, _, err = chain.producer.SendMessage(message); err != nil {
            ...
    }
}

处理通道配置交易消息

4.3 获取配置交易消息与通道的最新配置序号

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)

代码位置:/orderer/common/msgprocessor/systemchannel.go/ProcessConfigUpdateMsg,大概做了以下事情:

  • 获取消息中的通道ID
  • 检查消息中的通道ID与当前通道ID是否一致,一致的话交由标准通道处理器处理
  • 创建新应用通道的通道配置实体Bundle结构对象
  • 构造新的通道配置更新交易消息(ConfigEnvelope类型),注意将该消息的通道配置序号更新为1
  • 创建内层的通道配置交易消息(CONFIG类型)
  • 创建外层的配置交易消息(ORDERER_TRANSACTION类型)
  • 应用系统通道的消息过滤器
  • 返回新的通道配置交易消息与当前系统通道的配置序号
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    channelID, err := utils.ChannelID(envConfigUpdate) // 获取消息中的通道ID
    ...
    //检查消息中的通道ID与当前通道ID是否一致
    if channelID == s.support.ChainID() {
        //// 交由标准通道处理器处理
        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
    }
    ...
    // 创建新的应用通道,其通道配置序号默认初始化为0
    // 创建新应用通道的通道配置实体Bundle结构对象
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    ...
    //构造新的通道配置更新交易消息(ConfigEnvelope类型),注意将该消息的通道配置序号更新为1
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
    ...
    //创建内层的通道配置交易消息(CONFIG类型)
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    ...
    //创建外层的配置交易消息(ORDERER_TRANSACTION类型)
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    ...
    // 应用系统通道的消息过滤器
    err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
    ...
    //返回新的通道配置交易消息与当前系统通道的配置序号
    return wrappedOrdererTransaction, s.support.Sequence(), nil

4.4 构造新的配置交易消息发送到共识组件链对象请求排序

err = processor.Configure(config, configSeq)

这里我们依旧只是考虑kafka共识组件,processor.Configure()方法实际上是调用chainImpl.configure()方法,同样构造Kafka常规消息(KafkaMessageRegular类型)。其中,Class消息类别属于KafkaMessageRegular_CONFIG类型,包含了通道配置交易消息、 通道配置序号configSeq与初始消息偏移量originalOffset(0)。接着,调用chain.enqueue()方法,将其发送到Kafka集群上指定主题(chainID)和分区号(0)的分区上,同时,由Kafka共识组件链对象分区消费者channelConsumer获取该消息,再交由给Kafka共识组件链对象请求打包出块。

⑤:发送成功处理状态响应消息

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

整个流程图如下:

image-20210125112957202

参考

https://github.com/blockchainGuide/ (文章图片代码资料)

微信公众号:区块链技术栈

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容