这部分是承接上篇,主要分析skynet中定时器和网络实现部分,还有消息队列,和剩下的第三个小问题,因为这些是skynet的核心。
自己项目中写过和分析过的开源代码,定时器和网络实现,其实都差不多,我准备把skynet分析完后,不再分析这些,准备去研究下LevelDB和brpc,在剩下的大半年,也会着重点框架的设计。
消息队列的实现:
7 struct skynet_message {
8 uint32_t source; //表示消息的来源
9 int session; //对应哪个协程
10 void * data;
11 size_t sz;
12 }; //一条消息的声明
一级全局消息队列如下,他挂载了二级不为空的服务消息队列,由工作线程pop和push,为了公平调度,每次只处理一条消息(可能会多一点,由工作线程的weight决定),且每个服务的消息队列在某个时刻只由一个线程处理:
21 struct message_queue {
22 struct spinlock lock;
23 uint32_t handle; //服务句柄
24 int cap;
25 int head;
26 int tail;
27 int release;
28 int in_global; //表明该消息队列是否在全局队列中
29 int overload;
30 int overload_threshold;
31 struct skynet_message *queue; //消息数组,默认大小64个
32 struct message_queue *next;
33 };
35 struct global_queue {
36 struct message_queue *head; //指向头
37 struct message_queue *tail; //指向尾
38 struct spinlock lock; //自旋锁
39 };
上面一些字段注释了下,其他的从命名便知道作用。
初始化全局消息队列:
211 void
212 skynet_mq_init() {
213 struct global_queue *q = skynet_malloc(sizeof(*q));
214 memset(q,0,sizeof(*q));
215 SPIN_INIT(q);
216 Q=q;
217 }
创建每个服务的消息队列:
77 struct message_queue *
78 skynet_mq_create(uint32_t handle) {
79 struct message_queue *q = skynet_malloc(sizeof(*q));
80 q->handle = handle;
81 q->cap = DEFAULT_QUEUE_SIZE;
82 q->head = 0;
83 q->tail = 0;
84 SPIN_INIT(q)
85 // When the queue is create (always between service create and service init) ,
86 // set in_global flag to avoid push it to global queue .
87 // If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
88 q->in_global = MQ_IN_GLOBAL;
89 q->release = 0;
90 q->overload = 0;
91 q->overload_threshold = MQ_OVERLOAD;
92 q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
93 q->next = NULL;
94
95 return q;
96 }
加自旋锁变量,是因为后面在操作消息队列时,一个工作线程pop消息时,有另一个线程push消息,可能出现一些不一致问题,需要同步。
从服务消息队列中pop消息:
137 int
138 skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
139 int ret = 1;
140 SPIN_LOCK(q)
141
142 if (q->head != q->tail) {
143 *message = q->queue[q->head++];
144 ret = 0;
145 int head = q->head;
146 int tail = q->tail;
147 int cap = q->cap;
148
149 if (head >= cap) {
150 q->head = head = 0;
151 }
152 int length = tail - head;
153 if (length < 0) {
154 length += cap;
155 }
156 while (length > q->overload_threshold) {
157 q->overload = length;
158 q->overload_threshold *= 2;
159 }
160 } else {
161 // reset overload_threshold when queue is empty
162 q->overload_threshold = MQ_OVERLOAD;
163 }
164
165 if (ret) {
166 q->in_global = 0;
167 }
169 SPIN_UNLOCK(q)
171 return ret;
172 }
当q->head == q->tail
表示消息队列为空,就把他从全局消息中删除skynet_globalmq_pop
,即执行q->in_global = 0
:
316 for (i=0;i<n;i++) {
317 if (skynet_mq_pop(q,&msg)) {
318 skynet_context_release(ctx);
319 return skynet_globalmq_pop();
当服务消息队列不为空时,取head
索引处消息,并自增head
,当head >= cap
表示需要回到索引0处等pop
下一条消息;剩下的逻辑是计算有多少条消息;
push消息到服务消息队列中:
189 void
190 skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
191 assert(message);
192 SPIN_LOCK(q)
193
194 q->queue[q->tail] = *message;
195 if (++ q->tail >= q->cap) {
196 q->tail = 0;
197 }
198
199 if (q->head == q->tail) {
200 expand_queue(q);
201 }
202
203 if (q->in_global == 0) {
204 q->in_global = MQ_IN_GLOBAL;
205 skynet_globalmq_push(q);
206 }
207
208 SPIN_UNLOCK(q)
209 }
不管是pop消息还是push消息,都是复制,对指针进行了浅拷贝,由上篇分析可知,消息在各服务间基本是zero-copy;这里虽然用数组存储消息,但并未使用常规方法包括取模判断,留一个空表示队列的满;当++ q->tail >= q->cap
时,表示需要回绕到数组索引0时;当q->head == q->tail
表示队列满,需要扩容;然后如果不在全局队列中(或者第一次进全局队列,或者因没消息被pop出全局队列)的话需要push到全局队列等工作线程处理;
174 static void
175 expand_queue(struct message_queue *q) {
176 struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2) ;
177 int i;
178 for (i=0;i<q->cap;i++) {
179 new_queue[i] = q->queue[(q->head + i) % q->cap];
180 }
181 q->head = 0;
182 q->tail = q->cap;
183 q->cap *= 2;
184
185 skynet_free(q->queue);
186 q->queue = new_queue;
187 }
扩容就是以两倍的大小,然后把原来的消息重新push到新数组中,获取原来的消息以q->queue[(q->head + i) % q->cap]
,因为可能出现q->head>q->tail
的情况;
剩下的进出全局队列,也是需要加自旋锁,并且要正确的更新头和尾指针。
定时器的实现
定时器的入口是在timer线程:
128 static void *
129 thread_timer(void *p) {
130 //more code...
132 for (;;) {
133 skynet_updatetime();
136 usleep(2000); //每2毫秒
137 //more code...
141 }
142 //more code...
149 return NULL;
150 }
266 void
267 skynet_updatetime(void) {
268 uint64_t cp = gettime();
269 if(cp < TI->current_point) {
271 TI->current_point = cp;
272 } else if (cp != TI->current_point) {
273 uint32_t diff = (uint32_t)(cp - TI->current_point);
274 TI->current_point = cp;
275 TI->current += diff;
276 int i;
277 for (i=0;i<diff;i++) {
278 timer_update(TI);
279 }
280 }
281 }
以上是根据上一个时间点到现在时间点,计算过了多少个十毫秒,然后分别timer_update
;
其中初始化的时候,时间精度做到了1/100秒,以前项目中tick为1/5秒,这里每2毫秒跑一次:�
242 struct timeval tv;
243 gettimeofday(&tv, NULL);
244 *sec = tv.tv_sec;
245 *cs = tv.tv_usec / 10000;
258 struct timeval tv;
259 gettimeofday(&tv, NULL);
260 t = (uint64_t)tv.tv_sec * 100;
261 t += tv.tv_usec / 10000;
31 struct timer_event {
32 uint32_t handle; //服务句柄
33 int session; //对应消息的会话,比如协程
34 };
35
36 struct timer_node {
37 struct timer_node *next;
38 uint32_t expire; //超时时间
39 };
40
41 struct link_list {
42 struct timer_node head;
43 struct timer_node *tail;
44 };
45
46 struct timer {
47 struct link_list near[TIME_NEAR]; //即将处理的节点
48 struct link_list t[4][TIME_LEVEL]; //由时间远近分层
49 struct spinlock lock;
50 uint32_t time; //累计多少个十毫秒
51 uint32_t starttime;
52 uint64_t current;
53 uint64_t current_point;
54 };
一共五层;以使用方式来说明定时器的工作原理,当要添加个一秒后超时的节点时(即100个十毫秒):
401 int session = skynet_context_newsession(context);
211 int
212 skynet_timeout(uint32_t handle, int time, int session) {
213 if (time <= 0) {
214 struct skynet_message message;
215 message.source = 0;
216 message.session = session;
217 message.data = NULL;
218 message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT; //响应消息类型
219
220 if (skynet_context_push(handle, &message)) {
221 return -1; //压入服务消息队列
222 }
223 } else {
224 struct timer_event event;
225 event.handle = handle;
226 event.session = session;
227 timer_add(TI, &event, sizeof(event), time);
228 }
229
230 return session;
231 }
以上根据time值表示是否立即超时还是过段时间,并保存相关的信息;如果time大于0则timer_add
节点:
95 static void
96 timer_add(struct timer *T,void *arg,size_t sz,int time) {
97 struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
98 memcpy(node+1,arg,sz);
99
100 SPIN_LOCK(T);
101
102 node->expire=time+T->time; //这里可能发生溢出
103 add_node(T,node);
104
105 SPIN_UNLOCK(T);
106 }
在挨着struct timer_node
地址分配struct timer_event
,并保存arg数据;然后加锁,设置超时时间,添加节点,解锁;
74 static void
75 add_node(struct timer *T,struct timer_node *node) {
76 uint32_t time=node->expire;
77 uint32_t current_time=T->time;
78
79 if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
80 link(&T->near[time&TIME_NEAR_MASK],node);
81 } else {
82 int i;
83 uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
84 for (i=0;i<3;i++) {
85 if ((time|(mask-1))==(current_time|(mask-1))) {
86 break;
87 }
88 mask <<= TIME_LEVEL_SHIFT;
89 }
90
91 link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
92 }
93 }
24 #define TIME_NEAR_SHIFT 8
25 #define TIME_NEAR (1 << TIME_NEAR_SHIFT)
26 #define TIME_LEVEL_SHIFT 6
27 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
28 #define TIME_NEAR_MASK (TIME_NEAR-1) //0xff
29 #define TIME_LEVEL_MASK (TIME_LEVEL-1) //0x1f
以上实现把超时时间和当前时间作比较,如果高24位都相等,则根据低8位,把node放到T->near[time&TIME_NEAR_MASK]中,因为这种方式,表示node会很快超时;
举个例子类比下:
date -r 1500000000时间为2017年 7月14日 星期五 10时40分00秒 CST;
对应的十六进制为:0x59682f00;而当0x59682fxx固定时,不管xx怎么变,时间都只在1500000000〜1500000255变化,即:
2017年 7月14日 星期五 10时44分15秒 CST,最迟是四分钟十几秒;代码82〜91行,是那些超时时间比这个久的,就会按分层,这样对应:0层->高18位(0x3ff),1层->高12位(0xfffff),2层->高6位(0x3ffffff),3层->是剩下的,最后找到适合的i位置;当然这里的time不是真正的时间,而是自程序启动后,总共走了多少个tick;
Q)以上会出现一种情况是,当无符号数最高位进位时,导致又回到了0,那么可能把本来是在T->near
中的放在了T->t[3][0]
中;
回到timer_update
:
172 static void
173 timer_update(struct timer *T) {
174 SPIN_LOCK(T);
176 // try to dispatch timeout 0 (rare condition)
177 timer_execute(T);
179 // shift time first, and then dispatch timer message
180 timer_shift(T);
182 timer_execute(T);
184 SPIN_UNLOCK(T);
185 }
159 static inline void
160 timer_execute(struct timer *T) {
161 int idx = T->time & TIME_NEAR_MASK;
162
163 while (T->near[idx].head.next) {
164 struct timer_node *current = link_clear(&T->near[idx]);
165 SPIN_UNLOCK(T);
166 // dispatch_list don't need lock T
167 dispatch_list(current);
168 SPIN_LOCK(T);
169 }
170 }
141 static inline void
142 dispatch_list(struct timer_node *current) {
143 do {
144 struct timer_event * event = (struct timer_event *)(current+1);
145 struct skynet_message message;
146 message.source = 0;
147 message.session = event->session;
148 message.data = NULL;
149 message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;
150
151 skynet_context_push(event->handle, &message);
152
153 struct timer_node * temp = current;
154 current=current->next;
155 skynet_free(temp);
156 } while (current);
157 }
这里先取得自旋锁,摘下相应的节点列表,并释放锁,然后塞条消息到对应服务的消息队列中;
108 static void
109 move_list(struct timer *T, int level, int idx) {
110 struct timer_node *current = link_clear(&T->t[level][idx]);
111 while (current) {
112 struct timer_node *temp=current->next;
113 add_node(T,current);
114 current=temp;
115 }
116 }
117
118 static void
119 timer_shift(struct timer *T) {
120 int mask = TIME_NEAR;
121 uint32_t ct = ++T->time;
122 if (ct == 0) {
123 move_list(T, 3, 0);
124 } else {
125 uint32_t time = ct >> TIME_NEAR_SHIFT;
126 int i=0;
127
128 while ((ct & (mask-1))==0) {
129 int idx=time & TIME_LEVEL_MASK;
130 if (idx!=0) {
131 move_list(T, i, idx);
132 break;
133 }
134 mask <<= TIME_LEVEL_SHIFT;
135 time >>= TIME_LEVEL_SHIFT;
136 ++i;
137 }
138 }
139 }
由于只处理near数组中的节点,当过了一定的tick数后,在T->t[level]
可能有快超时的,故这里需要做个跨level的过程并处理Q问题;这里针对T->time
为0要特殊处理;当(T->time & (mask-1))==0
才需要处理跨level的情况;每TIME_LEVEL_MASK
比较,找到后就移动,为什么这样做?因为在添加节点到相应的level
上时,正因为(time|(mask-1))==(current_time|(mask-1))
才找到适合的level,而这里是个相反的过程;
以上就是skynet中定时器的实现了,这种方式是第一次见到(看来还是要多学习),不过对于业务场景来说,选择适合的方案才好,减少不必要的计算等,引用连接中的一个比较:
网络模块的实现
以下结构体的声明是整个socket相关的部分:
100 struct socket_server {
101 int recvctrl_fd;
102 int sendctrl_fd;
104 poll_fd event_fd;
105 int alloc_id;
106 int event_n;
107 int event_index;
109 struct event ev[MAX_EVENT];
110 struct socket slot[MAX_SOCKET];
111 char buffer[MAX_INFO];
113 fd_set rfds;
114 }; //more code
其中各成员的使用已在注释中给出。
328 struct socket_server *
329 socket_server_create() {
330 int i;
331 int fd[2];
332 poll_fd efd = sp_create();
// check efd
337 if (pipe(fd)) {
//check fd
341 }
342 if (sp_add(efd, fd[0], NULL)) { //监听管道可读事件,阻塞
//check add event
349 }
351 struct socket_server *ss = MALLOC(sizeof(*ss));
352 ss->event_fd = efd;
353 ss->recvctrl_fd = fd[0];
354 ss->sendctrl_fd = fd[1];
356 ss->checkctrl = 1;
357 for (i=0;i<MAX_SOCKET;i++) { //初始化请求连接socket信息
358 struct socket *s = &ss->slot[i];
359 s->type = SOCKET_TYPE_INVALID;
360 clear_wb_list(&s->high);
361 clear_wb_list(&s->low);
362 }
363 ss->alloc_id = 0;
364 ss->event_n = 0;
365 ss->event_index = 0;
367 FD_ZERO(&ss->rfds);
370 return ss;
371 }
网络工作主要由thread_socket
网络线程来做,处理网络上的数据和事件;然后核心代码在skynet_socket_poll
中,这里当有消息过来时,会wakeup
工作线程。
55 static void
56 wakeup(struct monitor *m, int busy) {
57 if (m->sleep >= m->count - busy) {
58 // signal sleep worker, "spurious wakeup" is harmless
59 pthread_cond_signal(&m->cond);
60 }
61 }
62
63 static void *
64 thread_socket(void *p) {
65 struct monitor * m = p;
67 for (;;) {
68 int r = skynet_socket_poll();
69 if (r==0)
70 break;
71 if (r<0) {
73 continue;
74 }
75 wakeup(m,0);
76 }
77 return NULL;
78 }
尝试唤醒
73 int
74 skynet_socket_poll() {
75 struct socket_server *ss = SOCKET_SERVER;
77 struct socket_message result;
78 int more = 1;
79 int type = socket_server_poll(ss, &result, &more);
80 switch (type) {
81 case SOCKET_EXIT:
82 return 0;
83 case SOCKET_DATA:
84 forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
85 break;
86 case SOCKET_CLOSE:
87 forward_message(SKYNET_SOCKET_TYPE_CLOSE, false, &result);
88 break;
89 case SOCKET_OPEN:
90 forward_message(SKYNET_SOCKET_TYPE_CONNECT, true, &result);
91 break;
//more code...
108 if (more) {
109 return -1;
110 }
111 return 1;
112 }
6 #define SOCKET_DATA 0
7 #define SOCKET_CLOSE 1
8 #define SOCKET_OPEN 2
9 #define SOCKET_ACCEPT 3
10 #define SOCKET_ERR 4
11 #define SOCKET_EXIT 5
12 #define SOCKET_UDP 6
13 #define SOCKET_WARNING 7
以上根据消息类型,然后重新格式化一条服务消息,然后压入相应的队列,考虑简单情况:
33 static void
34 forward_message(int type, bool padding, struct socket_message * result) {
35 struct skynet_socket_message *sm;
36 size_t sz = sizeof(*sm);
48 sm = (struct skynet_socket_message *)skynet_malloc(sz);
49 sm->type = type;
50 sm->id = result->id;
51 sm->ud = result->ud;
56 sm->buffer = result->data;
59 struct skynet_message message;
60 message.source = 0;
61 message.session = 0;
62 message.data = sm;
63 message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);
64
65 if (skynet_context_push((uint32_t)result->opaque, &message)) {
//free buffer
70 }
71 } //more code...
17 struct socket_message {
18 int id;
19 uintptr_t opaque;
20 int ud; // for accept, ud is new connection id ; for data, ud is size of data
21 char * data;
22 };
如果有padding数据,需要额外拷贝result->data数据,这种情况只会在消息类型是SOCKET_ACCEPT
和SOCKET_ERR
时。
再核心代码在socket_server_poll
中:
1353 int
1354 socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
1355 for (;;) {
1356 if (ss->checkctrl) {
1357 if (has_cmd(ss)) {
//用select监听recvctrl_fd上的读事件
1366 }
1367 }
1457 static void
1458 send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
1459 request->header[6] = (uint8_t)type;
1460 request->header[7] = (uint8_t)len;
1461 for (;;) {
1462 ssize_t n = write(ss->sendctrl_fd, &request->header[6], len+2); //写命令字到管道中,然后fd[0]可读,select返回可读事件
1463 if (n<0) {
1464 if (errno != EINTR) {
1466 }
1467 continue;
1468 }
1470 return;
1471 }
1472 }
比如,在lua层,listen一个ip/port的时候,并传backlog
大小,会调用llisten接口,然后调用skynet_socket_listen(ctx, host,port,backlog)
;接着调用socket_server_listen(SOCKET_SERVER, source, host, port, backlog)
;
1676 int
1677 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
1678 int fd = do_listen(addr, port, backlog);
1679 if (fd < 0) {
1680 return -1;
1681 }
1682 struct request_package request;
1683 int id = reserve_id(ss);
1684 if (id < 0) {
1685 close(fd);
1686 return id;
1687 }
1688 request.u.listen.opaque = opaque; //句柄
1689 request.u.listen.id = id;
1690 request.u.listen.fd = fd;
1691 send_request(ss, &request, 'L', sizeof(request.u.listen));
1692 return id;
1693 }
会分配唯一一个id,并存储相关的信息,最后调用send_request
。
has_cmd
中select返回可读事件后,从管道读命令字和具体的数据:
1092 // return type
1093 static int
1094 ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
1095 int fd = ss->recvctrl_fd;
1096 // the length of message is one byte, so 256+8 buffer size is enough.
1097 uint8_t buffer[256];
1098 uint8_t header[2];
1099 block_readpipe(fd, header, sizeof(header));
1100 int type = header[0];
1101 int len = header[1];
1102 block_readpipe(fd, buffer, len);
1103 // ctrl command only exist in local fd, so don't worry about endian.
1104 switch (type) {
1105 case 'S':
1106 return start_socket(ss,(struct request_start *)buffer, result);
1107 case 'B':
1108 return bind_socket(ss,(struct request_bind *)buffer, result);
1109 case 'L':
1110 return listen_socket(ss,(struct request_listen *)buffer, result);
//more code...
867 static int
868 listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
869 int id = request->id;
870 int listen_fd = request->fd;
871 struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false); //初始化监听套接字的相关数据
872 if (s == NULL) {
873 goto _failed;
874 }
875 s->type = SOCKET_TYPE_PLISTEN;
876 return -1;
877 _failed:
//do failed logic...
886 }
上面还没有监听listen socket,是在start_socket
接口中完成的:
940 static int
941 start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *re sult) {
942 int id = request->id;
943 result->id = id;
944 result->opaque = request->opaque;
945 result->ud = 0;
946 result->data = NULL;
947 struct socket *s = &ss->slot[HASH_ID(id)];
948 if (s->type == SOCKET_TYPE_INVALID || s->id !=id) {
949 result->data = "invalid socket";
950 return SOCKET_ERR;
951 }
952 struct socket_lock l;
953 socket_lock_init(s, &l);
954 if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
955 if (sp_add(ss->event_fd, s->fd, s)) {
956 force_close(ss, s, &l, result);
957 result->data = strerror(errno);
958 return SOCKET_ERR;
959 }
960 s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN ;
961 s->opaque = request->opaque;
962 result->data = "start";
963 return SOCKET_OPEN;
以上实现完成了对一个fd进行监听的实现,并设置相关的状态信息,以及路由(句柄);
接着socket_server_poll
,然后一个一个处理fd上的网络事件:
52 static int
53 sp_wait(int efd, struct event *e, int max) {
54 struct epoll_event ev[max];
55 int n = epoll_wait(efd , ev, max, -1);
56 int i;
57 for (i=0;i<n;i++) {
58 e[i].s = ev[i].data.ptr;
59 unsigned flag = ev[i].events;
60 e[i].write = (flag & EPOLLOUT) != 0;
61 e[i].read = (flag & (EPOLLIN | EPOLLHUP)) != 0;
62 e[i].error = (flag & EPOLLERR) != 0;
63 }
64
65 return n;
66 }
1368 if (ss->event_index == ss->event_n) {
1369 ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
1370 ss->checkctrl = 1;
1371 if (more) {
1372 *more = 0;
1373 }
1374 ss->event_index = 0;
1375 if (ss->event_n <= 0) {
1376 ss->event_n = 0;
1377 if (errno == EINTR) {
1378 continue;
1379 }
1380 return -1;
1381 }
1382 }
这里把事件都拷贝到了struct event *e
上面,在外层for循环处理;当if (ss->event_index == ss->event_n)
成立时表示本次sp_wait
返回的事件集都处理完毕;
接着socket_server_poll
:
1383 struct event *e = &ss->ev[ss->event_index++];
1384 struct socket *s = e->s;
1385 if (s == NULL) {
1386 // dispatch pipe message at beginning
1387 continue;
1388 }
1389 struct socket_lock l;
1390 socket_lock_init(s, &l);
1391 switch (s->type) {
1392 case SOCKET_TYPE_CONNECTING: //已连接的会在注册可写事件前,先判断有没有更多的数据要发送
1393 return report_connect(ss, s, &l, result);
1394 case SOCKET_TYPE_LISTEN: {
1395 int ok = report_accept(ss, s, result);
1396 if (ok > 0) {
1397 return SOCKET_ACCEPT;
1398 } if (ok < 0 ) {
1399 return SOCKET_ERR;
1400 }
1401 // when ok == 0, retry
1402 break;
1403 }
675 static inline int
676 send_buffer_empty(struct socket *s) {
677 return (s->high.head == NULL && s->low.head == NULL);
678 } //如果高和低优先级发送列表都为空
888 static inline int
889 nomore_sending_data(struct socket *s) {
890 return send_buffer_empty(s) && s->dw_buffer == NULL && (s->sending & 0xffff) == 0;
891 }
然后根据创建socket fd时的状态,比如是监听套接字,还是默认可读写等,这里举例有连接到来:
1289 static int
1290 report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
1291 union sockaddr_all u;
1292 socklen_t len = sizeof(u);
1293 int client_fd = accept(s->fd, &u.s, &len);
1294 if (client_fd < 0) {
1295 //do failed logic...
1304 }
1305 int id = reserve_id(ss);
1306 if (id < 0) {
1307 close(client_fd);
1308 return 0;
1309 }
1310 socket_keepalive(client_fd);
1311 sp_nonblocking(client_fd); //设置非阻塞
1312 struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
1313 if (ns == NULL) {
1314 close(client_fd);
1315 return 0;
1316 }
1317 ns->type = SOCKET_TYPE_PACCEPT;
1318 result->opaque = s->opaque; //服务句柄
1319 result->id = s->id; //索引id
1320 result->ud = id;
1321 result->data = NULL;
1322 //more code...
以上是accept连接,然后分配struct socket
存储结构,设置非阻塞,初始化数据,由gate服务处理;然后由lua
层的业务逻辑调用start_socket
来决定对应套接字是否要监听读写网络事件等。
剩下的就是读写事件了,分析中不考虑error事件:
1407 default:
1408 if (e->read) {
1409 int type;
1410 if (s->protocol == PROTOCOL_TCP) {
1411 type = forward_message_tcp(ss, s, &l, result);
1412 } else {
1413 type = forward_message_udp(ss, s, &l, result);
1414 if (type == SOCKET_UDP) {
1415 // try read again
1416 --ss->event_index;
1417 return SOCKET_UDP;
1418 }
1419 }
1420 if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
1421 // Try to dispatch write message next step if write flag set.
1422 e->read = false;
1423 --ss->event_index;//这里防止写事件饿死,故这次读事件处理一次后让出给同一个event_index的写事件
1424 }
1425 if (type == -1)
1426 break;
1427 return type;
1428 }
1429 if (e->write) {
1430 int type = send_buffer(ss, s, &l, result);
1431 if (type == -1)
1432 break;
1433 return type;
1434 }
以上大概分析下,如果是可读,根据协议类型,这里说明tcp的情况,把读到的消息塞到struct socket_message
中:
1189 result->opaque = s->opaque;
1190 result->id = s->id;
1191 result->ud = n; //buffer长度
1192 result->data = buffer;
但这里有个问题是,我们不知道一条完整的消息,这里可能是分段read的,然后根据消息的头部大小,分割一条条完整的消息,这里就这样把消息push到服务队列?然而这部分是由gate服务处理的,因为gate需要做些过滤处理,然后才把完整的一条消息push到队列中由工作线程处理。
对于写,由于上一次的写可能没有处理完,当本次写到来后,需要把上次没写完的,优先于high队列,并设置正确的剩余字节数:
734 if (s->dw_buffer) {
735 // add direct write buffer before high.head
736 struct write_buffer * buf = MALLOC(SIZEOF_TCPBUFFER);
737 struct send_object so;
738 buf->userobject = send_object_init(ss, &so, (void *)s->dw_buffer, s->dw_size);
739 buf->ptr = (char*)so.buffer+s->dw_offset;
740 buf->sz = so.sz - s->dw_offset;
741 buf->buffer = (void *)s->dw_buffer;
742 s->wb_size+=buf->sz;
743 if (s->high.head == NULL) {
744 s->high.head = s->high.tail = buf;
745 buf->next = NULL;
746 } else {
747 buf->next = s->high.head;
748 s->high.head = buf;
749 } //在high头插入未完成的write_buffer
750 s->dw_buffer = NULL;
751 }
752 int r = send_buffer_(ss,s,l,result);
send_buffer_
实现如注释所言:
680 /*
681 Each socket has two write buffer list, high priority and low priority.
682
683 1. send high list as far as possible.
684 2. If high list is empty, try to send low list.
685 3. If low list head is uncomplete (send a part before), move the head of low list to empty high list (call raise_uncomplete) .
686 4. If two lists are both empty, turn off the event. (call check_close)
687 */
这时会先写high队列,然后写low队列,如果low未写完,要把剩下的移到high队列,如果都没有数据要发送了,则移走可写事件;并判断对方是否是半关闭状态等。
这里举例下如何在主场景服务向client发消息,代理client的为agent,先在lua层调用skynet.send
后,把要发送的消息压入agent服务队列,工作线程处理到agent服务队列时,把消息传到lua层,agent的服务做了一些逻辑,再socket.wirte
到对应的skynet_socket_send
,并由网络模块发送。lua层使用的fd和c层的socket fd不是同一个,而是c的层分配的索引id。
而对于分包的处理在service_gate.c中,由于篇幅过长,这里不打算详细分析了。有兴趣的可以研究下当有client请求建立连接时,消息的走向;向服务发数据,和服务发数据到client时的流程。
总结:
整个网络模块设计时充分考虑的一些方面,比如防止读写事件的饿死情况,有优先级队列。
也在进行读写逻辑时,考虑对端半关闭状态处理;因为当本端业务层要发数据时,而对端已经shutdown读端,那么这些数据就没有必要发直接drop/free掉。
然后一般的网络组件,对于写时,都会先判断某个抽象的连接对象缓冲区有没有待写数据,有的话直接缓存起来并返回,否则直接send,如果没有send完则缓存并注册写事件,之后再删除写事件这样的方法,这里使用默认的水平模式。
浅析skynet底层框架下篇还是分析第三个小问题,以及如何利用协程处理消息,包括创建协程处理消息,挂起协程,切换,这块可能会比较复杂。
参考连接:
Linux下定时器的实现方式分析
深入剖析Linux内核定时器实现机制
clinet对server的通信流程
skynet中watchdog,gate,agent调用关系清理