nsq源码(12) nsqd 延时消息的投递

除了常规的PUB指令发布消息,nsq还支持延迟投递,例如发布一个延时消息:30秒后自动断开连接

支持DPUB延时投递消息

  • 虽然官方文档中nsqd的http接口没有提供dpub接口,但是从nsqd的源码中可以看到是支持DPUB延时发布的。

单元测试以及使用nsq_to_file消费

  • 修改nsqio/go-nsq/producer.go中的TestProducerPublish()单元测试函数,设置先发10条常规消息后发一条5秒延时消息,内容为发布时间
func TestProducerPublish(t *testing.T) {
    topicName := "test"// + strconv.Itoa(int(time.Now().Unix()))
    msgCount := 10

    config := NewConfig()
    w, _ := NewProducer("127.0.0.1:4150", config)
    w.SetLogger(nullLogger, LogLevelInfo)
    defer w.Stop()

    for i := 0; i < msgCount; i++ {
        err := w.Publish(topicName, []byte(time.Now().Format("2006/1/2 15:04:05")))
        if err != nil {
            t.Fatalf("error %s", err)
        }
    }

    err := w.DeferredPublish(topicName, 5 * time.Second,[]byte(time.Now().Format("2006/1/2 15:04:05")))
    //err := w.Publish(topicName, []byte("bad_test_case"))
    if err != nil {
        t.Fatalf("error %s", err)
    }

    readMessages(topicName, t, msgCount)
}
  • 修改nsq_to_file/file_logger.go中router()文件写入函数格式(消费时间---消息)
func (f *FileLogger) router(r *nsq.Consumer) {
...
    for {
        select {
        ...
        case m := <-f.logChan:
            if f.needsFileRotate() {
                f.updateFile()
                sync = true
            }
            var buffer bytes.Buffer 
            buffer.Write([]byte(time.Now().Format("2006/1/2 15:04:05")))
            buffer.Write([]byte("---"))
            buffer.Write(m.Body)
            _, err := f.writer.Write(buffer.Bytes())
            //_, err := f.writer.Write(m.Body)
            if err != nil {
                log.Fatalf("ERROR: writing message to disk - %s", err)
            }
            _, err = f.writer.Write([]byte("\n"))
            if err != nil {
                log.Fatalf("ERROR: writing newline to disk - %s", err)
            }
            output[pos] = m
            pos++
            if pos == cap(output) {
                sync = true
            }
        }
        ...
    }
}
  • 消费文件内容:
  • 可见最后一条消息时5秒延时投递的</br>


    消费文件.png

nsqd 执行DPUB指令

  • 和PUB的区别是 读取多一个延时参数,并设置到msg.deferred,消息同样是使用topic.PutMessage(msg)发布到topic.memoryMsgChan再由topic.messagePump()协程处理分发给每个channel
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
    ...
    // 延时参数 timeoutMs
    timeoutMs, err := protocol.ByteToBase10(params[2])
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_INVALID",
            fmt.Sprintf("DPUB could not parse timeout %s", params[2]))
    }
    timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

    if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
            fmt.Sprintf("DPUB timeout %d out of range 0-%d",
                timeoutMs, p.ctx.nsqd.getOpts().MaxReqTimeout/time.Millisecond))
    }

    ...
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    
    // 给消息对象设置延时熟悉
    msg.deferred = timeoutDuration
    err = topic.PutMessage(msg)

    client.PublishedMessage(topicName, 1)
    return okBytes, nil
}

topic.messagePump()处理延时消息

func (t *Topic) messagePump() {
    ...
    for i, channel := range chans {
        chanMsg := msg
        // copy the message because each channel
        // needs a unique instance but...
        // fastpath to avoid copy if its the first channel
        // (the topic already created the first copy)
        if i > 0 {
            chanMsg = NewMessage(msg.ID, msg.Body)
            chanMsg.Timestamp = msg.Timestamp
            chanMsg.deferred = msg.deferred
        }
        
        // 处理延时消息
        if chanMsg.deferred != 0 {
            channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
            continue
        }
        err := channel.PutMessage(chanMsg)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                t.name, msg.ID, channel.name, err)
        }
    }
}
  • 由PutMessageDeferred()处理后会将延时消息放入deferredPQ队列,走超时消息的处理了流程,queueScanWorker线程处理
    • 延迟参数被设置到item.Priority,再由超时处理时取出判断是否超时
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

不出意外的话这应该是nsq消息队列的最后一更了,由于工作上没有实际使用所以只能想到什么写什么,后续想到其他的再补充

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 1. 介绍 最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基...
    aoho阅读 8,930评论 1 16
  • metaq是阿里团队的消息中间件,之前也有用过和了解过kafka,据说metaq是基于kafka的源码改过来的,他...
    菜鸟小玄阅读 32,876评论 0 14
  • 消息中间件选型分析 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,...
    消失er阅读 2,018评论 0 24
  • 我自认为是一个感性的人,小时候就很爱哭,给同学们的印象都是一被老师留下或批评就会哭鼻子,现在也很爱哭看电影电视剧只...
    慵懒考拉阅读 306评论 0 0
  • 言语未能把我们的情意表达千万分之一 大侠尽管带上你的“秘籍”来匠臣流通,结同道中人,共建精神新帮派吧!
    2c366b3d53ae阅读 246评论 0 0