这是基于golang socket 一个轻量级,支持高并发操作的开发框架chitchat。本文将介绍chitchat的基本使用方法;通过源码分析该框架的具体工作流程;简要讲解作者留下的Demo文件和该框架的使用技巧;下载链接。通过该框架,我们可以方便建立起Server-Client长连接并通信。
使用chitchat
chitchat得以支持高并发连接的关键在于其能够快速响应客户端发起的链接并及时开启goroutine确保一对一的通信。对于使用者而言,只需负责向框架注册正确的IP socket(ipAddr:ipPort)(注:除非特别说明,否则后续提到的地址Addr均指addr:port
)并正确编写用于处理接收数据
和异常处理
函数即可正常运行。
开启一个Server
仅需创建一个Server实例并调用其Listen()方法即可使一个Server开始正常工作。一个Server通常只用于监听一个端口,负责一类事物的调度处理。我们看一下具体调度的API:
func NewServer(
ipaddrsocket string,
delim byte,
readfunc ServerReadFunc,
additional interface{}) Server
可以看到,创建一个Server实例需要提供四个参数,分别为监听对象
,分隔符
,处理函数
,附加数据
。其中附加数据
可置为空(nil)。
监听对象
即可供Client连接的IPsocket;当Server读到一连串数据后,将通过delim分隔符
将数据切片并交予readfunc处理,多片数据将调用多次readfunc。delim可置为0,此时Server将持续读到EOF后才会交付数据。当delim置为‘\n’时,Server会默认换行交付,此时会根据Windows‘\r\n’作出对应调整;处理函数
将处理Server交付的数据流;附加数据
是为了配合readfunc更好的完成对数据的处理。后续在讲解如何编写readfunc时会提及如何使用additional给出的数据。
Server实则是一个可供调用的对外API接口interface
,其中包含Listen()方法启动该Server开始监听。
func (t *server) Listen() error
Listen是一个异步方法,如果发现配置参数有误或端口被占用等错误将会直接返回,否则就在后台拉起新的goroutine处理具体事务。Listen()方法不阻塞进程,也不会等到后台goroutine全部正常工作后再返回。
后台goroutine在运转处理的过程中若遇到错误将通过Err channel告知使用者,因此使用者需要显式地接收并处理error。注意即使不需要这些error信息,我们也需要有一个接收的过程,否则会导致后台进程堵塞。通过ErrChan()
获取该Channel:
type Errsocket struct {
Err error
RemoteAddr string
}
func (t *server) ErrChan() <-chan Errsocket
发送的错误消息包含两部分,error
和对端ip(addr:port)
。
当我们想关闭该Server,只需调用其Cut
函数:
func (t *server) Cut() error
Cut()方法会使Server停止监听Socket,同时释放所有已连接的Connection。该方法和Listen()一样,也不会等待所有Connection全部关闭后再返回。倘若希望关闭某特定的Connection(当然在我们已经知道该Connection对端连接IPaddr的前提下),我们可以使用CloseRemote
方法:
func (t *server) CloseRemote(remoteAddr string) error
至此较为重要的Server API已经简单介绍完成了,另外有些较为简单的API根据名字便可知道其作用,不再简单赘述。之后我们会通过一个简单的例子演示这些API的用法。
type Server interface {
Listen() error
Cut() error
CloseRemote(string) error
RangeRemoteAddr() []string
GetLocalAddr() string
SetDeadLine(time.Duration, time.Duration)
ErrChan() <-chan Errsocket
Write(interface{}) error
}
开启一个Client
通过NewClient函数创建一个Client实例,并通过调用其API方法向服务端发起连接。
func NewClient(
ipremotesocket string,
delim byte,
readfunc ClientReadFunc,
additional interface{}) Client {
可以发现,创建Client实例的函数参数与创建Server实例NewServer
的函参形式和意义基本相同。再次便不再多加解释。注意的是,v1.0.0版本Client还未能指定自己的ipaddr,只能连接成功后随机分配;另外,readfunc对于Server
而言是不可置为空(nil)的,但对于Client
而言可以置为nil,即忽略所有Server发送的消息。再有,对于一对Server/Client而言,其分隔符delim
应该约定好是相同的,否则可能会出现消息切分错误的情况。
Client通过调用Dial()
方法向Server发起连接。
func (t *client) Dial() error
若连接错误,则会返回具体错误原因,否则拉起相应goroutine执行后续操作并返回。
关闭连接可使用API提供的Close()命令。
func (t *client) Close()
该函数的作用仅仅是向Client发送了退出的信号,若此时还有业务处于运行状态(如readfunc)则会等待业务正常关闭后再退出。
以下是Client的全部对外API:
type Client interface {
Dial() error
Close()
SetDeadLine(time.Duration)
ErrChan() <-chan Errsocket
Write(interface{}) error
GetRemoteAddr() string
GetLocalAddr() string
}
最后我们讲解Write()
方法。Write函数将传递的类型通过json编码为[]byte并发送。因此我们可以在readfunc
中使用Unmarshal()解码。同时,框架提供了一个函数使得使用者可以自定义Write()方法:
type wf func(net.Conn, interface{}, byte) error
func SetWriteFunc(f wf)
readfunc与APIs:
该框架最为重要的核心部分即readfunc的编写,它的作用是处理由Server/Client递交的数据片。我们先看一下readfunc的函数签名:
type ClientReadFunc func([]byte, ReadFuncer) error
type ServerReadFunc func([]byte, ReadFuncer) error
无论是Client的readfunc或Server的readfunc,其函数签名都是相同的。ReadFuncer是一个接口interface
,它提供了一系列在readfunc函数中可用的API。稍后我们会对其中部分方法进行讲解。
type ReadFuncer interface {
GetRemoteAddr() string
GetLocalAddr() string
GetConn() net.Conn
Close()
Write(interface{}) error
Addon() interface{}
}
由于socket只允许传递[]byte
类型的数据,因此我们要做的第一步就是将[]byte类型转变为我们希望的数据类型。如果写入的是一个数据类型,我们想从[]byte转为struct可使用:
var t = *(**yourStruct)(unsafe.Pointer(&str))
这里将yourStruct
替换为你自己的结构体别名即可。若是string类型,则简单使用类型转换即可。
readfunc提供了够用的API,包括获得本地/远程IP socket与Conn,发送数据,关闭连接,获得附加数据
。还记得附加数据
吗,这是我们在最初创建Server/Client实例时传入的一个参数,现在可以通过Addon()
将其取出来使用了。一般建议传入的是一个指针类型的Addon,这样readfunc可对其进行修改。
关于Close()
函数:不用担心在readfunc中使用Close()方法会提前终止readfunc业务,导致数据无法正常交付。正如前文所言,Close()只是向框架传递一个关闭的信号。框架会等待readfunc全部执行完毕后再关闭这个连接。
源码分析
在分析Demo之前,我们先简单探究一下约600多行的源码,看一下其内部各goroutine的支配运行情况。
当Server调用Listen()方法时,Server内部会拉起一个hL
goroutine(handleListen);当成功响应Client的Dial方法时,hL
拉起新的goroutinehC4s
(handleConnforServer);hC4s
通过拉起read
读取DATA并负责将DATA交付给Readfunc
。一个hC4s
对应一个连接,多个连接将开启多个hC4s
和read
。Client向服务端发起连接成功后也将拉起一个goroutinehC4c
(handleConnforClient)和read
。eD是一个较为特殊的goroutine,他负责用户监听的errChannel是否处于关闭状态并将goroutine产生的错误数据传递给用户。本文将不详细分析eD,详情可以参考这篇文章 多goroutine异步通知error的一种方法 。
当Server调用Cut()方法关闭监听后,它将关闭hL与所有的hC4s和read,以及负责错误转发的eD,同时关闭errChannel;调用Close()/CloseRemote(...)方法时,仅关闭当前连接对应的hC4s和read,不关闭errChannel;Client调用Close()关闭连接后,将关闭开启的所有goroutine和errChannel。
hC4s和hC4c大同小异,我们着重分析一下hC4s源码:
func handleConnServer(h *hConnerServer, eC chan Errsocket, ctx context.Context, s *server) {...}
type hConnerServer struct {
conn net.Conn
d byte
mu *sync.Mutex
readfunc ServerReadFunc
}
hConnerServer
结构体主要包含了以下内容:连接实例conn,分隔符d,普通锁mu,readfunc,其中mutex主要用于维护eD的正常工作;eC
是上游传递下来的错误发送通道;监听ctx.Done()保证与上游一起收到退出信号,但不保证退出的顺序;server
提供readfunc使用的API。defer()语句保证了当hC4s退出时,将安全关闭conn和eD goroutine。
拉起的read
goroutine将读到的DATA分片通过channel发送给hC4s,hC4s将该DATA交给readfunc处理:
//hC4s
case strReq, ok := <-strReqChan: //read a data slice successfully
if !ok {
return //EOF && d!=0
}
err := h.readfunc(strReq, &server{
currentConn: h.conn,
delimiter: h.d,
remoteMap: s.remoteMap,
additional: s.additional,
})
if err != nil {
h.mu.Lock()
eC <- Errsocket{err, h.conn.RemoteAddr().String()}
}
}
一个server struct同时实现了Server interface和ReadFuncer interface中的所有方法,并通过接口的方式将特定的方法暴露给框架的使用者,这样设计使一些重复的方法在代码上得到复用。
hC4c在这段代码上稍有不同:
//hC4c
case strReq, ok := <-strReqChan: //read a data slice successfully
if !ok {
return //EOF && d!=0
}
if h.readfunc != nil {
h.rcmu.Lock()
err := h.readfunc(strReq, client)
if err != nil {
h.eD.mu.Lock()
eC <- Errsocket{err, h.conn.RemoteAddr().String()}
}
h.rcmu.Unlock()
}
}
区别在于:
- 对于Client而言,其readfunc是可为nil的,这样正常读数据但不会被处理;
- 与Server相比,多了一个rcmu的锁。该锁是防止在readfunc中调用了Close()方法后error Channel被提前关闭,导致readfunc的错误信息无法被正确送达。我们可以看一下Client的Close()方法:
func (t *client) Close() {
go func() {
t.rcmu.Lock()
t.mu.Lock()
t.closed = true
close(t.eU)
t.mu.Unlock()
t.rcmu.Unlock()
t.cancelfunc()
}()
}
可以看到,Close()方法会等待rcmu锁被释放后再执行后续操作。而为什么Server不需要为之加锁呢。因为Server在readfunc调用的Close()方法不会关闭上游的error Channel。
Server通过并发安全的map存储每个Conn对应的ip socket与cancelFunc,保证能够独立关闭任意Conn。
Demo分析
与上文一样,首先将各goroutine运作与调度的流程关系通过图的形式表现出来,并简要解释各goroutine的作用:
Master提供的Listen()方法将注册一个名为registerNode的readfunc
;当Node节点向Master注册成功后,Node节点拉起一个dHBL(daemon-HeartBeatListener) goroutine,在7939端口发起监听并注册hb4node readfunc
,用于接收ping报文并发送pong回应;Master会拉起一个dHBC(daemon-HeartBeatChecker),定时向Node端发起连接并发送ping报文,并注册hb4masterreadfunc
,当成功接收到pong报文后主动关闭连接。若在接收报文消息过程中出现错误,将发送错误消息至HBC/L error 错误处理器,供作进一步处理。
当dHBC连续接收到三次以上错误消息后,判定对端Node失去连接;当HBL error十秒以上未收到Master发来的消息后,判定Master已丢失自己。
Demo Tricks
在hb4master/node readfunc
中,无论结果成功与否,都会发送一个error("succeed"或 具体错误),这样在HBC/L error便可根据error得知此次消息传递的结果,并作进一步操作。
Github
Github:chitchat
或者也可以通过
go get github.com/ovenvan/chitchat
下载并使用。希望大家多pr并提issue,帮助这个框架更加完善。