Skynet核心部分是一个消息调度机制,Skynet本身是一个独立的进程,其中运行着若干个Worker
工作线程,Worker
工作线程会从消息队列中取出队列中的消息,并找到对应的处理函数进行分发。另外,Skynet还包含Timer
线程用于实现定时机制,以及Socket
线程用于监听epoll
事件并管理网络操作。
消息和任务调度作为Skynet的核心,整个Skynet的核心其实就是一个消息管理系统。在Skynet中可以把每个功能都当作一个服务Service
,整个Skynet工程在执行的过程中会创建多个服务,每个服务相当于一个Actor
。虽然服务是互不依赖并行执行的,但同时也存在服务之间通信和彼此之间的任务调用。
线程种类
Skynet是单进程多线程的,线程的种类有4种:
-
Monitor
线程
用于监控服务是不是陷入了死循环,用于检测节点内的消息是否堵住。 -
Timer
线程
Skynet自己实现的定时器,用于运行定时器。 -
Socket
线程
负责网络数据的收发。 -
Worker
工作线程
负责对消息队列进行调度,数量可通过配置表指定。
Monitor
、Timer
、Socket
都只有1个线程,唯独Worker
工作线程具有多个,Worker
工作线程是可以配置的,默认是8个线程,而工作消息队列内部使用的是循环数组的方式。另外,对于消息的处理者handler
,每个服务都有运行时独一无二的handler
,这个句柄可以跟名字绑定。
消息种类
在Skynet中消息可分为两种:进程内消息、跨进程消息
- 进程内消息
进程内消息是Skynet服务器中各个服务(Actor)之间传递的消息类型,可使用的格式包括:C服务的文本协议、Lua服务的自定义序列化库、自定义的内存数据结构
- 跨进程消息
由于Skynet服务器是一个单进程多线程的异步消息传递框架,当多个Skynet服务器一起构成分布式结构时,每个Skynet服务器就是一个节点,每个节点只有一个进程,所以这里所说的跨进程通信其实是网络通信,其发生的场景主要在Skynet节点与节点之间、Skynet节点与客户端之间。
由于跨进程通信其实是网络通信,所以消息的格式也就对应了网络通信的协议格式,比较常用的包括:自定义协议、sproto
、Google Proto Buffers
、JSON
等。
消息调度机制
每个在线客户端在Skynet服务器上都对应有一个Socket
与其连接,一个Socket
在Skynet内部对应一个Lua虚拟机和一个客户特定的消息队列per client mq
。当客户特定消息队列中有消息时,该队列会挂载到全局队列global message queue
上供工作线程worker Threads
进行调度处理。
一个Socket
线程socket thread
会轮询所有的Socket
,当收到客户端请求后将请求打包成一个消息,发送到该Socket
对应的客户特定消息队列per client mq
中,然后将该消息队列挂到全局队列队尾。
多个Worker
工作线程worker threads
从全局队列头部获取客户特定消息队列,从客户特定消息队列中取出一个消息进行处理,处理完毕后再将消息队列重新挂到全局队列队尾。
Timer
定时器线程会周期性检查以下设置的定时器,将到期的定时器消息发送到客户特定消息队列中,每个Lua虚拟机运行过程中也会向其它Lua虚拟机或自己的客户特定消息队列发送消息。
Monitor
监视线程会监控各个客户端的状态,检查是否有死循环的消息等。
综上所述,每个客户端处理消息时都是按照消息到达的顺序进行处理的,同一时刻一个客户端的消息只会被一个工作线程调度,因此客户端处理逻辑无需考虑多线程并发,所以基本无需加锁。
消息队列
首先需要明确的一点是,在Skynet中所有的消息通信都是异步的,这一点是由Skynet的消息调度机制所决定的。
消息队列是Skynet的核心,Skynet是围绕着消息队列来工作的。Skynet中消息队列分为两部分:全局消息队列和服务消息队列。每个服务都有一个自己的服务队列,服务队列被全局队列引用。
Skynet维护了两级消息队列,严格来说是嵌套的两级消息队列,首先每个服务都拥有一个私有的消息队列,队列中是一个个发送给它的消息。其次,Skynet拥有 一个全局消息队列,其中存放着若干非空的服务队列。
简单来说,每个服务都拥有一个自己的私有消息队列,当服务被创建并生成私有消息队列后,私有消息队列又会被注册到全局消息队列中。严格来讲,Skynet的全局消息队列中存放的是私有消息队列不为空的服务(Actor)。
主进程通过多个线程不断的从全局消息队列中取出服务队列,然后分发服务消息队列中的消息到对应的服务。简单来说,当向某个服务发送消息时,就是向服务的消息队列中添加消息,Skynet通过多线程来分发消息,线程的工作就时遍历全局消息队列,然后分发服务消息队列中的消息到服务。
服务队列
由于服务队列是属于服务的,所以服务队列的生命周期和服务保持一致,它会在载入服务的时候生成并在卸载服务的时候删除。
服务队列支持两个操作:向服务队列中添加消息、从服务队列中取出消息
当添加消息到队列中时如果队列满了,就会触发自动扩容操作,新扩容的数组大小将是原来的两倍。那么,取出消息后,数组所占用的空间会收缩减小吗?答案是不会。
消息调度
在Skynet启动时会建立若干Worker
工作线程,Worker
工作线程不断从主消息队列中取出一个次级消息队列出来,然后再从次级消息队列中取出一条消息,最后调用对应服务的callback
回调函数进行处理。
为了保证调用公平,一次仅处理一条消息而不是耗尽所有消息,以保证没有服务被饿死(没有服务得不到工作线程执行的机会)。这样,Skynet就实现了把一个消息(数据包)从一个服务发送到另一个服务。
每条Worker
工作线程每次从全局消息队列中pop
出一个次级消息队列,并从主从次级消息队列中pop
出一条消息,然后查找到该次级消息队列的所属服务,并将消息传递给该消息队列。每当一条Worker
工作线程从全局消息队列中pop
出一个次级消息队列时,其他线程是获取不到同一个服务的。最后再调用callback
回调函数。因此,不用担心一个服务同时会在多条线程内消费不同的消息,一个服务执行不存在并发,因此线程是安全的。
对于Socket
线程、Timer
线程、Worker
线程,都有可能会向指定服务的次级消息队列中push
消息,push
函数内有个加一自旋锁,用于避免多条线程同时向一个次级消息队列push
消息的悲剧。
综上所述,我们所编写的业务逻辑运行在不同的独立沙盒环境中,它们之间通过消息队列来进行交互。Worker
、Timer
、Socket
线程中运行的模块都有机会向特定的服务push
消息,它们是消息的生产者,而Worker
线程内的模块同时也是消息的消费者。
值得注意的是,服务模块要将数据通过Socket
发送给客户端时,并不是数据写入消息队列,而是通过管道从Worker
线程发送给Socket
线程,并交由Socket
转发。此外设置定时器也不走消息队列,而是直接将定时器模块加入一个timer_node
。因为timer
和socket
线程内运行的模块并不是这里的context
因此消息队列也无法消费。
当初始化一个服务时,Skynet会生成:
- 一个
skynet_context
作为服务的实例 - 一个唯一的服务
handle
(服务的唯一ID)用来标识服务 - 一个消息队列
message_queue
- 向框架注册一个
callback
回调函数(当服务收到有发送来的消息时通过回调方法传入)。
Skynet服务工作时,如果在初始化阶段注册了消息处理函数,只要有消息传入,就会触发注册的消息处理函数,这些消息都是Skynet的内部消息和外部的网络数据,定时器也会通过内部消息的形式表现出来。从Skynet的底层来看,每个服务就是一个消息处理器。
由此可见,Skynet中的服务主要作用是消息回调,框架将消息分发给服务,服务是消费者,同时服务可以发消息,因此服务也是生产者。Skynet的服务处理消息回调的方式是启动一个协程,每个消息都分别启动一个协程,然后在协程里调用使用skynet.dispatch
绑定的消息处理函数。显然使用协程的处理方式是不够高效的,因为协程的计算并不是真正并行,所以在服务之上,还有多线程的调度器,允许多个Lua虚拟机中的协程同时运行。
注意的是,多个Lua虚拟机中的协程并不会出现数据竞争,因为服务之间以Actor
模式通信,线程间高效地传递数据,也就是服务间的消息机制。因此,单个进程内的Skynet有最高效的并行性能。
服务间消息通信
Skynet中每个服务都有一个独立的Lua虚拟机,逻辑上服务之间是相互隔离的,也就是说,不能使用传统意义上的Lua全局变量进行服务间的通讯。在Skynet中服务之间可以通过Skynet消息调度机制来完成通信,Skynet中的服务是基于Actor模型设计出来的,每个服务都可以接收消息、处理消息、发送应答等。Skynet中每条消息都是由6部分组成:消息类型、session、发送服务地址、接收方服务地址、消息C指针、消息长度。
Lua服务的消息处理
Skynet中不同服务是利用系统的多线程完全并行的,当你从服务A向服务B和服务C分别各自发送一条消息时,并不能保证先发的消息先被处理。而当你从服务A向服务B依次发送两条消息时,先发的消息一定会被服务B先处理。
使用Lua实现的服务只是一个内嵌了Lua虚拟机的服务,也遵守上面的规则。如果服务B是一个Lua服务,当服务A向服务B发送两条消息x和y时,Skynet一定保证x先被服务B中的Lua虚拟机接收到,并为消息x生成要给协程X,并运行这个协程。然后才会接收到消息y,并重新生成一个新的协程Y并运行。
大多数情况下系统是会保证运行次序的,一旦协程X中调用了Skynet提供的socket IO处理,或是调用了skynet.call、skynet.sleep等会导致协程挂起的指令,那么消息处理的执行流就被暂时挂起了。注意skynet.send不会导致挂起。
和Erlang的process不同的是,此时Skynet挂起的是Lua虚拟机中的协程,服务B本身是可以继续处理消息的。这个时候,一旦消息Y抵达,一个新的协程Y就会被创建并运行。看起来,服务B中就有两条执行流程并行处理。
从这个意义上来说Skynet中Lua虚拟机上的协程才能看成是Erlang中的process的等价物。Lua虚拟机也是Skynet中的一个服务,它提供了一个共享环境,让不同的协程之间可以共享状态,这也是很多bug的滋生之处。
未完待续...