go-nsq发送消息流程:
config := nsq.NewConfig()
// 创建producer
producer, err := nsq.NewProducer("192.168.200.151:4150", config)
if err != nil {
log.Fatal(err)
}
messageBody := []byte(msg)
topicName := "test"
// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err := producer.Publish(topicName, messageBody)
发送消息的时候
func (w *Producer) Publish(topic string, body []byte) error {
return w.sendCommand(Publish(topic, body))
}
// 构建publish command
func Publish(topic string, body []byte) *Command {
var params = [][]byte{[]byte(topic)}
return &Command{[]byte("PUB"), params, body}
}
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan //同步发送,等待返回
return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect() // 注意这里connect
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
在producer的connect中调用conn的Connect
_, err := w.conn.Connect()
在创建连接之后,需要发送下面的数据
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn
// 这里的MagicV2是 " V2"
_, err = c.Write(MagicV2)
if err != nil {
c.Close()
return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
}
// 这里发送command "IDENTIFY"
resp, err := c.identify()
if err != nil {
return nil, err
}
if resp != nil && resp.AuthRequired {
if c.config.AuthSecret == "" {
c.log(LogLevelError, "Auth Required")
return nil, errors.New("Auth Required")
}
err := c.auth(c.config.AuthSecret)
if err != nil {
c.log(LogLevelError, "Auth Failed %s", err)
return nil, err
}
}
c.wg.Add(2)
atomic.StoreInt32(&c.readLoopRunning, 1)
go c.readLoop()
go c.writeLoop()
return resp, nil
}
从上面代码中看到
_, err = c.Write(MagicV2) // 发送" V2"
resp, err := c.identify() // 这里发送command "IDENTIFY"
通过抓包看到
-
发送版本号 " V2"
len=8 IDENTIFY
-
发送字符identify
-
身份信息
-
发送PUB 命令, topic:test body: hello world1
下一节我们分析怎么处理上述命令