Phxrpc中的coroutine实现分析:
由于Phxrpc代码量不是很多,大概花个一两天可以分析明白,里面把epoll+timer+协程用的蛮溜。它的框架还是单进程多线程模型,主线程绑定CPU,跑LoopAccept逻辑,根据hsha_server_->hsha_server_qos_.CanAccept()
创建新连接;然后通过轮询hsha_server_->server_unit_list_[idx_++]->AddAcceptedFd(accepted_fd)
分配给某个HshaServerUnit
对象,然后继续accept socket,贴一张连接中的图:
HshaServerUnit
是由一个线程池和调度线程组成,调度线程也做了IO逻辑,一个线程里可能有协程实现。
当构造HshaServerUnit
时,创建一个线程池,每个工作线程可能是协程模式,由uthread_count_分区:
408 void Worker::Func() {
409 if (uthread_count_ == 0) {
410 ThreadMode();
411 } else {
412 UThreadMode();
413 }
414 }
433 void Worker::UThreadMode() {
434 worker_scheduler_ = new UThreadEpollScheduler(utherad_stack_size_, uthread_count_, true);
435 assert(worker_scheduler_ != nullptr);
436 worker_scheduler_->SetHandlerNewRequestFunc(bind(&Worker::HandlerNewRequestFunc, this));
437 worker_scheduler_->RunForever();
438 }
调度线程的主要逻辑在Run中,部分源码如下:
699 void HshaServerIO::RunForever() {
700 scheduler_->SetHandlerAcceptedFdFunc(bind(&HshaServerIO::HandlerAcceptedFd, this));
701 scheduler_->SetActiveSocketFunc(bind(&HshaServerIO::ActiveSocketFunc, this));
702 scheduler_->RunForever();
703 }
239 bool UThreadEpollScheduler::Run() {
240 ConsumeTodoList();
241
242 struct epoll_event * events = (struct epoll_event*) calloc(max_task_, sizeof(struct epoll_event));
243
244 int next_timeout = timer_.GetNextTimeout();
245
246 for (; (run_forever_) || (!runtime_.IsAllDone());) {
247 int nfds = epoll_wait(epoll_fd_, events, max_task_, 4);
248 if (nfds != -1) {
249 for (int i = 0; i < nfds; i++) {
250 UThreadSocket_t * socket = (UThreadSocket_t*) events[i].data.ptr;
251 socket->waited_events = events[i].events;
252
253 runtime_.Resume(socket->uthread_id);
254 }
255
256 //for server mode
257 if (active_socket_func_ != nullptr) {
258 UThreadSocket_t * socket = nullptr;
259 while ((socket = active_socket_func_()) != nullptr) {
260 runtime_.Resume(socket->uthread_id);
261 }
262 }
263
264 //for server uthread worker
265 if (handler_new_request_func_ != nullptr) {
266 handler_new_request_func_();
267 }
268
269 if (handler_accepted_fd_func_ != nullptr) {
270 handler_accepted_fd_func_();
271 }
272
273 if (closed_) {
274 ResumeAll(UThreadEpollREvent_Close);
275 break;
276 }
277
278 ConsumeTodoList();
279 DealwithTimeout(next_timeout);
280 } else if (errno != EINTR) {
281 ResumeAll(UThreadEpollREvent_Error);
282 break;
283 }
284
285 StatEpollwaitEvents(nfds);
286 }
287
288 free(events);
289
290 return true;
291 }
以上的主要逻辑分别是处理一些事件,和超时事件,强制wait四毫秒,如果有事件发生则resume协程处理,如果有response要发送则从data_flow取出放入某个协程并resume,如果是协程模式则有新的request从data_flow取出交由协程处理,然后是resume协程处理新的连接并关联IOFunc处理函数:
561 void HshaServerIO::HandlerAcceptedFd() {
562 lock_guard<mutex> lock(queue_mutex_);
563 while (!accepted_fd_list_.empty()) {
564 int accepted_fd = accepted_fd_list_.front();
565 accepted_fd_list_.pop();
566 scheduler_->AddTask(bind(&HshaServerIO::IOFunc, this, accepted_fd), nullptr);
567 }
568 }
大概逻辑如下,为accepted_fd创建UThreadSocket_t对象,读请求(封装了socket buffer相关的东西),并塞进data_flow中,并notify协程,然后因wait切出协程,等协程处理完毕后,切回来把response塞进data_flow中,其中有一些逻辑判断就省略了,部分实现如下:
570 void HshaServerIO::IOFunc(int accepted_fd) {
571 UThreadSocket_t *socket{scheduler_->CreateSocket(accepted_fd)};
572 UThreadTcpStream stream;
573 stream.Attach(socket);
574 UThreadSetSocketTimeout(*socket, config_->GetSocketTimeoutMS());
575
576 while (true) {
582 unique_ptr<BaseProtocolFactory> factory(
583 BaseProtocolFactory::CreateFactory(stream));
584 unique_ptr<BaseProtocol> protocol(factory->GenProtocol());
586 BaseRequest *req{nullptr};
587 ReturnCode ret{protocol->ServerRecv(stream, req)};
588 if (ReturnCode::OK != ret) {
589 delete req;
595 break;
596 }
600 if (!data_flow_->CanPushRequest(config_->GetMaxQueueLength())) {
601 delete req;
604 break;
605 }
606
607 if (!hsha_server_qos_->CanEnqueue()) {
609 delete req;
614 break;
615 }
622 data_flow_->PushRequest((void *)socket, req);
625 worker_pool_->Notify();
626 UThreadSetArgs(*socket, nullptr);
628 UThreadWait(*socket, config_->GetSocketTimeoutMS());
629 if (UThreadGetArgs(*socket) == nullptr) {
636 socket = stream.DetachSocket();
637 UThreadLazyDestory(*socket);
641 break;
642 }
645 {
646 BaseResponse *resp((BaseResponse *)UThreadGetArgs(*socket));
647 if (!resp->fake()) {
648 ret = resp->ModifyResp(is_keep_alive, version);
649 ret = resp->Send(stream);
651 }
652 delete resp;
653 }
658 if (ReturnCode::OK != ret) {
660 }
662 if (!is_keep_alive || (ReturnCode::OK != ret)) {
663 break;
664 }
665 }
668 }
然后是处理超时事件,timer用的是小根堆实现的,GetNextTimeout取下一个超时时间,如果没有则break,否则PopTimeout节点处理:
309 void UThreadEpollScheduler::DealwithTimeout(int & next_timeout) {
310 while (true) {
311 next_timeout = timer_.GetNextTimeout();
312 if (next_timeout != 0) {
313 break;
314 }
315
316 UThreadSocket_t * socket = timer_.PopTimeout();
317 socket->waited_events = UThreadEpollREvent_Timeout;
318 runtime_.Resume(socket->uthread_id);
319 }
320 }
以上是整个Run的处理逻辑,运行在调度线程中,有些细节没说明。
78 void EpollNotifier :: Run() {
79 assert(pipe(pipe_fds_) == 0);
80 fcntl(pipe_fds_[1], F_SETFL, O_NONBLOCK);
81 scheduler_->AddTask(std::bind(&EpollNotifier::Func, this), nullptr);
82 }
83
84 void EpollNotifier :: Func() {
85 UThreadSocket_t * socket = scheduler_->CreateSocket(pipe_fds_[0], -1, -1, false);
86 char tmp[2] = {0};
87 while (true) {
88 if (UThreadRead(*socket, tmp, 1, 0) < 0) {
89 break;
90 }
91 }
92 free(socket);
93 }
94
95 void EpollNotifier :: Notify() {
96 ssize_t write_len = write(pipe_fds_[1], (void *)"a", 1);
97 if (write_len < 0) {
99 }
100 }
526 void WorkerPool::Notify() {
527 lock_guard<mutex> lock(mutex_);
528 if (last_notify_idx_ == worker_list_.size()) {
529 last_notify_idx_ = 0;
530 }
531
532 worker_list_[last_notify_idx_++]->Notify();
533 }
485 void Worker::Notify() {
486 if (uthread_count_ == 0) {
487 return;
488 }
489
490 worker_scheduler_->NotifyEpoll();
491 }
上面几行是协程的等待和唤醒,每个工作线程如果是协程模式,也会有个调度功能,跟线程模式没多大关系。当有request/response的时候,会通过pipe写一个字符,然后发生可读事件然后进行后续处理,整个过程比较简单。
以下为accept/read/send/这三部分实现,基本对fd设置了非阻塞,然后注册监听事件和超时,并切换协程,当发生事件时或超时时,resume的协程会进行删除,如果是发生了感兴趣的事件,则进行accept/read/send等处理;
324 int UThreadPoll(UThreadSocket_t & socket, int events, int * revents, int timeout_ms) {
325 int ret = -1;
327 socket.uthread_id = socket.scheduler->GetCurrUThread();
329 socket.event.events = events;
330
331 socket.scheduler->AddTimer(&socket, timeout_ms);
332 epoll_ctl(socket.epoll_fd, EPOLL_CTL_ADD, socket.socket, &socket.event);
333
334 socket.scheduler->YieldTask();
336 epoll_ctl(socket.epoll_fd, EPOLL_CTL_DEL, socket.socket, &socket.event);
337 socket.scheduler->RemoveTimer(socket.timer_id);
338
339 *revents = socket.waited_events;
340
341 if ((*revents) > 0) {
342 if ((*revents) & events) {
343 ret = 1;
344 } else {
345 errno = EINVAL;
346 ret = 0;
347 }
348 } else if ((*revents) == UThreadEpollREvent_Timeout) {
349 //timeout
350 errno = ETIMEDOUT;
351 ret = 0;
352 } else if ((*revents) == UThreadEpollREvent_Error){
353 //error
354 errno = ECONNREFUSED;
355 ret = -1;
356 } else {
357 //active close
358 errno = 0;
359 ret = -1;
360 }
361
362 return ret;
363 }
424 int UThreadAccept(UThreadSocket_t & socket, struct sockaddr *addr, socklen_t *addrlen) {
425 int ret = accept(socket.socket, addr, addrlen);
426 if (ret < 0) {
427 if (EAGAIN != errno && EWOULDBLOCK != errno) {
428 return -1;
429 }
430
431 int revents = 0;
432 if (UThreadPoll(socket, EPOLLIN, &revents, -1) > 0) {
433 ret = accept(socket.socket, addr, addrlen);
434 } else {
435 ret = -1;
436 }
437 }
438
439 return ret;
440 }
441
442 ssize_t UThreadRead(UThreadSocket_t & socket, void * buf, size_t len, int flags) {
443 //int ret = read(socket.socket, buf, len);
444 int ret = -1;
445
446 //if (ret < 0 && EAGAIN == errno) {
447 int revents = 0;
448 if (UThreadPoll(socket, EPOLLIN, &revents, socket.socket_timeout_ms) > 0) {
449 ret = read(socket.socket, buf, len);
450 } else {
451 ret = -1;
452 }
453 //}
454
455 return ret;
456 }
474 ssize_t UThreadSend(UThreadSocket_t & socket, const void *buf, size_t len, int flags) {
475 int ret = send(socket.socket, buf, len, flags);
476
477 if (ret < 0 && EAGAIN == errno) {
478 int revents = 0;
479 if (UThreadPoll(socket, EPOLLOUT, &revents, socket.socket_timeout_ms) > 0) {
480 ret = send(socket.socket, buf, len, flags);
481 } else {
482 ret = -1;
483 }
484 }
485
486 return ret;
487 }
下面的WorkerLogic实现是协程或线程实际处理request的实现,从data_flow中拿消息,然后dispatch_处理后放入data_flow中,并NotifyEpoll:
459 void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {
464 BaseResponse *resp{req->GenResponse()};
465 if (queue_wait_time_ms < MAX_QUEUE_WAIT_TIME_COST) {
466 HshaServerStat::TimeCost time_cost;
468 DispatcherArgs_t dispatcher_args(pool_->hsha_server_stat_->hsha_server_monitor_,
469 worker_scheduler_, pool_->args_);
470 pool_->dispatch_(req, resp, &dispatcher_args);
474 } else {
475 pool_->hsha_server_stat_->worker_drop_requests_++;
476 }
477 pool_->data_flow_->PushResponse(args, resp);
480 pool_->scheduler_->NotifyEpoll();
482 delete req;
483 }
下面是整个协程的实现,协程的调度由UThreadEpollScheduler
来操作,上面也作了些分析,管理由UThreadRuntime
。
以下实现是每个协程的栈,私有栈,使用文件映射的方法作了些保护:
29 class UThreadStackMemory {
30 public:
31 UThreadStackMemory(const size_t stack_size, const bool need_protect = true);
32 ~UThreadStackMemory();
33
34 void * top();
35 size_t size();
36
37 private:
38 void * raw_stack_;
39 void * stack_;
40 size_t stack_size_;
41 int need_protect_;
42 };
33 UThreadStackMemory :: UThreadStackMemory(const size_t stack_size, const bool need_protect) :
34 raw_stack_(nullptr), stack_(nullptr), need_protect_(need_protect) {
35 int page_size = getpagesize();
36 if ((stack_size % page_size) != 0) {
37 stack_size_ = (stack_size / page_size + 1) * page_size;
38 } else {
39 stack_size_ = stack_size;
40 }
41
42 if (need_protect) {
43 raw_stack_ = mmap(NULL, stack_size_ + page_size * 2,
44 PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
45 assert(raw_stack_ != nullptr);
46 assert(mprotect(raw_stack_, page_size, PROT_NONE) == 0);
47 assert(mprotect((void *)((char *)raw_stack_ + stack_size_ + page_size), page_size, PROT_NONE) == 0);
48 stack_ = (void *)((char *)raw_stack_ + page_size);
49 } else {
50 raw_stack_ = mmap(NULL, stack_size_, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
51 assert(raw_stack_ != nullptr);
52 stack_ = raw_stack_;
53 }
54 }
55
56 UThreadStackMemory :: ~UThreadStackMemory() {
57 int page_size = getpagesize();
58 if (need_protect_) {
59 assert(mprotect(raw_stack_, page_size, PROT_READ | PROT_WRITE) == 0);
60 assert(mprotect((void *)((char *)raw_stack_ + stack_size_ + page_size), page_size, PROT_READ | PROT_WRITE) == 0);
61 assert(munmap(raw_stack_, stack_size_ + page_size * 2) == 0);
62 } else {
63 assert(munmap(raw_stack_, stack_size_) == 0);
64 }
65 }
以下是协程对象的基类和派生类的具体实现:
36 class UThreadContext {
37 public:
38 UThreadContext() { }
39 virtual ~UThreadContext() { }
40
41 static UThreadContext * Create(size_t stack_size,
42 UThreadFunc_t func, void * args,
43 UThreadDoneCallback_t callback, const bool need_stack_protect);
44 static void SetContextCreateFunc(ContextCreateFunc_t context_create_func);
45 static ContextCreateFunc_t GetContextCreateFunc();
46
47 virtual void Make(UThreadFunc_t func, void * args) = 0;
48 virtual bool Resume() = 0;
49 virtual bool Yield() = 0;
50
51 private:
52 static ContextCreateFunc_t context_create_func_;
53 };
28 UThreadContext * UThreadContext :: Create(size_t stack_size,
29 UThreadFunc_t func, void * args,
30 UThreadDoneCallback_t callback, const bool need_stack_protect) {
31 if (context_create_func_ != nullptr) {
32 return context_create_func_(stack_size, func, args, callback, need_stack_protect);
33 }
34 return nullptr;
35 }
34 class UThreadContextSystem : public UThreadContext {
35 public:
36 UThreadContextSystem(size_t stack_size, UThreadFunc_t func, void * args,
37 UThreadDoneCallback_t callback, const bool need_stack_protect);
38 ~UThreadContextSystem();
39
40 static UThreadContext * DoCreate(size_t stack_size,
41 UThreadFunc_t func, void * args, UThreadDoneCallback_t callback,
42 const bool need_stack_protect);
43
44 void Make(UThreadFunc_t func, void * args) override;
45 bool Resume() override;
46 bool Yield() override;
47
48 ucontext_t * GetMainContext();
49
50 private:
51 static void UThreadFuncWrapper(uint32_t low32, uint32_t high32);
52
53 ucontext_t context_;
54 UThreadFunc_t func_;
55 void * args_;
56 UThreadStackMemory stack_;
57 UThreadDoneCallback_t callback_;
58 };
30
31 UThreadContextSystem :: UThreadContextSystem(size_t stack_size, UThreadFunc_t func, void * args,
32 UThreadDoneCallback_t callback, const bool need_stack_protect)
33 : func_(func), args_(args), stack_(stack_size, need_stack_protect), callback_(callback) {
34 Make(func, args);
35 }
40 UThreadContext * UThreadContextSystem :: DoCreate(size_t stack_size,
41 UThreadFunc_t func, void * args, UThreadDoneCallback_t callback,
42 const bool need_stack_protect) {
43 return new UThreadContextSystem(stack_size, func, args, callback, need_stack_protect);
44 }
69 ucontext_t * UThreadContextSystem :: GetMainContext() {
70 static __thread ucontext_t main_context;
71 return &main_context;
72 }
上面的实现是用的ucontext_t,其中用到的几个函数说明下:
int getcontext(ucontext_t *ucp) 用于将当前执行状态上下文保存到一个ucontext_t结构中;
int makecontext(ucontext_t ucp, void (func)(), int argc, ...) 用于初始化一个ucontext_t类型的结构,即上下文,每个参数类型都是int型,函数指针func指明了该context的入口函数,在执行该函数前,一般要执行getcontext并设置相关的初始栈信息,包括栈指针和栈大小等信息;
int swapcontext(ucontext_t *oucp, ucontext_t *ucp) 用于完成旧状态的保存和切换到新状态的工作;
以上具体原理可以移到参考资料中。
31 class UThreadRuntime {
32 public:
33 UThreadRuntime(size_t stack_size, const bool need_stack_protect);
34 ~UThreadRuntime();
35
36 int Create(UThreadFunc_t func, void * args);
37 int GetCurrUThread();
38 bool Yield();
39 bool Resume(size_t index);
40 bool IsAllDone();
41 int GetUnfinishedItemCount() const;
42
43 void UThreadDoneCallback();
44
45 private:
46 struct ContextSlot {
47 ContextSlot() {
48 context = nullptr;
49 next_done_item = -1;
50 }
51 UThreadContext * context;
52 int next_done_item;
53 int status;
54 };
55
56 size_t stack_size_;
57 std::vector<ContextSlot> context_list_;
58 int first_done_item_;
59 int current_uthread_;
60 int unfinished_item_count_;
61 bool need_stack_protect_;
62 };
36 UThreadRuntime :: UThreadRuntime(size_t stack_size, const bool need_stack_protect)
37 :stack_size_(stack_size), first_done_item_(-1),
38 current_uthread_(-1), unfinished_item_count_(0),
39 need_stack_protect_(need_stack_protect) {
40 if (UThreadContext::GetContextCreateFunc() == nullptr) {
41 UThreadContext::SetContextCreateFunc(UThreadContextSystem::DoCreate);
42 }
43 }
111 return true;
112 }
上面差不多是管理协程的池,如何使用?比如如下代码段:
193 void UThreadEpollScheduler::ConsumeTodoList() {
194 while (!todo_list_.empty()) {
195 auto & it = todo_list_.front();
196 int id = runtime_.Create(it.first, it.second);
197 runtime_.Resume(id);
198
199 todo_list_.pop();
200 }
201 }
对每一个task,创建协程,如果first_done_item_
表示有可用的协程,那么复用它并修改first_done_item_
,next_done_item
表示下一个可用的协程索引号,然后调用Make进行设置和初始化当前协程的上下文,然后设置uc_link执行完毕回到的主协程,即main_context
,UThreadFuncWrapper
是协程的入口函数,把this分段成两个32位的值,在UThreadFuncWrapper
里又拼接成this,然后进行处理,如果有回调那么进行回调UThreadDoneCallback
,它的作用差不多类似把协程资源归还到vector中,并更新一些数据:
74 void UThreadContextSystem :: UThreadFuncWrapper(uint32_t low32, uint32_t high32) {
75 uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t) high32 << 32);
76 UThreadContextSystem * uc = (UThreadContextSystem *)ptr;
77 uc->func_(uc->args_);
78 if (uc->callback_ != nullptr) {
79 uc->callback_();
80 }
81 }
77 void UThreadRuntime :: UThreadDoneCallback() {
78 if (current_uthread_ != -1) {
79 ContextSlot & context_slot = context_list_[current_uthread_];
80 context_slot.next_done_item = first_done_item_;
81 context_slot.status = UTHREAD_DONE;
82 first_done_item_ = current_uthread_;
83 unfinished_item_count_--;
84 current_uthread_ = -1;
85 }
86 }
如果first_done_item_
为-1表示没有可用的协程,那么创建新的协程对象,并进行初始化,然后入vector进行管理。
51 int UThreadRuntime :: Create(UThreadFunc_t func, void * args) {
52 if (func == nullptr) {
53 return -2;
54 }
55 int index = -1;
56 if (first_done_item_ >= 0) {
57 index = first_done_item_;
58 first_done_item_ = context_list_[index].next_done_item;
59 context_list_[index].context->Make(func, args);
60 } else {
61 index = context_list_.size();
62 auto new_context = UThreadContext::Create(stack_size_, func, args,
63 std::bind(&UThreadRuntime::UThreadDoneCallback, this),
64 need_stack_protect_);
65 assert(new_context != nullptr);
66 ContextSlot context_slot;
67 context_slot.context = new_context;
68 context_list_.push_back(context_slot);
69 }
70
71 context_list_[index].next_done_item = -1;
72 context_list_[index].status = UTHREAD_SUSPEND;
73 unfinished_item_count_++;
74 return index;
75 }
46 void UThreadContextSystem :: Make(UThreadFunc_t func, void * args) {
47 func_ = func;
48 args_ = args;
49 getcontext(&context_);
50 context_.uc_stack.ss_sp = stack_.top();
51 context_.uc_stack.ss_size = stack_.size();
52 context_.uc_stack.ss_flags = 0;
53 context_.uc_link = GetMainContext();
54 uintptr_t ptr = (uintptr_t)this;
55 makecontext(&context_, (void (*)(void))UThreadContextSystem::UThreadFuncWrapper,
56 2, (uint32_t)ptr, (uint32_t)(ptr >> 32));
57 }
创建完协程后,进行Resume
开始切入该协程,设置status为running状态,并切出主协程,切入要运行的协程:
88 bool UThreadRuntime :: Resume(size_t index) {
89 if (index >= context_list_.size()) {
90 return false;
91 }
92
93 auto context_slot = context_list_[index];
94 if (context_slot.status == UTHREAD_SUSPEND) {
95 current_uthread_ = index;
96 context_slot.status = UTHREAD_RUNNING;
97 context_slot.context->Resume();
98 return true;
99 }
100 return false;
101 }
59 bool UThreadContextSystem :: Resume() {
60 swapcontext(GetMainContext(), &context_);
61 return true;
62 }
如果要等待某个资源,那么协程需要让出给另一个协程执行,即Yield
,根据当前的协程索引,获得协程对象,并设置status为挂起suspend,然后让出:
103 bool UThreadRuntime :: Yield() {
104 if (current_uthread_ != -1) {
105 auto context_slot = context_list_[current_uthread_];
106 current_uthread_ = -1;
107 context_slot.status = UTHREAD_SUSPEND;
108 context_slot.context->Yield();
109 }
64 bool UThreadContextSystem :: Yield() {
65 swapcontext(&context_, GetMainContext());
66 return true;
67 }
以上差不多是整个协程池的实现了,后期也会分析下libco中的实现,跟这个不一样,然后pebble中的协程也会分析下。
大概分析如下,其他的一些比如data_flow/buffer/timer等实现,如果有时间会分析下,以下连接中有分析了。基本上对于一个框架的理解,主要是从整个框架比如多进程还是单进程多线程,从消息源到处理,到返回消息响应,这么分析会比较好,然后再慢慢加上其他功能,比如统计,负载情况,以及怎么处理消息,怎么缓存请求等。
参考资料:
https://blog.csdn.net/shanshanpt/article/details/55213287
https://blog.csdn.net/shanshanpt/article/details/55253379
https://blog.csdn.net/lmfqyj/article/details/79437157
https://blog.csdn.net/lmfqyj/article/details/79406952
http://man7.org/linux/man-pages/man2/mmap.2.html
https://segmentfault.com/p/1210000009166339/read
http://man7.org/linux/man-pages/man2/getcontext.2.html
https://segmentfault.com/a/1190000013177055