试着将nsq的tcp部分提出来,看看它是怎么处理沾包, 协议分装
这个过程以后自己写tcp对外提供服务应该也是可以做到有参考,有借鉴 一下就能上手去做这样的一件事情了
package main
import (
"bufio"
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"sync/atomic"
)
var (
clientIDSequence int64 = 0
separatorBytes = []byte(" ")
heartbeatBytes = []byte("_heartbeat_")//心跳
okBytes = []byte("OK")
)
const defaultBufferSize = 16 * 1024
type Client interface {
Close() error
}
type Protocol interface {
NewClient(net.Conn) Client
IOLoop(Client) error
}
type clientV2 struct {
ID int64 //为每个客户端连接 定义一个唯一id
Reader *bufio.Reader
Writer *bufio.Writer
}
func (c *clientV2) Close() (err error) {
return
}
type protocolV2 struct {
}
func (p *protocolV2) NewClient(conn net.Conn) Client {
clientID := atomic.AddInt64(&clientIDSequence, 1)
return &clientV2{
ID: clientID,
Reader: bufio.NewReaderSize(conn, defaultBufferSize),
Writer: bufio.NewWriterSize(conn, defaultBufferSize),
}
}
//PUB <topic_name>\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//
//<topic_name> - a valid string (optionally having #ephemeral suffix)
//TODO 关于tcp通讯协议,nsq首先对命令做了区分,如果本身的命令是 fin 这种通知类型的命令,不存在业务数据,那直接用 readuntil(特殊字符分割方式) 就ok了
//TODO 如果是带有业务数据的这种通讯,很明显特殊字符就不好定义了,就还是先采用 \n 作为特殊字符不停的读, 读到命令后发现是带业务body的,就再按定长读4字节,读出来的就是接下来的body数据长度指标值了, 再io.ReadFull(这个秒) 连续读就行了
//TODO 所以我们如果要做一个通用的tcp处理handle 倒不如直接就预定成
//PUB PUB2 PUB3\n
//[ 4-byte size in bytes ][ N-byte binary data ]
//TODO PUB 这个就相当于命令类型,可以多级以空格分开。 这样设计的妙处就是哪怕你业务数据有 \n 这个特殊字符, 我也是优先读的第一段tcp协议头,然后再读4字节body长度指标,最后再读body内容
//TODO 不好的地方是带body的是读了3次, 但nsq根据 tcp协议头就断出了是否为绝对不带body通知消息,是的话就不需要再读了,通知消息就一次就读好了
//TODO 倒是兼容了2情况,如果通知频繁效率更高,如果全是带body的那就没必要这样了,换一种后面再分析
func (p *protocolV2) IOLoop(c Client) (err error) {
client, ok := c.(*clientV2)
if !ok {
return errors.New("client TYPE error")
}
for {
line, rErr := client.Reader.ReadSlice('\n')
if rErr != nil && rErr != io.EOF {
err = fmt.Errorf("failed to read command - %s", rErr)
break
}
line = line[:len(line)-1]
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
params := bytes.Split(line, separatorBytes) // 例如: FIN 12 23 body数据
response,exErr:=p.Exec(client,params) //nsq 将客户端发送给服务端的信息,都是按\n分割,
checkErr(exErr)
//TODO 不存在需要返回就让response等于nil就行了
if response!=nil{
_,rErr=client.Writer.Write(response)
checkErr(rErr)
}
}
return
}
//都归功于约定好的协议,所以exec既充当了我们web中的路由分发
func (p *protocolV2) Exec(client *clientV2,params [][]byte) ([]byte, error) {
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.PUB(client,params)
}
return nil, errors.New(fmt.Sprintf("invalid command %s", params[0]))
}
//这个就是真正响应指令的handle了
//里面会用到golang的大小端将二进制数据转成[]byte
//最终按约定好的message格式(按位读),将[]byte渲染到message结构体上(可不是json通过 bind 绑定上去的)
//可谓是将数据流大小控制到了极致了呀~
func (p *protocolV2) PUB(client *clientV2,params [][]byte) ([]byte, error) {
lenSlice := make([]byte, 4)
io.ReadFull(client.Reader, lenSlice) //读取body的长度
bodyLen := int32(binary.BigEndian.Uint32(lenSlice))
body := make([]byte, bodyLen)
io.ReadFull(client.Reader, body) //读取后面约定好的body长度
return nil,nil
}
func checkErr(err error) {
//打日志
}
type TCPHandler interface {
Handle(net.Conn)
}
const (
PROTOCOlV2 = " V2"
)
func Handle(conn net.Conn) {
defer func() {
checkErr(conn.Close())
}()
//nsq的tcp协议,conn第一次被accept的时候,服务端会先读conn的前4个字节,这前四个字节代表conn是要走什么通讯协议
//这样拓展性就强了
buf:=make([]byte,4)
if _,err:=io.ReadFull(conn,buf);err!=nil{
checkErr(conn.Close())
return
}
protocolMagic := string(buf)
var prot Protocol
switch protocolMagic {
case PROTOCOlV2:
prot=&protocolV2{}
}
checkErr(prot.IOLoop(conn))
}
// TCPServer 起tcp服务,注意里面是for循环的,同步则永远阻塞
func TCPServer(listener net.Listener) {
for {
clientConn, aErr := listener.Accept()
if aErr!=nil{
continue
}
go func() {
Handle(clientConn)
}()
}
}
func main() {
listener,_:= net.Listen("tcp","0.0.0.0:8080")
TCPServer(listener)
}