本文通过回顾 teleport (https://github.com/henrylee2cn/teleport) 框架的开发过程,讲述Go Socket的开发实战经验。
本文的内容组织形式:teleport架构源码赏析+相应Go技巧分享
期间,我们可以分别从这两条线进行思考与探讨。
文中以TP
作为teleport
的简称
文中内容针对具有一定Go语言基础的开发者
文中以Go技巧
是指高于语法常识的一些编程技巧、设计模式
为了压缩篇幅,代码块中删除了一些空行,并使用...
表示省略行
目 录 |
---|
TP性能测试 |
第一部分 TP架构设计 |
第二部分 TP关键源码赏析及相关Go技巧 |
TP性能测试
TP与其他使用长连接的框架的性能对比:
测试用例
- 一个服务端与一个客户端进程,在同一台机器上运行
- CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
- Memory: 16G
- OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
- Go: 1.9.2
- 信息大小: 581 bytes
- 信息编码:protobuf
- 发送 1000000 条信息
测试结果
- teleport
并发client | 平均值(ms) | 中位数(ms) | 最大值(ms) | 最小值(ms) | 吞吐率(TPS) |
---|---|---|---|---|---|
100 | 1 | 0 | 16 | 0 | 75505 |
500 | 9 | 11 | 97 | 0 | 52192 |
1000 | 19 | 24 | 187 | 0 | 50040 |
2000 | 39 | 54 | 409 | 0 | 42551 |
5000 | 96 | 128 | 1148 | 0 | 46367 |
- teleport/socket
并发client | 平均值(ms) | 中位数(ms) | 最大值(ms) | 最小值(ms) | 吞吐率(TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 14 | 0 | 225682 |
500 | 2 | 1 | 24 | 0 | 212630 |
1000 | 4 | 3 | 51 | 0 | 180733 |
2000 | 8 | 6 | 64 | 0 | 183351 |
5000 | 21 | 18 | 651 | 0 | 133886 |
- 与rpcx的对比
并发client | 平均值(ms) | 中位数(ms) | 最大值(ms) | 最小值(ms) | 吞吐率(TPS) |
---|---|---|---|---|---|
100 | 0 | 0 | 50 | 0 | 109217 |
500 | 5 | 4 | 50 | 0 | 88113 |
1000 | 11 | 10 | 1040 | 0 | 87535 |
2000 | 23 | 29 | 3080 | 0 | 80886 |
5000 | 59 | 72 | 7111 | 0 | 78412 |
- rpcx与其他框架的对比参考(图片来源于rpcx)
- CPU耗时火焰图 teleport/socket
- 堆栈信息火焰图 teleport/socket
第一部分 TP架构设计
1 设计理念
TP定位于提供socket通信解决方案,遵循以下三点设计理念。
- 通用:不做定向深入,专注长连接通信
- 高效:高性能,低消耗
- 灵活:用法灵活简单,易于深入定制
- 可靠:使用接口(
interface
)而非约束说明,规定框架用法
2 架构图
-
Peer
: 通信端点,可以是服务端或客户端 -
Plugin
: 贯穿于通信各个环节的插件 -
Handler
: 用于处理推、拉请求的函数 -
Router
: 通过请求信息(如URI)索引响应函数(Handler)的路由器 -
Socket
: 对net.Conn的封装,增加自定义包协议、传输管道等功能 -
Session
: 基于Socket封装的连接会话,提供的推、拉、回复、关闭等会话操作 -
Context
: 连接会话中一次通信(如PULL-REPLY, PUSH)的上下文对象 -
Packet
: 约定数据报文包含的内容元素(注意:它不是协议格式) -
Protocol
: 数据报文封包解包操作,即通信协议的实现接口 -
Codec
: 数据包body部分(请求参数或响应结果)的序列化接口 -
XferPipe
: 数据包字节流的编码处理管道,如压缩、加密、校验等
3 重要特性
- 支持自定义通信协议和包数据处理管道
- 使用I/O缓冲区与多路复用技术,提升数据吞吐量
- 支持设置读取包的大小限制(如果超出则断开连接)
- 支持插件机制,可以自定义认证、心跳、微服务注册中心、统计信息插件等
- 服务端和客户端之间对等通信,统一为peer端点,具有基本一致的用法:
- 推、拉、回复等通信方法
- 丰富的插件挂载点,可以自定义认证、心跳、微服务注册中心、统计信息等等
- 平滑重启与关闭
- 日志信息详尽,支持打印输入、输出消息的详细信息(状态码、消息头、消息体)
- 支持设置慢操作报警阈值
- 提供Hander的上下文(pull、push的handler)
- 客户端的Session支持断线后自动重连
- 支持的网络类型:
tcp
、tcp4
、tcp6
、unix
、unixpacket
等
第二部分 TP关键源码赏析及相关Go技巧
---------------------------以下为github.com/henrylee2cn/telepot/socket
包内容---------------------------
1 Packet统一数据包元素
Packet
结构体用于定义统一的数据包内容元素,为上层架构提供稳定、统一的操作API。
$ Go技巧分享
1. 在teleport/socket
目录下执行go doc Packet
命令,我们可以获得以下关于Packet
的定义、函数与方法:
type Packet struct {
// Has unexported fields.
}
Packet a socket data packet.
func GetPacket(settings ...PacketSetting) *Packet
func NewPacket(settings ...PacketSetting) *Packet
func (p *Packet) AppendXferPipeFrom(src *Packet)
func (p *Packet) Body() interface{}
func (p *Packet) BodyCodec() byte
func (p *Packet) MarshalBody() ([]byte, error)
func (p *Packet) Meta() *utils.Args
func (p *Packet) Ptype() byte
func (p *Packet) Reset(settings ...PacketSetting)
func (p *Packet) Seq() uint64
func (p *Packet) SetBody(body interface{})
func (p *Packet) SetBodyCodec(bodyCodec byte)
func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)
func (p *Packet) SetPtype(ptype byte)
func (p *Packet) SetSeq(seq uint64)
func (p *Packet) SetSize(size uint32) error
func (p *Packet) SetUri(uri string)
func (p *Packet) Size() uint32
func (p *Packet) String() string
func (p *Packet) UnmarshalBody(bodyBytes []byte) error
func (p *Packet) UnmarshalNewBody(bodyBytes []byte) error
func (p *Packet) Uri() string
func (p *Packet) XferPipe() *xfer.XferPipe
2. Packet
全部字段均不可导出,可以增强代码稳定性以及对其操作的掌控力
3. 下面是由Packet
结构体实现的两个接口Header
和Body
。思考:为什么不直接使用Packet
或者定义两个子结构体?
- 使用接口可以达到限制调用方法的目的,不同情况下使用不同方法集,开发者不会因为调用了不该调用的方法而掉坑里
- 在语义上,
Packet
只是用于定义统一的数据包内容元素,并未给予任何关于数据结构方面(协议)的暗示、误导。因此不应该使用子结构体
type (
// packet header interface
Header interface {
// Ptype returns the packet sequence
Seq() uint64
// SetSeq sets the packet sequence
SetSeq(uint64)
// Ptype returns the packet type, such as PULL, PUSH, REPLY
Ptype() byte
// Ptype sets the packet type
SetPtype(byte)
// Uri returns the URL string string
Uri() string
// SetUri sets the packet URL string
SetUri(string)
// Meta returns the metadata
Meta() *utils.Args
}
// packet body interface
Body interface {
// BodyCodec returns the body codec type id
BodyCodec() byte
// SetBodyCodec sets the body codec type id
SetBodyCodec(bodyCodec byte)
// Body returns the body object
Body() interface{}
// SetBody sets the body object
SetBody(body interface{})
// SetNewBody resets the function of geting body.
SetNewBody(newBodyFunc NewBodyFunc)
// MarshalBody returns the encoding of body.
MarshalBody() ([]byte, error)
// UnmarshalNewBody unmarshal the encoded data to a new body.
// Note: seq, ptype, uri must be setted already.
UnmarshalNewBody(bodyBytes []byte) error
// UnmarshalBody unmarshal the encoded data to the existed body.
UnmarshalBody(bodyBytes []byte) error
}
// NewBodyFunc creates a new body by header.
NewBodyFunc func(Header) interface{}
)
4. 编译期校验Packet
是否已实现Header
与Body
接口的技巧
var (
_ Header = new(Packet)
_ Body = new(Packet)
)
5. 一种常见的自由赋值的函数用法,用于自由设置Packet
的字段
// PacketSetting sets Header field.
type PacketSetting func(*Packet)
// WithSeq sets the packet sequence
func WithSeq(seq uint64) PacketSetting {
return func(p *Packet) {
p.seq = seq
}
}
// Ptype sets the packet type
func WithPtype(ptype byte) PacketSetting {
return func(p *Packet) {
p.ptype = ptype
}
}
...
2 Socket接口
Socket
接口是对net.Conn
的封装,通过协议接口Proto
对数据包内容元素Packet
进行封包、解包与IO传输操作。
type (
// Socket is a generic stream-oriented network connection.
//
// Multiple goroutines may invoke methods on a Socket simultaneously.
Socket interface {
net.Conn
// WritePacket writes header and body to the connection.
// Note: must be safe for concurrent use by multiple goroutines.
WritePacket(packet *Packet) error
// ReadPacket reads header and body from the connection.
// Note: must be safe for concurrent use by multiple goroutines.
ReadPacket(packet *Packet) error
// Public returns temporary public data of Socket.
Public() goutil.Map
// PublicLen returns the length of public data of Socket.
PublicLen() int
// Id returns the socket id.
Id() string
// SetId sets the socket id.
SetId(string)
// Reset reset net.Conn and ProtoFunc.
Reset(netConn net.Conn, protoFunc ...ProtoFunc)
}
socket struct {
net.Conn
protocol Proto
id string
idMutex sync.RWMutex
ctxPublic goutil.Map
mu sync.RWMutex
curState int32
fromPool bool
}
)
$ Go技巧分享
1. 为什么要对外提供接口,而不直接公开结构体?
socket
结构体通过匿名字段net.Conn
的方式“继承”了底层的连接操作方法,并基于该匿名字段创建了协议对象。
所以不能允许外部直接通过socket.Conn=newConn
的方式改变连接句柄。
使用Socket
接口封装包外不可见的socket
结构体可达到避免外部直接修改字段的目的。
2. 读写锁遵循最小化锁定的原则,且defer
绝不是必须的,在确定运行安全的情况下尽量避免使用有性能消耗的defer
。
func (s *socket) ReadPacket(packet *Packet) error {
s.mu.RLock()
protocol := s.protocol
s.mu.RUnlock()
return protocol.Unpack(packet)
}
3 Proto协议接口
Proto
接口按照实现它的具体规则,对Packet
数据包内容元素进行封包、解包、IO等操作。
type (
// Proto pack/unpack protocol scheme of socket packet.
Proto interface {
// Version returns the protocol's id and name.
Version() (byte, string)
// Pack pack socket data packet.
// Note: Make sure to write only once or there will be package contamination!
Pack(*Packet) error
// Unpack unpack socket data packet.
// Note: Concurrent unsafe!
Unpack(*Packet) error
}
// ProtoFunc function used to create a custom Proto interface.
ProtoFunc func(io.ReadWriter) Proto
// FastProto fast socket communication protocol.
FastProto struct {
id byte
name string
r io.Reader
w io.Writer
rMu sync.Mutex
}
)
$ Go技巧分享
1. 将数据包的封包、解包操作封装为Proto
接口,并定义一个默认实现(FastProto
)。
这是框架设计中增强可定制性的一种有效手段。开发者既可以使用默认实现,也可以根据特殊需求定制自己的个性实现。
2. 使用Packet
屏蔽不同协议的差异性:封包时以Packet
的字段为内容元素进行数据序列化,解包时以Packet
为内容模板进行数据的反序列化。
---------------------------以下为github.com/henrylee2cn/telepot/codec
包内容---------------------------
4 Codec编解码
Codec
接口是socket.Packet.body
的编解码器。TP已默认注册了JSON、Protobuf、String三种编解码器。
type (
// Codec makes Encoder and Decoder
Codec interface {
// Id returns codec id.
Id() byte
// Name returns codec name.
Name() string
// Marshal returns the encoding of v.
Marshal(v interface{}) ([]byte, error)
// Unmarshal parses the encoded data and stores the result
// in the value pointed to by v.
Unmarshal(data []byte, v interface{}) error
}
)
$ Go技巧分享
1. 下面codecMap
变量的类型为什么不用关键字type
定义?
var codecMap = struct {
nameMap map[string]Codec
idMap map[byte]Codec
}{
nameMap: make(map[string]Codec),
idMap: make(map[byte]Codec),
}
Go语法允许我们在声明变量时临时定义类型并赋值。因为codecMap
所属类型只会有一个全局唯一的实例,且不会用于其他变量类型声明上,所以直接在声明变量时声明类型可以令代码更简洁。
2. 常用的依赖注入实现方式,实现编解码器的自由定制
const (
NilCodecId byte = 0
NilCodecName string = ""
)
func Reg(codec Codec) {
if codec.Id() == NilCodecId {
panic(fmt.Sprintf("codec id can not be %d", NilCodecId))
}
if _, ok := codecMap.nameMap[codec.Name()]; ok {
panic("multi-register codec name: " + codec.Name())
}
if _, ok := codecMap.idMap[codec.Id()]; ok {
panic(fmt.Sprintf("multi-register codec id: %d", codec.Id()))
}
codecMap.nameMap[codec.Name()] = codec
codecMap.idMap[codec.Id()] = codec
}
func Get(id byte) (Codec, error) {
codec, ok := codecMap.idMap[id]
if !ok {
return nil, fmt.Errorf("unsupported codec id: %d", id)
}
return codec, nil
}
func GetByName(name string) (Codec, error) {
codec, ok := codecMap.nameMap[name]
if !ok {
return nil, fmt.Errorf("unsupported codec name: %s", name)
}
return codec, nil
}
---------------------------以下为github.com/henrylee2cn/telepot/xfer
包内容---------------------------
5 XferPipe数据编码管道
XferPipe
接口用于对数据包进行一系列自定义处理加工,如gzip压缩、加密、校验等。
type (
// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
XferPipe struct {
filters []XferFilter
}
// XferFilter handles byte stream of packet when transfer.
XferFilter interface {
Id() byte
OnPack([]byte) ([]byte, error)
OnUnpack([]byte) ([]byte, error)
}
)
var xferFilterMap = struct {
idMap map[byte]XferFilter
}{
idMap: make(map[byte]XferFilter),
}
teleport/xfer
包的设计与teleport/codec
类似,xferFilterMap
为注册中心,提供注册、查询、执行等功能。
---------------------------以下为github.com/henrylee2cn/telepot
包内容---------------------------
6 Peer通信端点
Peer结构体是TP的一个通信端点,它可以是服务端也可以是客户端,甚至可以同时是服务端与客户端。因此,TP是端对端对等通信的。
type Peer struct {
PullRouter *Router
PushRouter *Router
// Has unexported fields.
}
func NewPeer(cfg PeerConfig, plugin ...Plugin) *Peer
func (p *Peer) Close() (err error)
func (p *Peer) CountSession() int
func (p *Peer) Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (p *Peer) DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
func (p *Peer) GetSession(sessionId string) (Session, bool)
func (p *Peer) Listen(protoFunc ...socket.ProtoFunc) error
func (p *Peer) RangeSession(fn func(sess Session) bool)
func (p *Peer) ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) (Session, error)
- 通信端点介绍
1. Peer配置信息
type PeerConfig struct {
TlsCertFile string
TlsKeyFile string
DefaultReadTimeout time.Duration
DefaultWriteTimeout time.Duration
SlowCometDuration time.Duration
DefaultBodyCodec string
PrintBody bool
CountTime bool
DefaultDialTimeout time.Duration
RedialTimes int32
Network string
ListenAddress string
}
2. Peer的功能列表
- 提供路由功能
- 作为服务端可同时支持监听多个地址端口
- 作为客户端可与任意服务端建立连接
- 提供会话查询功能
- 支持TLS证书安全加密
- 设置默认的建立连接和读、写超时
- 慢响应阀值(超出后运行日志由INFO提升为WARN)
- 支持打印body
- 支持在运行日志中增加耗时统计
$ Go技巧分享
一个Go协程的初始堆栈大小为2KB(在运行过程中可以动态扩展大小)如在高并发服务中不加限制地频繁创建/销毁协程,很容易造成内存资源耗尽,且对GC压力也会很大。因此,TP内部采用协程资源池来管控协程,可以大大降低服务器内存与CPU的压力。(该思路源于fasthttp)
协程资源池的源码实现在本人goutil库中的github.com/henrylee2cn/goutil/pool
。下面是TP的二次封装(保守认为一个goroutine平均占用8KB):
var (
_maxGoroutinesAmount = (1024 * 1024 * 8) / 8 // max memory 8GB (8KB/goroutine)
_maxGoroutineIdleDuration time.Duration
_gopool = pool.NewGoPool(_maxGoroutinesAmount, _maxGoroutineIdleDuration)
)
// SetGopool set or reset go pool config.
// Note: Make sure to call it before calling NewPeer() and Go()
func SetGopool(maxGoroutinesAmount int, maxGoroutineIdleDuration time.Duration) {
_maxGoroutinesAmount, _maxGoroutineIdleDuration := maxGoroutinesAmount, maxGoroutineIdleDuration
if _gopool != nil {
_gopool.Stop()
}
_gopool = pool.NewGoPool(_maxGoroutinesAmount, _maxGoroutineIdleDuration)
}
// Go similar to go func, but return false if insufficient resources.
func Go(fn func()) bool {
if err := _gopool.Go(fn); err != nil {
Warnf("%s", err.Error())
return false
}
return true
}
// AnywayGo similar to go func, but concurrent resources are limited.
func AnywayGo(fn func()) {
TRYGO:
if !Go(fn) {
time.Sleep(time.Second)
goto TRYGO
}
}
每当Peer创建一个session时,都有调用上述Go
函数进行并发执行:
func (p *Peer) DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror) {
...
Go(sess.startReadAndHandle)
...
}
func (p *Peer) Listen(protoFunc ...socket.ProtoFunc) error {
lis, err := listen(p.network, p.listenAddr, p.tlsConfig)
if err != nil {
Fatalf("%v", err)
}
defer lis.Close()
p.listen = lis
network := lis.Addr().Network()
addr := lis.Addr().String()
Printf("listen ok (network:%s, addr:%s)", network, addr)
var (
tempDelay time.Duration // how long to sleep on accept failure
closeCh = p.closeCh
)
for {
conn, e := lis.Accept()
...
AnywayGo(func() {
if c, ok := conn.(*tls.Conn); ok {
...
}
var sess = newSession(p, conn, protoFunc)
if rerr := p.pluginContainer.PostAccept(sess); rerr != nil {
sess.Close()
return
}
Tracef("accept session(network:%s, addr:%s) ok", network, sess.RemoteIp(), sess.Id())
p.sessHub.Set(sess)
sess.startReadAndHandle()
})
}
}
7 Router路由器
TP是对等通信,路由不再是服务端的专利,只要是Peer端点就支持注册PULL
和PUSH
这两类消息处理路由。
type Router struct {
handlers map[string]*Handler
unknownApiType **Handler
// only for register router
pathPrefix string
pluginContainer PluginContainer
typ string
maker HandlersMaker
}
func (r *Router) Group(pathPrefix string, plugin ...Plugin) *Router
func (r *Router) Reg(ctrlStruct interface{}, plugin ...Plugin)
func (r *Router) SetUnknown(unknownHandler interface{}, plugin ...Plugin)
$ Go技巧分享
1. 根据maker HandlersMaker
(Handler的构造函数)字段的不同,分别实现了PullRouter
和PushRouter
两类路由。
// HandlersMaker makes []*Handler
type HandlersMaker func(
pathPrefix string,
ctrlStruct interface{},
pluginContainer PluginContainer,
) ([]*Handler, error)
2. 简洁地路由分组实现:
- 继承各级路由的共享字段:
handlers
、unknownApiType
、maker
- 在上级路由节点的
pathPrefix
、pluginContainer
字段基础上追加当前节点信息
// Group add handler group.
func (r *Router) Group(pathPrefix string, plugin ...Plugin) *Router {
pluginContainer, err := r.pluginContainer.cloneAdd(plugin...)
if err != nil {
Fatalf("%v", err)
}
warnInvaildRouterHooks(plugin)
return &Router{
handlers: r.handlers,
unknownApiType: r.unknownApiType,
pathPrefix: path.Join(r.pathPrefix, pathPrefix),
pluginContainer: pluginContainer,
maker: r.maker,
}
}
8 控制器
控制器是指用于提供Handler操作的结构体。
$ Go技巧分享
1. Go没有泛型,我们通常使用interface{}
空接口来代替。
但是,空接口不能用于表示结构体的方法。
下面是控制器结构体及其方法的模型定义:
PullController Model:
type Aaa struct {
tp.PullCtx
}
// XxZz register the route: /aaa/xx_zz
func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
...
return r, nil
}
// YyZz register the route: /aaa/yy_zz
func (x *Aaa) YyZz(args *<T>) (<T>, *tp.Rerror) {
...
return r, nil
}
PushController Model:
type Bbb struct {
tp.PushCtx
}
// XxZz register the route: /bbb/yy_zz
func (b *Bbb) XxZz(args *<T>) {
...
return r, nil
}
// YyZz register the route: /bbb/yy_zz
func (b *Bbb) YyZz(args *<T>) {
...
return r, nil
}
以PullController为例,使用reflect
反射包对未知类型的结构体进行模型验证:
func pullHandlersMaker(pathPrefix string, ctrlStruct interface{}, pluginContainer PluginContainer) ([]*Handler, error) {
var (
ctype = reflect.TypeOf(ctrlStruct)
handlers = make([]*Handler, 0, 1)
)
if ctype.Kind() != reflect.Ptr {
return nil, errors.Errorf("register pull handler: the type is not struct point: %s", ctype.String())
}
var ctypeElem = ctype.Elem()
if ctypeElem.Kind() != reflect.Struct {
return nil, errors.Errorf("register pull handler: the type is not struct point: %s", ctype.String())
}
if _, ok := ctrlStruct.(PullCtx); !ok {
return nil, errors.Errorf("register pull handler: the type is not implemented PullCtx interface: %s", ctype.String())
}
iType, ok := ctypeElem.FieldByName("PullCtx")
if !ok || !iType.Anonymous {
return nil, errors.Errorf("register pull handler: the struct do not have anonymous field PullCtx: %s", ctype.String())
}
...
for m := 0; m < ctype.NumMethod(); m++ {
method := ctype.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" || isPullCtxType(mname) {
continue
}
// Method needs two ins: receiver, *args.
if mtype.NumIn() != 2 {
return nil, errors.Errorf("register pull handler: %s.%s needs one in argument, but have %d", ctype.String(), mname, mtype.NumIn())
}
// Receiver need be a struct pointer.
structType := mtype.In(0)
if structType.Kind() != reflect.Ptr || structType.Elem().Kind() != reflect.Struct {
return nil, errors.Errorf("register pull handler: %s.%s receiver need be a struct pointer: %s", ctype.String(), mname, structType)
}
// First arg need be exported or builtin, and need be a pointer.
argType := mtype.In(1)
if !goutil.IsExportedOrBuiltinType(argType) {
return nil, errors.Errorf("register pull handler: %s.%s args type not exported: %s", ctype.String(), mname, argType)
}
if argType.Kind() != reflect.Ptr {
return nil, errors.Errorf("register pull handler: %s.%s args type need be a pointer: %s", ctype.String(), mname, argType)
}
// Method needs two outs: reply error.
if mtype.NumOut() != 2 {
return nil, errors.Errorf("register pull handler: %s.%s needs two out arguments, but have %d", ctype.String(), mname, mtype.NumOut())
}
// Reply type must be exported.
replyType := mtype.Out(0)
if !goutil.IsExportedOrBuiltinType(replyType) {
return nil, errors.Errorf("register pull handler: %s.%s first reply type not exported: %s", ctype.String(), mname, replyType)
}
// The return type of the method must be Error.
if returnType := mtype.Out(1); !isRerrorType(returnType.String()) {
return nil, errors.Errorf("register pull handler: %s.%s second reply type %s not *tp.Rerror", ctype.String(), mname, returnType)
}
...
}
...
}
2. 参考HTTP的成熟经验,TP的路由路径采用类URL格式,且支持query参数:如/a/b?n=1&m=e
9 Unknown操作函数
TP可通过func (r *Router) SetUnknown(unknownHandler interface{}, plugin ...Plugin)
方法设置默认Handler,用于处理未找到路由的PULL
或PUSH
消息。
UnknownPullHandler Type:
func(ctx UnknownPullCtx) (interface{}, *Rerror) {
...
return r, nil
}
UnknownPushHandler Type:
func(ctx UnknownPushCtx)
10 Handler的构造
// Handler pull or push handler type info
Handler struct {
name string
isUnknown bool
argElem reflect.Type
reply reflect.Type // only for pull handler doc
handleFunc func(*readHandleCtx, reflect.Value)
unknownHandleFunc func(*readHandleCtx)
pluginContainer PluginContainer
}
通过HandlersMaker
对Controller各个方法进行解析,构造出相应数量的Handler。以pullHandlersMaker
函数为例:
func pullHandlersMaker(pathPrefix string, ctrlStruct interface{}, pluginContainer PluginContainer) ([]*Handler, error) {
var (
ctype = reflect.TypeOf(ctrlStruct)
handlers = make([]*Handler, 0, 1)
)
...
var ctypeElem = ctype.Elem()
...
iType, ok := ctypeElem.FieldByName("PullCtx")
...
var pullCtxOffset = iType.Offset
if pluginContainer == nil {
pluginContainer = newPluginContainer()
}
type PullCtrlValue struct {
ctrl reflect.Value
ctxPtr *PullCtx
}
var pool = &sync.Pool{
New: func() interface{} {
ctrl := reflect.New(ctypeElem)
pullCtxPtr := ctrl.Pointer() + pullCtxOffset
ctxPtr := (*PullCtx)(unsafe.Pointer(pullCtxPtr))
return &PullCtrlValue{
ctrl: ctrl,
ctxPtr: ctxPtr,
}
},
}
for m := 0; m < ctype.NumMethod(); m++ {
method := ctype.Method(m)
mtype := method.Type
mname := method.Name
...
var methodFunc = method.Func
var handleFunc = func(ctx *readHandleCtx, argValue reflect.Value) {
obj := pool.Get().(*PullCtrlValue)
*obj.ctxPtr = ctx
rets := methodFunc.Call([]reflect.Value{obj.ctrl, argValue})
ctx.output.SetBody(rets[0].Interface())
rerr, _ := rets[1].Interface().(*Rerror)
if rerr != nil {
rerr.SetToMeta(ctx.output.Meta())
} else if ctx.output.Body() != nil && ctx.output.BodyCodec() == codec.NilCodecId {
ctx.output.SetBodyCodec(ctx.input.BodyCodec())
}
pool.Put(obj)
}
handlers = append(handlers, &Handler{
name: path.Join(pathPrefix, ctrlStructSnakeName(ctype), goutil.SnakeString(mname)),
handleFunc: handleFunc,
argElem: argType.Elem(),
reply: replyType,
pluginContainer: pluginContainer,
})
}
return handlers, nil
}
$ Go技巧分享
- 对不可变的部分进行预处理获得闭包变量,抽离可变部分的逻辑构造子函数。在路由处理过程中直接执行这些
handleFunc
子函数可达到显著提升性能的目的 - 使用反射来创建任意类型的实例并调用其方法,适用于类型或方法不固定的情况
- 使用对象池来复用
PullCtrlValue
,可以降低GC开销与内存占用 - 通过unsafe获取
ctrlStruct.PullCtx
字段的指针偏移量,进而可以快速获取该字段的值
11 Session会话
Session是封装了socket连接的会话管理实例。它使用一个包外不可见的结构体session
来实现会话相关的三个接口:
PreSession
、Session
、PostSession
。(此处session实现多接口的做法类似于Packet)
type (
PreSession interface {
...
}
Session interface {
// SetId sets the session id.
SetId(newId string)
// Close closes the session.
Close() error
// Id returns the session id.
Id() string
// Health checks if the session is ok.
Health() bool
// Peer returns the peer.
Peer() *Peer
// AsyncPull sends a packet and receives reply asynchronously.
// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
AsyncPull(uri string, args interface{}, reply interface{}, done chan *PullCmd, setting ...socket.PacketSetting)
// Pull sends a packet and receives reply.
// Note:
// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
Pull(uri string, args interface{}, reply interface{}, setting ...socket.PacketSetting) *PullCmd
// Push sends a packet, but do not receives reply.
// Note:
// If the args is []byte or *[]byte type, it can automatically fill in the body codec name;
// If the session is a client role and PeerConfig.RedialTimes>0, it is automatically re-called once after a failure.
Push(uri string, args interface{}, setting ...socket.PacketSetting) *Rerror
// ReadTimeout returns readdeadline for underlying net.Conn.
ReadTimeout() time.Duration
// RemoteIp returns the remote peer ip.
RemoteIp() string
// LocalIp returns the local peer ip.
LocalIp() string
// ReadTimeout returns readdeadline for underlying net.Conn.
SetReadTimeout(duration time.Duration)
// WriteTimeout returns writedeadline for underlying net.Conn.
SetWriteTimeout(duration time.Duration)
// Socket returns the Socket.
// Socket() socket.Socket
// WriteTimeout returns writedeadline for underlying net.Conn.
WriteTimeout() time.Duration
// Public returns temporary public data of session(socket).
Public() goutil.Map
// PublicLen returns the length of public data of session(socket).
PublicLen() int
}
PostSession interface {
...
}
session struct {
...
}
)
Session采用读写异步的方式处理通信消息。在创建Session后,立即启动一个循环读取数据包的协程,并为每个成功读取的数据包创建一个处理协程。
而写操作则是由session.Pull、session.Push或者Handler三种方式来触发执行。
$ Go技巧分享
在以客户端角色执行PULL请求时,Session支持同步和异步两种方式。这是Go的一种经典的兼容同步异步调用的技巧:
func (s *session) AsyncPull(uri string, args interface{}, reply interface{}, done chan *PullCmd, setting ...socket.PacketSetting) {
...
cmd := &PullCmd{
sess: s,
output: output,
reply: reply,
doneChan: done,
start: s.peer.timeNow(),
public: goutil.RwMap(),
}
...
if err := s.write(output); err != nil {
cmd.rerr = rerror_writeFailed.Copy()
cmd.rerr.Detail = err.Error()
cmd.done()
return
}
s.peer.pluginContainer.PostWritePull(cmd)
}
// Pull sends a packet and receives reply.
// If the args is []byte or *[]byte type, it can automatically fill in the body codec name.
func (s *session) Pull(uri string, args interface{}, reply interface{}, setting ...socket.PacketSetting) *PullCmd {
doneChan := make(chan *PullCmd, 1)
s.AsyncPull(uri, args, reply, doneChan, setting...)
pullCmd := <-doneChan
close(doneChan)
return pullCmd
}
实现步骤:
- 在返回结果的结构体中绑定一个chan管道
- 在另一个协程中进行结果计算
- 将该chan做为返回值返回给调用者
- 将计算结果写入该chan中
- 调用者从chan中读出该结果
- (同步方式是对异步方式的封装,等待从chan中读到结果后,再将该结果作为返回值返回)
12 Context上下文
类似常见的Go HTTP框架,TP同样提供了Context上下文。它携带Handler操作相关的参数,如Peer、Session、Packet、PublicData等。
根据调用场景的不同,定义不同接口来限制其方法列表。
此外,TP的平滑关闭、平滑重启也是建立在对Context的使用状态监控的基础上。
type (
BackgroundCtx interface {
...
}
PushCtx interface {
...
}
PullCtx interface {
...
}
UnknownPushCtx interface {
...
}
UnknownPullCtx interface {
...
}
WriteCtx interface {
...
}
ReadCtx interface {
...
}
readHandleCtx struct {
sess *session
input *socket.Packet
output *socket.Packet
apiType *Handler
arg reflect.Value
pullCmd *PullCmd
uri *url.URL
query url.Values
public goutil.Map
start time.Time
cost time.Duration
pluginContainer PluginContainer
next *readHandleCtx
}
)
13 Plugin插件
TP提供了插件功能,具有完备的挂载点,便于开发者实现丰富的功能。例如身份认证、心跳、微服务注册中心、信息统计等等。
type (
Plugin interface {
Name() string
}
PostRegPlugin interface {
Plugin
PostReg(*Handler) *Rerror
}
PostDialPlugin interface {
Plugin
PostDial(PreSession) *Rerror
}
...
PostReadReplyBodyPlugin interface {
Plugin
PostReadReplyBody(ReadCtx) *Rerror
}
...
// PluginContainer plugin container that defines base methods to manage plugins.
PluginContainer interface {
Add(plugins ...Plugin) error
Remove(pluginName string) error
GetByName(pluginName string) Plugin
GetAll() []Plugin
PostReg(*Handler) *Rerror
PostDial(PreSession) *Rerror
...
PostReadReplyBody(ReadCtx) *Rerror
...
cloneAdd(...Plugin) (PluginContainer, error)
}
pluginContainer struct {
plugins []Plugin
}
)
func (p *pluginContainer) PostReg(h *Handler) *Rerror {
var rerr *Rerror
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostRegPlugin); ok {
if rerr = _plugin.PostReg(h); rerr != nil {
Fatalf("%s-PostRegPlugin(%s)", plugin.Name(), rerr.String())
return rerr
}
}
}
return nil
}
func (p *pluginContainer) PostDial(sess PreSession) *Rerror {
var rerr *Rerror
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostDialPlugin); ok {
if rerr = _plugin.PostDial(sess); rerr != nil {
Debugf("dial fail (addr: %s, id: %s): %s-PostDialPlugin(%s)", sess.RemoteIp(), sess.Id(), plugin.Name(), rerr.String())
return rerr
}
}
}
return nil
}
func (p *pluginContainer) PostReadReplyBody(ctx ReadCtx) *Rerror {
var rerr *Rerror
for _, plugin := range p.plugins {
if _plugin, ok := plugin.(PostReadReplyBodyPlugin); ok {
if rerr = _plugin.PostReadReplyBody(ctx); rerr != nil {
Errorf("%s-PostReadReplyBodyPlugin(%s)", plugin.Name(), rerr.String())
return rerr
}
}
}
return nil
}
$ Go技巧分享
Go接口断言的灵活运用,实现插件及其管理容器:
- 定义基础接口并创建统一管理容器
- 在实现基础接口的基础上,增加个性化接口(具体挂载点)的实现,将其注册进基础接口管理容器
- 管理容器使用断言的方法筛选出指定挂载点的插件并执行
最新地址:https://github.com/henrylee2cn/tpdoc/blob/master/01/README.md