NSQ 源码学习笔记(三)

上一篇的最后一段代码中,channel中的消息在发送至客户端时,也同步了一份消息发送到了inFight队列中

    subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

这里其实一开始不是很明白,在上网查阅了资料后,了解到inFlight队列是NSQ用来实现消息至少投递一次的。知道了功能后,再来看就很明了了。

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
    now := time.Now()
    msg.clientID = clientID
    msg.deliveryTS = now
    msg.pri = now.Add(timeout).UnixNano()
    err := c.pushInFlightMessage(msg)
    if err != nil {
        return err
    }
    c.addToInFlightPQ(msg)
    return nil
}

上述代码中,首先初始化消息的过期时间timeout+now,通过将msg加入到InFlight队列中,InFlight其实是一个堆排序队列,优先级是按照超时时间来排序的,越靠近过期时间,将会越靠前。这里只是将消息存入队列,那么在哪里消费呢?我们在第一篇笔记中的末尾,Nsqd在完成监听部分的初始化后,有四个自启动的goroutine,第一个通过Wrap启动的n.queueScanLoop()就是用来执行消费的。

func (n *NSQD) queueScanLoop() {
    //任务派发 队列
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)

    //任务结果 队列
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)

    // 用来优雅关闭
    closeCh := make(chan int)
    // 利用Ticket来定期开始任务和调整worker
    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    // 调整worker
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C: // 开始一次任务的派发
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:  // 重新调整 worker 数量
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:  // 退出
            goto exit
        }
        
        // num最大为nsqd的所有channel总数
        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

        loop:
        // 随机取出num个channel, 派发给 worker 进行 扫描
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        // 接收 扫描结果, 统一 有多少 channel 是 "脏" 的
        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        // 假如 "脏" 的 "比例" 大于阀值, 则不等待 workTicker
        // 马上进行下一轮 扫描
        if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

    exit:
    n.logf("QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
//  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    // 校验启动的worker数量,最大为nsqd的所有channel数 * 1/4,
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        // 当前启动的worker数等于设定的idealPoolSize,那么直接返回,
        // 如果大于了idealPoolSize,通过closeCh关闭一个worker
        // 如果未达到idealPoolSize,启动worker的goroutine
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

worker的具体实现是queueScanWorker

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            // 实现消息至少被投递一次
            if c.processInFlightQueue(now) {
                dirty = true
            }
            // 实现延迟消息队列
            if c.processDeferredQueue(now) {
                dirty = true
            }
            // 如果有过期消息的存在,则dirty
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        // 从队列中获取已经过期的消息
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()
        
        if msg == nil {
            goto exit
        }
        dirty = true
        // 如果获取到了符合条件的msg,按msg.ID将msg在infight队列中删除
        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        // 消息在channel中发起重新投递
        c.doRequeue(msg)
    }

exit:
    return dirty
}

// 延迟消息队列的实现
func (c *Channel) processDeferredQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.deferredMutex.Lock()
        item, _ := c.deferredPQ.PeekAndShift(t)
        c.deferredMutex.Unlock()

        if item == nil {
            goto exit
        }
        dirty = true

        msg := item.Value.(*Message)
        _, err := c.popDeferredMessage(msg.ID)
        if err != nil {
            goto exit
        }
        c.doRequeue(msg)
    }

exit:
    return dirty
}

  上面的两个函数processDeferredQueueprocessInFlightQueue的实现基本一致,那为什么相同的逻辑要实现两次呢。两个队列,DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?

  之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53

  意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能

测试一下:

type Item struct {
    d1 int
    d2 int
}

func BenchmarkT1(b *testing.B) {
    q := make([]*Item, 0)   // 不需要类型推断的 slice
    for i := 0; i < b.N; i++ {
        q = append(q, &Item{i, i})
    }
    for _, hero := range q {
        hero.d1++
    }
}

func BenchmarkT2(b *testing.B) {
    q := make([]interface{}, 0)
    for i := 0; i < b.N; i++ {
        q = append(q, &Item{i, i})
    }
    for _, hero := range q {
        hero.(*Item).d1++   // 需要做类型推断
    }
}

测试结果:

BenchmarkT1-8           10000000               241 ns/op
BenchmarkT2-8            5000000               332 ns/op
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,063评论 6 510
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,805评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,403评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,110评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,130评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,877评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,533评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,429评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,947评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,078评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,204评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,894评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,546评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,086评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,195评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,519评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,198评论 2 357

推荐阅读更多精彩内容