主流程
- Server Listen tcp端口
- for循环Accept
- Accept后go handleConnection(conn)
- handleConnection创建srv service,srv相当于client
- 在service中又go了3个goroutine:processor(),receiver(),sender()
- service中创建了in buffer和out buffer,buffer是ring buffer
- receiver()收数据到in buffer
- sender()从out buffer取数据发出
- processor()处理mqtt业务逻辑,从in buffer中取出数据,发数据时写入outbuffer中
优缺点
从上面可以看出一个service也就是client占用了4个goroutine,不过使用了ringbuffer所以速度还是比较快,但是ringbuffer控制不好就有可能出错。
对比gnatsd的client只用1个goroutine来说,surgemq占用了太多的goroutine。
但是gnatsd使用了更复杂的锁控制,在程序逻辑上比surgemq更加复杂。
surgemq的mqtt消息使用Message类来解析处理,比gnatsd的parse更加清晰而且跟主程序耦合度更低。
surgemq还很不完整,不支持集群,不支持持久化等。
又仔细研究了一下surgemq,发现其使用的ringbuf占用内存惊人,一个ringbuf占用256K内存,而一个client连接开了in和out两个ringbuf一共512K字节,那么1千个client就占用512M字节,太可怕了。还有goroutine一个占用2k字节,一个client就去了8K,客户端连接多了以后也是不小的内存开销。
附
看了一下代码,实现了qos1,2的ack,但是没有实现重发......
比如qos1:收到客户端publish消息马上就发送一个ack,同时给其他sub发publish消息出去,并且保存在队列中等待sub的ack消息。但是如果一直没有收到sub的ack消息并不会重发publish,所以并不能保证消息到达sub。
另外看了一下号称继承surgemq的volantmq,已经改的面目全非,而且代码实在很难看。