Nsq从入门到实践

当nsq跑起来之后, 我们可能会遇到以下问题

  • 分布式部署
  • 处理错误(何时requeue)
  • 如何使用golang lib
  • 消息生命周期如何, 如重试/断线重连逻辑.

抱着不应该只停留在入门的态度, 笔者粗浅的研究了一下这几个问题, 希望也对有同样疑问的人有帮助.

分布式部署

注意:
由于NSQ的分布式网络结构, 必须为每一个NSQD分配单独的地址(如IP或host)以保证消费者在lookup找到NSQD节点后能够正确的连接到对于的NSQD节点, 这也就意味着需要好好的规划NSQD的广播地址. (下文会说到在Rancher中如何配置NSQD的广播地址)

NSQ推荐的部署方式是 让NSQD与TOPIC生产者一一对应 既一个生产者一个NSQD.

如: user服务需要发送一个Topic为register的消息, 那么就需要为user服务创建一个NSQD, 广播地址设置这台服务器的ip, 例如10.0.0.1. 现在消费者端需要连接上Nsqlookupd就能得到生产者NSQD节点的地址即10.0.0.1.

如果如果想要消除单点故障, 那么我们需要为user服务再添加一台服务器, 并且也在这台机器上添加一个Nsqd节点并连接上Nsqlookupd, 广播地址设置为这台服务器的ip, 如10.0.0.2, 现在消费者通过Nsqlookupd就能得到两个Nsqd节点的地址并连接了, 只要有一个服务器是"活"的, 那么整个系统就能正常使用.

详细可参考官方文档的拓扑图: topology_patterns

topology_patterns

在Rancher中部署

官方文档提供的分布式部署方式是多主机部署, 所以如何在Rancher中部署就只有自己实践了.

由于篇幅较多, 所以另起一篇.

NSQ Requeue And Backoff

这两个概念与作用些许复杂, 建议结合官方文档来看

requeue(重试)

当错误发生, 需要重试时就应该使用nsq的requeue功能.

backoff(避退)

backoff能降低消费者吞吐量以让消费者从错误中恢复.

当消费者在backoff状态时, 这个消费者将不再处理任何消息, 直到backoff超时

当触发backoff时控制台将打印:

// 进入backoff状态, RDY设置为0代表准备接收0条消息(不接收消息) (协议详情看 https://nsq.io/clients/tcp_protocol_spec.html)
WRN    1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
// 时间到了将设置RDY为1接收1条消息以测试状态, 官方将这个状态称为`tests the waters`
WRN    1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1

当有多个消费者竞争时, 出错的消费者应当主动backoff不再处理消息(以让出更多的机会给其他消费者).
如果只有一个消费者, 则消费者会等到backoff超时后才开始处理消息(空出时间让消费者恢复).

避退是存在于整个消费者上的, 所以消费者每当一个消息处理失败了之后都会增加这个消费者的backoff level. 这会影响这个消费者的处理能力.

到底需不需要用backoff, 就要看业务了:

  • 消息是用来更新数据库订单状态的, 这是一个不容易出错的逻辑, 如果需要requeue则需要backoff让出优先级, 让其他消费者来做, 尽量以挽救这个订单.
  • 消息是用来通知第三方(如支付宝支付成功的http回调)的, 一般requeue是发生在第三方端响应不满足预期的响应, 这不是我方消费者的错误, 应当不使用backoff, 避免阻塞消息消费.

参考:

Max in Flight

cnf := nsq.NewConfig()
cfg.MaxInFlight =200

MaxInFlight控制nsqd将多少消息同时发送给消费者, 默认是1, 意味着同时只有一个消息在被消费者处理, 如果你没有控制并发数量的需求, 建议设置为CPU的数量以提高性能.

参考: Question about concurrency and Max in Flight

golang lib

nsq提供golang的client lib. 支持全部特性.

本着不重复造轮子原则, 我也想尽大可能的使用nsq lib里的代码逻辑来实现需求, 但有些需求它实现不了:

  • 自定义requeue的等待时间
  • 判断某错误是否应该重试: 对于不应该重试的错误(如参数有误), 应该直接FIN, 而不是REQ.

我也只好自己写代码了.

先看看它原有的几个逻辑

// Handler is the message processing interface for Consumer
//
// Implement this interface for handlers that return whether or not message
// processing completed successfully.
//
// When the return value is nil Consumer will automatically handle FINishing.
//
// When the returned value is non-nil Consumer will automatically handle REQueing.
type Handler interface {
    HandleMessage(message *Message) error
}

消息自动重试(REQ)与完成(FIN):

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }

        err := handler.HandleMessage(message)
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }

        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

判断失败:

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {
            logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

看懂它, 并根据需求改进它.

优化requeue逻辑

可以看到当handler返回的error不为空时, nsq将自动requeue, 这种重试是很方便但是也有坏处.

使用这个重试机制的坏处是:

  • 不能自定义requeue的等待时间(默认等待时间=config.DefaultRequeueDelay*Attempts)
  • 会在控制台打印一个ERR(不能自定义格式, 而且有一些err不应该打印到控制台), 这点可能有洁癖的开发者受不了.
  • 一些错误不应该重试, 如入参不合法, 再怎么重试也是徒劳. 这时候应该直接失败.

我建议不要使用这个err机制, 而应当手动使用msg.Requeue(-1)或者msg.RequeueWithoutBackoff(-1) 来显式requeue.

我的做法是再包裹一次Handler, 在闭包内部自定义错误逻辑. 我就不写上我乱糟糟的代码了, 您能实现得更简单.

nsqadmin

nsqadmin 提供一个web页面来管理nsq的消息/Topic/Channel.

Lookup

loopup

我们知道还没有生产者产生消息时(比如刚刚才部署), topic不存在, 这时如果有消费者连接上nsqlookup就会一直报错 topic not found, 为了避免这个报错, 就可以在Create Topic/Channel栏目中预先创建topic和channel.

比如下图是添加了名为test的topic.

可以看到提示说这个topic当前不活跃, 也就是只在nsqloopup新建了topic但是没有在任何nsqd里生产. 这个提示在nsq开始发送第一个消息后消失.

如何保证消息被至少投递一次

重试

在Handler中返回一个错误就会触发重试, 重试的消息被存储在nsq的Deferred队列, 一定延时后消费者会再次收到此消息.

断线恢复

发送给消费者的消息总会被nsq先存储在InFlight队列, 消费者处理完消息需要给nsq发送FIN消息, 这时nsq才算完成了消息的投递.

如果消费者没有发送FIN给nsq的话(如断线了)会出现什么情况? 在nsq后台有一个专门的协程处理InFlight队列, 当消息超过了一定时间还没有被FIN 则会重新加入队列发送给其他消费者.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,919评论 2 89
  • 如果智慧是成功的彼岸,读书则恰似一叶扁舟,满载希望与你共同驶向前方;如果智慧是快乐的门扇,读书则是一枚钥匙,打开心...
    S从心阅读 331评论 0 1
  • 类目系统是整个电商系统中的基础部分,比较容易被忽略,但确实是比较重要的一个部分,目前淘宝或者京东的商品数量已经有...
    DearNicole阅读 18,674评论 18 53
  • 一般情况下,右键拖动子view到父view上,xcode弹出选框如下: 我们可以看到,spacing只能关联到 T...
    jezong阅读 519评论 0 0