nsq源码(12) nsqd 延时消息的投递

除了常规的PUB指令发布消息,nsq还支持延迟投递,例如发布一个延时消息:30秒后自动断开连接

支持DPUB延时投递消息

  • 虽然官方文档中nsqd的http接口没有提供dpub接口,但是从nsqd的源码中可以看到是支持DPUB延时发布的。

单元测试以及使用nsq_to_file消费

  • 修改nsqio/go-nsq/producer.go中的TestProducerPublish()单元测试函数,设置先发10条常规消息后发一条5秒延时消息,内容为发布时间
func TestProducerPublish(t *testing.T) {
    topicName := "test"// + strconv.Itoa(int(time.Now().Unix()))
    msgCount := 10

    config := NewConfig()
    w, _ := NewProducer("127.0.0.1:4150", config)
    w.SetLogger(nullLogger, LogLevelInfo)
    defer w.Stop()

    for i := 0; i < msgCount; i++ {
        err := w.Publish(topicName, []byte(time.Now().Format("2006/1/2 15:04:05")))
        if err != nil {
            t.Fatalf("error %s", err)
        }
    }

    err := w.DeferredPublish(topicName, 5 * time.Second,[]byte(time.Now().Format("2006/1/2 15:04:05")))
    //err := w.Publish(topicName, []byte("bad_test_case"))
    if err != nil {
        t.Fatalf("error %s", err)
    }

    readMessages(topicName, t, msgCount)
}
  • 修改nsq_to_file/file_logger.go中router()文件写入函数格式(消费时间---消息)
func (f *FileLogger) router(r *nsq.Consumer) {
...
    for {
        select {
        ...
        case m := <-f.logChan:
            if f.needsFileRotate() {
                f.updateFile()
                sync = true
            }
            var buffer bytes.Buffer 
            buffer.Write([]byte(time.Now().Format("2006/1/2 15:04:05")))
            buffer.Write([]byte("---"))
            buffer.Write(m.Body)
            _, err := f.writer.Write(buffer.Bytes())
            //_, err := f.writer.Write(m.Body)
            if err != nil {
                log.Fatalf("ERROR: writing message to disk - %s", err)
            }
            _, err = f.writer.Write([]byte("\n"))
            if err != nil {
                log.Fatalf("ERROR: writing newline to disk - %s", err)
            }
            output[pos] = m
            pos++
            if pos == cap(output) {
                sync = true
            }
        }
        ...
    }
}
  • 消费文件内容:
  • 可见最后一条消息时5秒延时投递的</br>


    消费文件.png

nsqd 执行DPUB指令

  • 和PUB的区别是 读取多一个延时参数,并设置到msg.deferred,消息同样是使用topic.PutMessage(msg)发布到topic.memoryMsgChan再由topic.messagePump()协程处理分发给每个channel
func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
    ...
    // 延时参数 timeoutMs
    timeoutMs, err := protocol.ByteToBase10(params[2])
    if err != nil {
        return nil, protocol.NewFatalClientErr(err, "E_INVALID",
            fmt.Sprintf("DPUB could not parse timeout %s", params[2]))
    }
    timeoutDuration := time.Duration(timeoutMs) * time.Millisecond

    if timeoutDuration < 0 || timeoutDuration > p.ctx.nsqd.getOpts().MaxReqTimeout {
        return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
            fmt.Sprintf("DPUB timeout %d out of range 0-%d",
                timeoutMs, p.ctx.nsqd.getOpts().MaxReqTimeout/time.Millisecond))
    }

    ...
    topic := p.ctx.nsqd.GetTopic(topicName)
    msg := NewMessage(topic.GenerateID(), messageBody)
    
    // 给消息对象设置延时熟悉
    msg.deferred = timeoutDuration
    err = topic.PutMessage(msg)

    client.PublishedMessage(topicName, 1)
    return okBytes, nil
}

topic.messagePump()处理延时消息

func (t *Topic) messagePump() {
    ...
    for i, channel := range chans {
        chanMsg := msg
        // copy the message because each channel
        // needs a unique instance but...
        // fastpath to avoid copy if its the first channel
        // (the topic already created the first copy)
        if i > 0 {
            chanMsg = NewMessage(msg.ID, msg.Body)
            chanMsg.Timestamp = msg.Timestamp
            chanMsg.deferred = msg.deferred
        }
        
        // 处理延时消息
        if chanMsg.deferred != 0 {
            channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
            continue
        }
        err := channel.PutMessage(chanMsg)
        if err != nil {
            t.ctx.nsqd.logf(LOG_ERROR,
                "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
                t.name, msg.ID, channel.name, err)
        }
    }
}
  • 由PutMessageDeferred()处理后会将延时消息放入deferredPQ队列,走超时消息的处理了流程,queueScanWorker线程处理
    • 延迟参数被设置到item.Priority,再由超时处理时取出判断是否超时
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

不出意外的话这应该是nsq消息队列的最后一更了,由于工作上没有实际使用所以只能想到什么写什么,后续想到其他的再补充

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 1. 介绍 最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基...
    aoho阅读 9,006评论 1 16
  • metaq是阿里团队的消息中间件,之前也有用过和了解过kafka,据说metaq是基于kafka的源码改过来的,他...
    菜鸟小玄阅读 33,070评论 0 14
  • 消息中间件选型分析 有很多网友留言:公司要做消息中间件选型,该如何选?你觉得哪个比较好?消息选型的确是一个大论题,...
    消失er阅读 2,068评论 0 24
  • 我自认为是一个感性的人,小时候就很爱哭,给同学们的印象都是一被老师留下或批评就会哭鼻子,现在也很爱哭看电影电视剧只...
    慵懒考拉阅读 320评论 0 0
  • 言语未能把我们的情意表达千万分之一 大侠尽管带上你的“秘籍”来匠臣流通,结同道中人,共建精神新帮派吧!
    2c366b3d53ae阅读 261评论 0 0