Go消息中间件Nsq系列(三)------apps/nsq_to_nsq源码阅读

上一篇: Go消息中间件Nsq系列(二)------Nsq目录结构

apps/nsq_to_nsq程序

功能描述: nsq客户端读取(消费)指定topic/channel数据,然后通过均衡策略由生产者再次发送

通过此次nsq_to_nsq程序源码阅读, 可以学习到

  1. goroutine channel 的简单使用方法
  2. flag的自定义, 还有透传技巧
  3. 负载均衡的简单实现, 和三方库 host-poolhttps://github.com/bitly/go-hostpool
  4. 耗时统计三方库 https://github.com/bitly/timer_metrics
  5. signal的简单使用, 和生产者消费者的使用案例

主要实现:

  1. 程序需要读取解析命令行参数
  2. N个消费者(Consumer)通过直连nsqd或者服务发现(nslookup)读取指定topic/channel数据,然后通知到配置的N个生产者(Producer)继续生产
  3. 生产者均衡使用了RoundRobin,HostPool 两种可选策略
  4. 使用timermetrics 去统计生产(publish)消息耗时
  5. 使用了signal对程序的收尾工作

源码实现

程序执行顺序: init( ) -> main()

func init() {
      //  自定义类型 实现 flag.Getter(组合flag.Value)接口
    flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
    flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
    flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
    flag.Var(&topics, "topic", "nsq topic (may be given multiple times)")
    flag.Var(&whitelistJSONFields, "whitelist-json-field", "for JSON messages: pass this field (may be given multiple times)")
}

init() 方法主要是自定义类型入参的映射, 自定义类型需要实现flag.Value接口

func main() {
    var selectedMode int
    // ...  省略
    //  一个技巧用法 实现其他参数透传
    flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
    // ...  省略
    // 命令行参数解析
    flag.Parse()

    if *showVersion {
        fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
        return
    }
    // 一系列 输入参数校验 
    if len(topics) == 0 || *channel == "" {
    // ...  省略

    // 负载均衡策略
    switch *mode {
    case "round-robin": // 轮询的方式
        selectedMode = ModeRoundRobin
    case "hostpool", "epsilon-greedy": // 主机池轮询 --- 贪婪算法
        selectedMode = ModeHostPool
    }
      // 信号源的监听对程序进行收尾工作
    termChan := make(chan os.Signal, 1)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

    //多个消费者 maxInFlight 需要配置,否则接收会有问题
    cCfg.MaxInFlight = *maxInFlight

    // 生产者 添加进集合, key为地址
    producers := make(map[string]*nsq.Producer)
    // ... 省略,
       // 给生产者添加 耗时统计
    perAddressStatus := make(map[string]*timer_metrics.TimerMetrics)
       // ... 省略

    // 根据模式选择池子策略 默认的 或者  贪婪算法
    hostPool := hostpool.New(destNsqdTCPAddrs)
    if *mode == "epsilon-greedy" {
        hostPool = hostpool.NewEpsilonGreedy(destNsqdTCPAddrs, 0, &hostpool.LinearEpsilonValueCalculator{})
    }

    var consumerList []*nsq.Consumer

    // 生产者 处理封装
    publisher := &PublishHandler{
        addresses:        destNsqdTCPAddrs,
        producers:        producers,
        mode:             selectedMode,
        hostPool:         hostPool,
        respChan:         make(chan *nsq.ProducerTransaction, len(destNsqdTCPAddrs)),
        perAddressStatus: perAddressStatus,
        timermetrics:     timer_metrics.NewTimerMetrics(*statusEvery, "[aggregate]:"),
    }
    // 多消费者循环添加到 consumerList ,并添加handlermessage添加
    // ... 省略
        
    //  根据生产者个数去开启len个goroutine去异步处理发送结果以及统计 
    for i := 0; i < len(destNsqdTCPAddrs); i++ {
        go publisher.responder()
    }
    // 消费者直连配置的nsqds去连接消费
    for _, consumer := range consumerList {
        err := consumer.ConnectToNSQDs(nsqdTCPAddrs)
    // ... 省略

    // 通过服务发现去建立连接进行消费
    for _, consumer := range consumerList {
    err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
        
       // 收到中断信号, consumer 停止消费,释放并退出
    <-termChan // wait for signal
    for _, consumer := range consumerList {
        consumer.Stop()
    }
    for _, consumer := range consumerList {
        <-consumer.StopChan
    }
}

main() 方法主要做了解析命令行参数,根据配置创建producer,consumer,然后producer开启N个goroutine(responder())去异步处理消息与更新统计信息,consumer通过直连或者服务发现形式去进行消费, 最后就是信号源监听程序收尾工作

consumer消息消费..

// 主要是用来处理传递过来的消息
func (ph *PublishHandler) HandleMessage(m *nsq.Message, destinationTopic string) error {
    var err error
    msgBody := m.Body
        // 根据配置进行 消息过滤 主要两个函数
        //  1.  ph.shouldPassMessage(js);
        // 2.   filterMessage(js, msgBody)
        // ... 省略
         // 计时开始, 通过时间差Sub 进行耗时计算
    startTime := time.Now()

        // 根据均衡策略,生产者去发送异步消息
    switch ph.mode {
    case ModeRoundRobin:
        counter := atomic.AddUint64(&ph.counter, 1)
        idx := counter % uint64(len(ph.addresses))
        addr := ph.addresses[idx]
        p := ph.producers[addr]
                // 使用atomic原子操作, 自增 然后进行取模运算 轮询选取producer, 发布异步消息在ph.respChan进行通知
        err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, addr)
    case ModeHostPool:
                // 在主机池里面根据算法获取生产者,然后发送异步消息
        hostPoolResponse := ph.hostPool.Get()
        p := ph.producers[hostPoolResponse.Host()]
        err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, hostPoolResponse)
        if err != nil {
            hostPoolResponse.Mark(err)
        }
    }

    if err != nil {
        return err
    }
       //  禁用自动响应反馈 (就是auto finish)
    m.DisableAutoResponse()
    return nil
}

PublishHandler实现HandleMessage(), consumer的消费也是调用的该方法. 方法中主要就是过滤消息, 然后根据均衡策略算法去获取生产者,然后去发送异步消息, 禁用自动响应,使得异步消息结果在responder()方法中异步处理,
responder() 方法代码如下

func (ph *PublishHandler) responder() {
    var msg *nsq.Message
    var startTime time.Time
    var address string
    var hostPoolResponse hostpool.HostPoolResponse

    // ... 省略
             // 前面判断HostPoolResponse,就是从主机池里面获取的producer发送有无错误

        if success {
            msg.Finish() // 正常消费
        } else {
            msg.Requeue(-1)  // 重新入队列
        }
               // 更新统计状态
        ph.perAddressStatus[address].Status(startTime)
        ph.timermetrics.Status(startTime)
    }
}

responder() 是通过goroutine异步开启的, 主要是异步消息通知进行最终处理. 然后更新统计状态, 可能发生的问题就是producer发送出现故障了,导致消息没正常发送出去,就重新入队列

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容