上一篇: Go消息中间件Nsq系列(二)------Nsq目录结构
apps/nsq_to_nsq程序
功能描述: nsq客户端读取(消费)指定topic/channel数据,然后通过均衡策略由生产者再次发送
通过此次nsq_to_nsq程序源码阅读, 可以学习到
- goroutine channel 的简单使用方法
- flag的自定义, 还有透传技巧
- 负载均衡的简单实现, 和三方库 host-poolhttps://github.com/bitly/go-hostpool
- 耗时统计三方库 https://github.com/bitly/timer_metrics
- signal的简单使用, 和生产者消费者的使用案例
主要实现:
- 程序需要读取解析命令行参数
- N个消费者(Consumer)通过直连nsqd或者服务发现(nslookup)读取指定topic/channel数据,然后通知到配置的N个生产者(Producer)继续生产
- 生产者均衡使用了RoundRobin,HostPool 两种可选策略
- 使用timermetrics 去统计生产(publish)消息耗时
- 使用了signal对程序的收尾工作
源码实现
程序执行顺序: init( ) -> main()
func init() {
// 自定义类型 实现 flag.Getter(组合flag.Value)接口
flag.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
flag.Var(&destNsqdTCPAddrs, "destination-nsqd-tcp-address", "destination nsqd TCP address (may be given multiple times)")
flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
flag.Var(&topics, "topic", "nsq topic (may be given multiple times)")
flag.Var(&whitelistJSONFields, "whitelist-json-field", "for JSON messages: pass this field (may be given multiple times)")
}
init() 方法主要是自定义类型入参的映射, 自定义类型需要实现flag.Value接口
func main() {
var selectedMode int
// ... 省略
// 一个技巧用法 实现其他参数透传
flag.Var(&nsq.ConfigFlag{cCfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
// ... 省略
// 命令行参数解析
flag.Parse()
if *showVersion {
fmt.Printf("nsq_to_nsq v%s\n", version.Binary)
return
}
// 一系列 输入参数校验
if len(topics) == 0 || *channel == "" {
// ... 省略
// 负载均衡策略
switch *mode {
case "round-robin": // 轮询的方式
selectedMode = ModeRoundRobin
case "hostpool", "epsilon-greedy": // 主机池轮询 --- 贪婪算法
selectedMode = ModeHostPool
}
// 信号源的监听对程序进行收尾工作
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
//多个消费者 maxInFlight 需要配置,否则接收会有问题
cCfg.MaxInFlight = *maxInFlight
// 生产者 添加进集合, key为地址
producers := make(map[string]*nsq.Producer)
// ... 省略,
// 给生产者添加 耗时统计
perAddressStatus := make(map[string]*timer_metrics.TimerMetrics)
// ... 省略
// 根据模式选择池子策略 默认的 或者 贪婪算法
hostPool := hostpool.New(destNsqdTCPAddrs)
if *mode == "epsilon-greedy" {
hostPool = hostpool.NewEpsilonGreedy(destNsqdTCPAddrs, 0, &hostpool.LinearEpsilonValueCalculator{})
}
var consumerList []*nsq.Consumer
// 生产者 处理封装
publisher := &PublishHandler{
addresses: destNsqdTCPAddrs,
producers: producers,
mode: selectedMode,
hostPool: hostPool,
respChan: make(chan *nsq.ProducerTransaction, len(destNsqdTCPAddrs)),
perAddressStatus: perAddressStatus,
timermetrics: timer_metrics.NewTimerMetrics(*statusEvery, "[aggregate]:"),
}
// 多消费者循环添加到 consumerList ,并添加handlermessage添加
// ... 省略
// 根据生产者个数去开启len个goroutine去异步处理发送结果以及统计
for i := 0; i < len(destNsqdTCPAddrs); i++ {
go publisher.responder()
}
// 消费者直连配置的nsqds去连接消费
for _, consumer := range consumerList {
err := consumer.ConnectToNSQDs(nsqdTCPAddrs)
// ... 省略
// 通过服务发现去建立连接进行消费
for _, consumer := range consumerList {
err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
// 收到中断信号, consumer 停止消费,释放并退出
<-termChan // wait for signal
for _, consumer := range consumerList {
consumer.Stop()
}
for _, consumer := range consumerList {
<-consumer.StopChan
}
}
main() 方法主要做了解析命令行参数,根据配置创建producer,consumer,然后producer开启N个goroutine(responder())去异步处理消息与更新统计信息,consumer通过直连或者服务发现形式去进行消费, 最后就是信号源监听程序收尾工作
consumer消息消费..
// 主要是用来处理传递过来的消息
func (ph *PublishHandler) HandleMessage(m *nsq.Message, destinationTopic string) error {
var err error
msgBody := m.Body
// 根据配置进行 消息过滤 主要两个函数
// 1. ph.shouldPassMessage(js);
// 2. filterMessage(js, msgBody)
// ... 省略
// 计时开始, 通过时间差Sub 进行耗时计算
startTime := time.Now()
// 根据均衡策略,生产者去发送异步消息
switch ph.mode {
case ModeRoundRobin:
counter := atomic.AddUint64(&ph.counter, 1)
idx := counter % uint64(len(ph.addresses))
addr := ph.addresses[idx]
p := ph.producers[addr]
// 使用atomic原子操作, 自增 然后进行取模运算 轮询选取producer, 发布异步消息在ph.respChan进行通知
err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, addr)
case ModeHostPool:
// 在主机池里面根据算法获取生产者,然后发送异步消息
hostPoolResponse := ph.hostPool.Get()
p := ph.producers[hostPoolResponse.Host()]
err = p.PublishAsync(destinationTopic, msgBody, ph.respChan, m, startTime, hostPoolResponse)
if err != nil {
hostPoolResponse.Mark(err)
}
}
if err != nil {
return err
}
// 禁用自动响应反馈 (就是auto finish)
m.DisableAutoResponse()
return nil
}
PublishHandler实现HandleMessage(), consumer的消费也是调用的该方法. 方法中主要就是过滤消息, 然后根据均衡策略算法去获取生产者,然后去发送异步消息, 禁用自动响应,使得异步消息结果在responder()方法中异步处理,
responder() 方法代码如下
func (ph *PublishHandler) responder() {
var msg *nsq.Message
var startTime time.Time
var address string
var hostPoolResponse hostpool.HostPoolResponse
// ... 省略
// 前面判断HostPoolResponse,就是从主机池里面获取的producer发送有无错误
if success {
msg.Finish() // 正常消费
} else {
msg.Requeue(-1) // 重新入队列
}
// 更新统计状态
ph.perAddressStatus[address].Status(startTime)
ph.timermetrics.Status(startTime)
}
}
responder() 是通过goroutine异步开启的, 主要是异步消息通知进行最终处理. 然后更新统计状态, 可能发生的问题就是producer发送出现故障了,导致消息没正常发送出去,就重新入队列