HyperLedgerFabric源码解读(4)-batcher

关于signedGossipMessage的emit组件,以及batchingEmitterImpl

// 提交签名gossip消息后的回调函数
type emitBatchCallback func([]interface{})

// batchingEmitter常用语gossip的提交和转发阶段
//   消息被添加批处理发射器中,接着定期分批被转发T次,然后丢弃
//   若是batchingEmitter中存放的message数达到了一定的容量,将会触发消息分派
//batchingEmitter is used for the gossip push/forwarding phase.
// Messages are added into the batchingEmitter, and they are forwarded periodically T times in batches and then discarded.
// If the batchingEmitter's stored message count reaches a certain capacity, that also triggers a message dispatch
type batchingEmitter interface {
    // 将一个消息添加到批次里
    // Add adds a message to be batched
    Add(interface{})

    // 停止组件
    // Stop stops the component
    Stop()

    // 待发送消息的数量
    // Size returns the amount of pending messages to be emitted
    Size() int
}
// newBatchingEmitter accepts the following parameters:
// iterations: number of times each message is forwarded
// burstSize: a threshold that triggers a forwarding because of message count
// latency: the maximum delay that each message can be stored without being forwarded
// cb: a callback that is called in order for the forwarding to take place
func newBatchingEmitter(iterations,  // 每次各自消息被转发的次数
    burstSize int,                    // 每次触发消息转发的消息总量的阈值
    latency time.Duration,            // 每条消息被存放而不被转发的延迟周期
    cb emitBatchCallback) batchingEmitter { // 为了进行转发而调用的回调函数
    // 消息被转发的次数不能 < 0
    if iterations < 0 {
        panic(errors.Errorf("Got a negative iterations number"))
    }

    // batchingEmitterImpl batchingEmitter实现
    p := &batchingEmitterImpl{
        cb:         cb,             // 转发而进行的回调函数
        delay:      latency,        // batch中消息存放的有效期
        iterations: iterations,     // 每次转发消息时每条消息被转发的次数
        burstSize:  burstSize,      // 转发消息触发的消息容量阈值
        lock:       &sync.Mutex{},  //
        buff:       make([]*batchedMessage, 0),  // batchmessage缓存
        stopFlag:   int32(0),                    // batchingEmitter停止的标记
    }

    if iterations != 0 {                        // 定期指定消息Emit 启动一个goroutine执行
        go p.periodicEmit()
    }

    return p
}
// 周期性执行消息emit
// 循环判断当前batchingEmitter是否停止
// 首先延迟delay周期 保证batch中的消息已超出缓存在batch最大有效期
// 获取到已超出缓存在batch最大有效期的消息执行提交同时保证执行过程在lock环境下
func (p *batchingEmitterImpl) periodicEmit() {
    for !p.toDie() {
        time.Sleep(p.delay)
        p.lock.Lock()
        p.emit()
        p.lock.Unlock()
    }
}
// 提交消息到其他peer
func (p *batchingEmitterImpl) emit() {
    // 保证提交消息时batchingEmitter仍可用
    if p.toDie() {
        return
    }
    if len(p.buff) == 0 {  // batch的buffer里面没有可提交的消息
        return
    }
    msgs2beEmitted := make([]interface{}, len(p.buff)) // 本地emit数据缓存
    for i, v := range p.buff {
        msgs2beEmitted[i] = v.data
    }

    p.cb(msgs2beEmitted)   // 执行回调函数
    p.decrementCounters()  // 减少已提交的消息量
}
// 减少batch中已提交的消息数  保证batch消息量都是未提交的
func (p *batchingEmitterImpl) decrementCounters() {
    //
    n := len(p.buff)
    for i := 0; i < n; i++ {
        msg := p.buff[i]
        msg.iterationsLeft--
        if msg.iterationsLeft == 0 {
            p.buff = append(p.buff[:i], p.buff[i+1:]...)
            n--
            i--
        }
    }
}
// batchingEmitter是否可用标记
func (p *batchingEmitterImpl) toDie() bool {
    return atomic.LoadInt32(&(p.stopFlag)) == int32(1)
}
// batchingEmitter实现
type batchingEmitterImpl struct {
    iterations int
    burstSize  int
    delay      time.Duration
    cb         emitBatchCallback
    lock       *sync.Mutex
    buff       []*batchedMessage
    stopFlag   int32
}
// batch消息
type batchedMessage struct {
    data           interface{}
    iterationsLeft int
}
// 停止
func (p *batchingEmitterImpl) Stop() {
    atomic.StoreInt32(&(p.stopFlag), int32(1))
}
// batch中已缓存的消息总量
func (p *batchingEmitterImpl) Size() int {
    p.lock.Lock()
    defer p.lock.Unlock()
    return len(p.buff)
}
// 添加消息到batch缓存中
// 一旦当前batch的buffer中消息量已达到消息发送阈值  则执行emit
func (p *batchingEmitterImpl) Add(message interface{}) {
    if p.iterations == 0 {
        return
    }
    p.lock.Lock()
    defer p.lock.Unlock()

    // 添加消息到batch的buffer中 等待触发发送阈值 执行emit
    p.buff = append(p.buff, &batchedMessage{data: message, iterationsLeft: p.iterations})

    if len(p.buff) >= p.burstSize {
        p.emit()
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前言 您将在本文当中了解到,往网页中添加数据,从传统的dom操作过渡到数据层操作,实现同一个目标,两种不同的方式....
    itclanCoder阅读 26,022评论 1 12
  • 组件(Component)是Vue.js最核心的功能,也是整个架构设计最精彩的地方,当然也是最难掌握的。...
    六个周阅读 5,668评论 0 32
  • 原文地址及更多文章 - century's home 首先让我们先恭喜RNG,其次我们恭喜LGD(data, lo...
    CenturyGuo阅读 2,149评论 1 3
  • 徘徊且无力, 提笔抖精神。 放眼云窗外, 思绪去无痕。 人远久不归, 承心倚香门。 对坐两相望, 肝肠自矛盾。 漠...
    江南信阅读 380评论 0 1
  • 春日时多雨,却又偏偏有阳光,雨水滴答落下,转眼遇见了光,然后又感受了热,渐渐化为氤氲的气雾。气雾升腾掠过了心头,转...
    寒渠生阅读 199评论 0 0