nsq_to_file创建消费者,读取nsq消息并写入文件中,支持topic的模糊匹配和实时更新,以及消息落文件的定时rotate等设置。研究nsq_to_file的代码有助于理解nsq消息队列的工作流程。
创建消费者的代码为:
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
if err != nil {
return nil, err
}
consumer, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
}
consumer.AddHandler(f)
err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
log.Fatal(err)
}
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
log.Fatal(err)
}
return &ConsumerFileLogger{
C: consumer,
F: f,
}, nil
}
其中调用了go-nsq库。在go-nsq中通过consumer.ConnectToNSQLookupds,调用nsqlookupd的/lookup接口查询当前的所有生产者。然后连接每一个nsqd,并订阅相应的channel(不指定就默认为"nsq_to_file",如果nsqd
不存在该channel会自动创建)。
当与nsqd的TCP连接收到消息时,go-nsq中会回调消费者的HandleMessage函数。
func (f *FileLogger) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
f.logChan <- m
return nil
}
func (f *FileLogger) router(r *nsq.Consumer) {
pos := 0
output := make([]*nsq.Message, *maxInFlight)
sync := false
ticker := time.NewTicker(time.Duration(30) * time.Second)
closing := false
closeFile := false
exit := false
for {
select {
case <-r.StopChan:
sync = true
closeFile = true
exit = true
case <-f.termChan:
ticker.Stop()
r.Stop()
sync = true
closing = true
case <-f.hupChan:
sync = true
closeFile = true
case <-ticker.C:
if f.needsFileRotate() {
if *skipEmptyFiles {
closeFile = true
} else {
f.updateFile()
}
}
sync = true
case m := <-f.logChan:
if f.needsFileRotate() {
f.updateFile()
sync = true
}
_, err := f.writer.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: writing message to disk - %s", err)
}
_, err = f.writer.Write([]byte("\n"))
if err != nil {
log.Fatalf("ERROR: writing newline to disk - %s", err)
}
output[pos] = m
pos++
if pos == cap(output) {
sync = true
}
}
if closing || sync || r.IsStarved() {
if pos > 0 {
log.Printf("syncing %d records to disk", pos)
err := f.Sync()
if err != nil {
log.Fatalf("ERROR: failed syncing messages - %s", err)
}
for pos > 0 {
pos--
m := output[pos]
m.Finish()
output[pos] = nil
}
}
sync = false
}
if closeFile {
f.Close()
closeFile = false
}
if exit {
close(f.ExitChan)
break
}
}
}