Go消息中间件Nsq系列(四)------apps/nsq_to_file源码阅读

上一篇: Go消息中间件Nsq系列(三)------apps/nsq_to_nsq源码阅读

apps/nsq_to_file程序

功能描述: nsq客户端读取(消费)所有的topic数据,然后写入到文件,通过配置阈值或定时时间去切割消息记录文件

通过此次nsq_to_file程序源码阅读, 可以学习到

  1. flag参数, goroutine,channel,select使用案例
  2. 一些os,file的api, 比如os.HostName, OpenFile()
  3. 消费者consumer 结合channel,select使用
  4. go的接口实现封装, 例如FileLogger的Writer
  5. 文件的同步, fsync , page cache这个可以去百度一下
  6. gzip的使用,特别是关闭后再重新新开一个在Close方法,
  7. 其他慢慢体会

apps/nsq_to_file 目录结构

file_logger.go // 文件写出封装类
nsq_to_file.go // 程序入口
options.go // 程序配置参数
strftime.go // 时间格式化
topic_discoverer.go // 根据配置或者服务发现获取所有的topic开始消费消息, 监听信号进行收尾工作

程序执行从nsq_to_file.go, main()入口开始

func main() {
        // 解析输入参数, 与 默认opt 进行合并
       // 然后一系列参数效验
    fs := flagSet()
    fs.Parse(os.Args[1:])
       //  ...  省略
       //  填充配置数据, 监听singal信号,并以此做为topicDiscoverer构造参数, 
    cfg := nsq.NewConfig()
    // ... 省略
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
        // 开始消费消息, 写出文件
    discoverer := newTopicDiscoverer(logf, opts, cfg, hupChan, termChan)
    discoverer.run()
}

main() 实现了解析命令行参数与默认options合并, 然后在构造默认Config, 在监听三个Signal信号源, 通过这些参数作为TopicDiscoverer的构造函数, 并调用了discover.run()方法,下面接着看
topic_discoverer.go的run()方法

func (t *TopicDiscoverer) run() {
    var ticker <-chan time.Time
    // 如果没有配置topics, 则通过配置的定时器去lookupd查找
    if len(t.opts.Topics) == 0 {
        ticker = time.Tick(t.opts.TopicRefreshInterval)
    }
    // 为新topic进行分配goroutine消费消息写出文件
    t.updateTopics(t.opts.Topics)
forloop:
    for {
        select {
        case <-ticker:
            // 获取所有的topics
            newTopics, err := t.ci.GetLookupdTopics(t.opts.NSQLookupdHTTPAddrs)
             // ... 省略
            t.updateTopics(newTopics)
        case <-t.termChan:
            //收中断,退出信号,遍历topics逐个关闭消费,并退出循环
            for _, fl := range t.topics {
                close(fl.termChan)
            }
            break forloop
        case <-t.hupChan:
            //  收到hup信号, 遍历topics逐个往fl.hupChan发送通知
            for _, fl := range t.topics {
                fl.hupChan <- true
            }
        }
    }
    // Wait()方法阻塞直到WaitGroup计数器减为0。
    t.wg.Wait()
}

run() 实现了如果配置了topics那么将不使用定时轮询去lookupd去获取所有topics,否则将使用配置的轮询时间去调用ci.GetLookupdTopics(),获取所有的topics.获取的topics交由函数updateTopics()去处理. 然后监听signal信号处理程序中断,关闭释放工作

func (t *TopicDiscoverer) updateTopics(topics []string) {
    for _, topic := range topics {
        if _, ok := t.topics[topic]; ok {
            continue
        }
        if !t.isTopicAllowed(topic) {
            continue
        }
        fl, err := NewFileLogger(t.logf, t.opts, topic, t.cfg)
        if err != nil {
            continue
        }
        t.topics[topic] = fl
        t.wg.Add(1)
        go func(fl *FileLogger) {
            fl.router()
            t.wg.Done()
        }(fl)
    }
}

updateTopics() 方法功能主要是遍历所有topics 检查是否已经分配消息消费写出(fileLogger), 是否过滤该topic,否则分配fileLogger,使用waitGroup 添加goroutine异步调用fileLogger.router() 方法
整个nsq_to_file代码的核心就在file_logger, FileLogger实现了Handler,Writer接口,在NewFileLogger()的时候,通过传递进来的配置,topic去计算输出文件名称,初始化FileLogger,然后初始化消费者,连接进行消费消息

func NewFileLogger(logf lg.AppLogFunc, opts *Options, topic string, cfg *nsq.Config) (*FileLogger, error) {
       // 计算文件名
    computedFilenameFormat, err := computeFilenameFormat(opts, topic)
    // 初始化 消息消费者
    consumer, err := nsq.NewConsumer(topic, opts.Channel, cfg)
    
    f := &FileLogger{
  // ...  省略, 实现了 HandleMessage(m *nsq.Message) error 方法
    }
   // 接收到的消息通过channel (logChan )转发,在router()方法进行处理
    consumer.AddHandler(f)
        // 连接消费者
    //consumer.ConnectToNSQDs(opts.NSQDTCPAddrs)
       //consumer.ConnectToNSQLookupds(opts.NSQLookupdHTTPAddrs)
    return f, nil
}

在初始化FileLogger后 调用router( ) 方法开始处理数据

func (f *FileLogger) router() {
    pos := 0
    output := make([]*nsq.Message, f.opts.MaxInFlight)
    sync := false
    // 同步定时器
    ticker := time.NewTicker(f.opts.SyncInterval)
    closeFile := false
    exit := false

    for {
        select {
        // 接收到consumer停止信号, 退出循环 关闭并文件
        case <-f.consumer.StopChan:
            sync = true
            closeFile = true
            exit = true
        // 接收到中断,程序退出信号, 关闭定时器,停止消息消费,同步文件
        case <-f.termChan:
            ticker.Stop()
            f.consumer.Stop()
            sync = true
        // 接收到挂起暂停信号, 关闭并同步文件
        case <-f.hupChan:
            sync = true
            closeFile = true
        // 同步定时器, 是否需要切割文件, 在进行同步
        case <-ticker.C:
            if f.needsRotation() {
                if f.opts.SkipEmptyFiles {
                    closeFile = true
                } else {
                    f.updateFile()
                }
            }
            sync = true
        // 接收到新的消息, 是否需要切割文件, 然后写入数据流, 并记录阈值 满了在同步
        case m := <-f.logChan:
            if f.needsRotation() {
                f.updateFile()
                sync = true
            }
            _, err := f.Write(m.Body)
        // ...  省略
            _, err = f.Write([]byte("\n"))
        // ...  省略
            output[pos] = m
            pos++
            if pos == cap(output) {
                sync = true
            }
        }
        // 同步标志 或者 可能RDY为0但是未关闭的状态
        if sync || f.consumer.IsStarved() {
            if pos > 0 {
        // ...  省略
                err := f.Sync() // fsync同步
        // ...  省略
                // 阈值递减, 消息反馈已消费, 释放
                for pos > 0 {
                    pos--
                    m := output[pos]
                    m.Finish()
                    output[pos] = nil
                }
            }
            // 重置标志位
            sync = false
        }
    // FileLogger Close() 方法功能概要
    // 1. 释放文件资源之前先同步数据
    // 2. 如果需要,将文件从工作目录移动到输出目录,注意不要覆盖现有文件,如果文件移动出现失败, 使用rev序号拼接文件名进行重试
        if closeFile {
            f.Close()
            closeFile = false
        }
        // 退出循环
        if exit {
            break
        }
    }
}

router( ) 方法主要功能是开启定时同步落盘数据, 等待接收新消息,记录阈值,满值在进行同步落盘. 等待挂起,中断,退出信号去关闭同步文件,停止consumer消费.
router中调用的updateFile()方法代码如下,

func (f *FileLogger) updateFile() {
    // FileLogger Close() 方法功能概要
    // 1. 释放文件资源之前先同步数据
    // 2. 如果需要,将文件从工作目录移动到输出目录,注意不要覆盖现有文件,如果文件移动出现失败, 使用rev序号拼接文件名进行重试
    f.Close() // uses current f.filename and f.rev to resolve rename dst conflict

    // 文件名格式 "<TOPIC>.<HOST><REV>.<DATETIME>.log",
    filename := f.currentFilename()
    if filename != f.filename {
        f.rev = 0 // reset revision to 0 if it is a new filename
    } else {
        f.rev++
    }
    f.filename = filename
    f.openTime = time.Now()

    //  创建文件夹
    fullPath := path.Join(f.opts.WorkDir, filename)
    err := makeDirFromPath(f.logf, fullPath)

    if err != nil {
        f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err)
        os.Exit(1)
    }

    var fi os.FileInfo
    // 文件命名冲突检测
    // 需要开启gzip和需要切割文件的情况新建,或者就是追加模式
    // 切割文件判断时间 和 文件大小
    // 打开文件流, 
    for ; ; f.rev++ {
        absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
        // If we're using a working directory for in-progress files,
        // proactively check for duplicate file names in the output dir to
        // prevent conflicts on rename in the normal case
        if f.opts.WorkDir != f.opts.OutputDir {
            outputFileName := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(absFilename, f.opts.WorkDir))
            err := makeDirFromPath(f.logf, outputFileName)
            if err != nil {
                f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err)
                os.Exit(1)
            }

            _, err = os.Stat(outputFileName)
            if err == nil {
                f.logf(lg.WARN, "[%s/%s] output file already exists: %s", f.topic, f.opts.Channel, outputFileName)
                continue // next rev
            } else if !os.IsNotExist(err) {
                f.logf(lg.FATAL, "[%s/%s] unable to stat output file %s: %s", f.topic, f.opts.Channel, outputFileName, err)
                os.Exit(1)
            }
        }

        openFlag := os.O_WRONLY | os.O_CREATE
        if f.opts.GZIP || f.opts.RotateInterval > 0 {
            openFlag |= os.O_EXCL
        } else {
            openFlag |= os.O_APPEND
        }
        f.out, err = os.OpenFile(absFilename, openFlag, 0666)
        if err != nil {
            if os.IsExist(err) {
                f.logf(lg.WARN, "[%s/%s] working file already exists: %s", f.topic, f.opts.Channel, absFilename)
                continue // next rev
            }
            f.logf(lg.FATAL, "[%s/%s] unable to open %s: %s", f.topic, f.opts.Channel, absFilename, err)
            os.Exit(1)
        }

        f.logf(lg.INFO, "[%s/%s] opening %s", f.topic, f.opts.Channel, absFilename)

        fi, err = f.out.Stat()
        if err != nil {
            f.logf(lg.FATAL, "[%s/%s] unable to stat file %s: %s", f.topic, f.opts.Channel, f.out.Name(), err)
        }
        f.filesize = fi.Size()

        if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize {
            f.logf(lg.INFO, "[%s/%s] %s currently %d bytes (> %d), rotating...",
                f.topic, f.opts.Channel, f.out.Name(), f.filesize, f.opts.RotateSize)
            continue // next rev
        }

        break // good file
    }

    if f.opts.GZIP {
        f.gzipWriter, _ = gzip.NewWriterLevel(f.out, f.opts.GZIPLevel)
        f.writer = f.gzipWriter
    } else {
        f.writer = f.out
    }
}

还有其他函数,例如currentFilename(),needsRotation(),makeDirFromPath(),exclusiveRename(),computeFilenameFormat()就不细说了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容