Libco协程库实现

腾讯开源的Libco协程库,以前看过部分源码,所有的协程都用数组模拟栈表示,里面使用到的技术点有hook系统函数,时间轮定时器,epoll,共享栈等,但没有协程池,当协程完成任务时,这里只在example中用了vector保存。

涉及的源文件不是很多,从example_echosvr.cpp中开始分析,echo例子最简单(读写和连接),只考虑单进程单线程情况,只分析原理,有些细节比如如何实现闭包,类似lua中的closure/upvalue,不会分析,不过lua中是会存储相关的索引,有时间会分析下lua中实现此功能的原理;只考虑x86-64架构。

struct stCoRoutine_t表示的是协程对象的基本数据:

 49 struct stCoRoutine_t
 50 {
 51     stCoRoutineEnv_t *env;
 52     pfn_co_routine_t pfn;
 53     void *arg;
 54     coctx_t ctx;
 55 
 56     char cStart;
 57     char cEnd;
 58     char cIsMain;
 59     char cEnableSysHook;
 60     char cIsShareStack;
 61     
 62     void *pvEnv;
 65     stStackMem_t* stack_mem;
 67 
 68     //save satck buffer while confilct on same stack_buffer;
 69     char* stack_sp;
 70     unsigned int save_size;
 71     char* save_buffer;
 72 
 73     stCoSpec_t aSpec[1024];
 75 };

其中ctx是协程的上下文:

 28 struct coctx_t
 29 {
 30 #if defined(__i386__)
 31     void *regs[ 8 ];
 32 #else
 33     void *regs[ 14 ];
 34 #endif
 35     size_t ss_size;
 36     char *ss_sp;
 37   
 38 };

这些寄存器regs和栈指针,大小会在后面分析;pfn是协程运行入口函数;cIsShareStack表示是否启用共享栈;其他几个变量在分析实现时会解释作用。

这里没有使用ucontext_t,而是使用的struct coctx_t,可以对比着看:

typedef struct ucontext_t {
        struct ucontext_t *uc_link;
        sigset_t          uc_sigmask;
        stack_t           uc_stack;
        mcontext_t        uc_mcontext;
        ...
} ucontext_t;

以上定义在<sys/ucontext_t.h>文件中,uc_sigmask是信号集合typedef unsigned long int __sigset_t,这里不做过多说明,uc_link表示后继协程的上下文,在另外两篇分析协程的博客包括本篇,uc_link表示的是主协程即mainuc_stack表示栈的信息即栈指针,ss_size栈大小 ,ss_flags暂时不是很清楚作用:

 26 typedef struct 
 27   {
 28     void *ss_sp;
 29     size_t ss_size;
 30     int ss_flags;
 31   } stack_t

uc_mcontext表示寄存器:

 118 typedef struct
 119   {
 120     gregset_t __ctx(gregs);
        //more code...
 122   } mcontext_t;

 38 /* Container for all general registers.  */
 39 typedef greg_t gregset_t[__NGREG];

关于ucontext_t实现原理和四个操作函数会在链接中给出参考资料,可能这里也会做一些说明,综上,这个结构和上面声明的struct coctx_t整体上差不多,作用也是。

struct stCoRoutineEnv_t表示的是整个环境信息:

  51 struct stCoRoutineEnv_t
  52 {
  53     stCoRoutine_t *pCallStack[ 128 ];
  54     int iCallStackSize;
  55     stCoEpoll_t *pEpoll;
  56 
  57     //for copy stack log lastco and nextco
  58     stCoRoutine_t* pending_co;
  59     stCoRoutine_t* occupy_co;
  60 };

pending_cooccupy_co表示切出和切入协程;pCallStack表示有哪些协程在等待,最多只支持128个,可调整。

 312 struct stCoEpoll_t
 313 {
 314     int iEpollFd;
 315     static const int _EPOLL_SIZE = 1024 * 10;
 316 
 317     struct stTimeout_t *pTimeout;
 319     struct stTimeoutItemLink_t *pstTimeoutList;
 321     struct stTimeoutItemLink_t *pstActiveList;
 323     co_epoll_res *result;
 325 };

struct stCoEpoll_t这个结构体用于等待事件和超时,所有的协程对象最后都会挂在上面,作用和Pebble协程中的声明类似,可参考另外一篇。

下面以例子来分析整个工作过程,假设一个task如下声明:

 45 struct task_t
 46 {
 47     stCoRoutine_t *co;
 48     int fd;
 49 };

创建一个task并运行,这里例举有两个task,不包括主协程:

243             task_t * task = (task_t*)calloc( 1,sizeof(task_t) );
244             task->fd = -1;
245 
246             co_create( &(task->co),NULL,readwrite_routine,task );
247             co_resume( task->co );

readwrite_routine为协程的入口函数;task是函数的参数:

 521 int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
 522 {
 523     if( !co_get_curr_thread_env() )
 524     {
 525         co_init_curr_thread_env();
 526     }
 527     stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
 528     *ppco = co;
 529     return 0;
 530 }
 704 static stCoRoutineEnv_t* g_arrCoEnvPerThread[ 204800 ] = { 0 };
 705 void co_init_curr_thread_env()
 706 {
 707     pid_t pid = GetPid();
 708     g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );
 709     stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];
 710 
 711     env->iCallStackSize = 0;
 712     struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
 713     self->cIsMain = 1;
 714 
 715     env->pending_co = NULL;
 716     env->occupy_co = NULL;
 717 
 718     coctx_init( &self->ctx );
 719 
 720     env->pCallStack[ env->iCallStackSize++ ] = self;
 721 
 722     stCoEpoll_t *ev = AllocEpoll();
 723     SetEpoll( env,ev );
 724 }

以上当stCoRoutineEnv_t环境不存在时,以当前线程id创建一个环境变量,并创建主协程co_create_env(我们举的单线程),并coctx_initself->cIsMain = 1;env->pCallStack[0]为主协程:

 461 struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
 462         pfn_co_routine_t pfn,void *arg )
 463 {
         ... more code
 495     if( at.share_stack )
 496     {
 497         stack_mem = co_get_stackmem( at.share_stack);
 498         at.stack_size = at.share_stack->stack_size;
 499     }
 500     else
 501     {
 502         stack_mem = co_alloc_stackmem(at.stack_size);
 503     }

 504     lp->stack_mem = stack_mem;
 506     lp->ctx.ss_sp = stack_mem->stack_buffer;
 507     lp->ctx.ss_size = at.stack_size;

以上根据参数attr是否开启共享栈,这里分析不考虑共享栈,后面会单独分析下使用共享栈的作用。

 269 stStackMem_t* co_alloc_stackmem(unsigned int stack_size)
 270 {
 271     stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t));
 272     stack_mem->occupy_co= NULL;
 273     stack_mem->stack_size = stack_size;
 274     stack_mem->stack_buffer = (char*)malloc(stack_size);
 275     stack_mem->stack_bp = stack_mem->stack_buffer + stack_size;
 276     return stack_mem;
 277 }

以上创建协程的栈并设置大小,目前是128kb,然后由于栈是从高地址往低地址增长的,故设置桢指针stack_mem->stack_bp = stack_mem->stack_buffer + stack_size

然后创建task的协程:

stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );

并设置这几个值:

 490     lp->env = env;
 491     lp->pfn = pfn;
 492     lp->arg = arg;

即:

 490     lp->env = env;
 491     lp->pfn = readwrite_routine;
 492     lp->arg = task;

以上是create一个协程的过程,下面是创建完后要resume,即切入到刚才创建的协程运行:

 547 void co_resume( stCoRoutine_t *co )
 548 {          
 549     stCoRoutineEnv_t *env = co->env;
 550     stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
 551     if( !co->cStart ) 
 552     {      
 553         coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
 554         co->cStart = 1;
 555     }  
 556     env->pCallStack[ env->iCallStackSize++ ] = co; 
 557     co_swap( lpCurrRoutine, co );  
 560 }

这个函数的大概功能是将要运行的协程(切入)和主协程调换(切出),第一次resumecStart为0,只设置一次并设置协程的入口函数CoRoutineFunc和参数co,后续的yieldresume不会再coctx_make,然后当前协程压栈进行co_swap

115 int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 )
116 {
117     char *sp = ctx->ss_sp + ctx->ss_size;
118     sp = (char*) ((unsigned long)sp & -16LL  );
119 
120     memset(ctx->regs, 0, sizeof(ctx->regs));
121 
122     ctx->regs[ kRSP ] = sp - 8;
124     ctx->regs[ kRETAddr] = (char*)pfn;
125 
126     ctx->regs[ kRDI ] = (char*)s;
127     ctx->regs[ kRSI ] = (char*)s1;
128     return 0;
129 }

详细分析下这里面的实现,以上设置sp指向高地址并对齐;另外,“%rdi,%rsi,%rdx,%rcx,%r8,%r9用作函数参数,依次对应第1参数,第2参数...”,所以ctx->regs[ kRDI ]为协程co参数,ctx->regs[ kRSP ]设置栈指针减8处的地址,具体用处在后面分析;ctx->regs[ kRETAddr]为协程的入口函数;

co_swap主要功能是保存切出协程的栈空间,然后恢复切入协程的栈空间:

 599 void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
 600 {  
 601     stCoRoutineEnv_t* env = co_get_curr_thread_env();
 602    
 603     //get curr stack sp
 604     char c;
 605     curr->stack_sp= &c;
 606 
 607     if (!pending_co->cIsShareStack)
 608     {
 609         env->pending_co = NULL;
 610         env->occupy_co = NULL;
 611     }
 612     else
 613     {   
 614         env->pending_co = pending_co;
 615         //get last occupy co on the same stack mem
 616         stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
 617         //set pending co to occupy thest stack mem;
 618         pending_co->stack_mem->occupy_co = pending_co;
 619         
 620         env->occupy_co = occupy_co;
 621         if (occupy_co && occupy_co != pending_co)
 622         {   
 623             save_stack_buffer(occupy_co);
 624         }
 625     }
 626 
 627     //swap context
 628     coctx_swap(&(curr->ctx),&(pending_co->ctx) );
 629 
 630     //stack buffer may be overwrite, so get again;
 631     stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
 632     stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
 633     stCoRoutine_t* update_pending_co = curr_env->pending_co;
 634 
 635     if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
 636     {
 637         //resume stack buffer
 638         if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
 639         {
 640             memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
 641         }
 642     }
 643 }

 582 void save_stack_buffer(stCoRoutine_t* occupy_co)
 583 {
 584     ///copy out
 585     stStackMem_t* stack_mem = occupy_co->stack_mem;
 586     int len = stack_mem->stack_bp - occupy_co->stack_sp;
 587 
 588     if (occupy_co->save_buffer)
 589     {
 590         free(occupy_co->save_buffer), occupy_co->save_buffer = NULL;
 591     }
 592 
 593     occupy_co->save_buffer = (char*)malloc(len); //malloc buf;
 594     occupy_co->save_size = len;
 595 
 596     memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len);
 597 }

上面的实现涉及到了共享栈的模式,如下图:


共享栈

考虑在共享栈模式下,一开始某块栈被协程a占用,然后此时b要切入,那么代码行614〜624执行后:

env->pending_co = b
pending_co->stack_mem->occupy_co为null,故occupy_co=null
pending_co->stack_mem->occupy_co = b

表示此时共享栈被b占用,然后也不需要把共享栈的内容拷贝到occupy_co,因为此时为null;
如果再发生一次切换,即a切入,b切出:

env->pending_co = a
pending_co->stack_mem->occupy_co为b,故occupy_co=b
pending_co->stack_mem->occupy_co = a

表示此时栈被a占用,然后进行了save_stack_buffer,算出实际使用多少栈空间,然后释放原来的save_buffer,并分配实际需要大小的栈空间,并从stack_sp拷贝len大小的内容进行保存。
使用共享栈,可以不必为每个协程创建固定大小的栈,比如每个协程占用128kb空间,那么32GB的内存,假设都用来创建协程,也只有二十五万个左右,而如果使用共享栈,比如每10个共享一个128kb,那么有二百五十万个这样。
当然使用共享栈也带来了一些性能方面的问题,比如malloc/free/memcpy等,还有一定的碎片。
代码行630〜643表示coctx_swap后需要恢复栈空间;

coctx_swap是一段汇编代码:

 56 #elif defined(__x86_64__)
 57     leaq 8(%rsp),%rax
 58     leaq 112(%rdi),%rsp
 59     pushq %rax
 60     pushq %rbx
 61     pushq %rcx
 62     pushq %rdx
 63 
 64     pushq -8(%rax) //ret func addr
 65 
 66     pushq %rsi
 67     pushq %rdi
 68     pushq %rbp
 69     pushq %r8
 70     pushq %r9
 71     pushq %r12
 72     pushq %r13
 73     pushq %r14
 74     pushq %r15

这部分是切出过程:在coctx_swap(&(curr->ctx),&(pending_co->ctx) )执行时,参数curr->ctx地址入rdipending_co->ctx地址入rsi,在进入coctx_swap后,此时rsp指向返回地址(下一条指令地址);
语句leaq 8(%rsp),%rax,把rsp + 8处的地址入rax,想象ret指令时,把返回地址poprip中(rip = rsp, rsp = rsp - 8),此时rsp指向的是父函数的栈顶地址(下一条指令地址),和这里一样意思,不需要把返回地址也保存,因为有regs[kRETAddr]保存;
那么语句leaq 112(%rdi),%rspcurr->ctx加112个字节偏移量处的地址给rsp,112是sizeof(void*)(=8)*14,即regs[14]
后面的pushq就是把寄存器中的内容保存到regs数组中,rax 中的值是要切出的协程调用coctx_swap时,栈顶除返回地址外栈顶的地址,保存到resg[13](pushq等价于rsp = rsp - 8; mov rax, %rsp);
其他对应关系如下:

 62 // 64 bit
 63 low | regs[0]: r15 |
 64     | regs[1]: r14 |
 65     | regs[2]: r13 |
 66     | regs[3]: r12 |
 67     | regs[4]: r9  |
 68     | regs[5]: r8  | 
 69     | regs[6]: rbp |
 70     | regs[7]: rdi |
 71     | regs[8]: rsi |
 72     | regs[9]: ret |  //ret func addr
 73     | regs[10]: rdx |
 74     | regs[11]: rcx | 
 75     | regs[12]: rbx |
 76 hig | regs[13]: rsp |
 76     movq %rsi, %rsp
 77     popq %r15
 78     popq %r14
 79     popq %r13
 80     popq %r12
 81     popq %r9
 82     popq %r8
 83     popq %rbp
 84     popq %rdi
 85     popq %rsi
 86     popq %rax //ret func addr
 87     popq %rdx
 88     popq %rcx
 89     popq %rbx
 90     popq %rsp
 91     pushq %rax
 92 
 93     xorl %eax, %eax
 94     ret
 95 #endif

这部分是切入过程:rsi为第二个参数,这里rsp指向新的栈空间即regs[0],以上实现把regs中的原有内容popq到各寄存器中,其中popq %rsp,把要恢复的栈指针弹到rsp(即调用coctx_swap前的rsp)[这类比于正常的函数调用:压入返回地址;压入上一帧的rbp,然后使用rbp指向新的rsp];
pushq %rax把返回地址压栈,用xorl 把 %rax 低32位清0以实现地址对齐,然后ret指令把返回地址弹到rip中进入跳转;
以上切换部分比较难以理解,关键点在rsprip,然后借助笔和纸可以画一下栈的交互过程,有助于理解;

由于切入的协程执行readwrite_routine时,没有连接到达,故把自己pushvector中,并切出去:

 561 void co_yield_env( stCoRoutineEnv_t *env )
 562 {  
 564     stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
 565     stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
 566        
 567     env->iCallStackSize--;
 569     co_swap( curr, last);
 570 }

当执行完两个task协程时,都因为fd == -1判断而成立,把自己切出,然后在主协程中创建一个协程用于accept连接co_accept,这里略过这几个实现;
大概介绍下co_accept实现,死循环,如果没有可用的协程则通过epoll睡个timeout时间,超时或唤醒在下面介绍;
然后进行co_eventloop

为了简化分析,假设在理想情况下,如果有连接到来co_accept协程有事件发生并进行resume,然后pop一个task协程,设置co->fd = fd,并resumetask协程,如果中途遇到阻塞则yield等。

整体上libco差不多就这样了,上面也说到了,没有协程池的实现,用汇编实现切换有点难以理解,且在切换的时候会可能会很频繁的使用malloc/memcpy等保存栈内容等,虽然官方“为了减少这种内存拷贝次数,共享栈的内存拷贝只发生在不同协程间的切换。当共享栈的占用者一直没有改变的时候,则不需要拷贝运行栈。”。

在使用协程过程中注意的地方还蛮多的,比如不能使用sleep,全局变量,线程私有变量以及锁等,这些如何改造的可以参考libco的其他实现,这里不过多分析这些细节。

参考资料:
https://segmentfault.com/a/1190000012656741
https://segmentfault.com/p/1210000009166339/read
https://blog.csdn.net/lqt641/article/details/73002566
https://blog.csdn.net/lqt641/article/details/73287231
https://www.zhihu.com/question/52193579?sort=created
https://cloud.tencent.com/developer/article/1049637
https://cloud.tencent.com/developer/article/1005648
https://github.com/Tencent/libco/issues
https://blog.csdn.net/kobejayandy/article/details/46993021
https://segmentfault.com/a/1190000007407881

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 这里补充下libco后续对于协程间切换的汇编新实现,原来的实现方法之前分析过Libco协程库实现[https://...
    fooboo阅读 350评论 0 0
  • 对于协程做一个整体的描述,从概念、原理、实现三个方面叙述。侧重有栈协程。 1 概览 1.1 什么是协程 有很多与协...
    chnmagnus阅读 770评论 0 1
  • 这篇文章主要介绍些汇编和函数调用栈的变化过程以及x86-64体系结构下各寄存器的作用,为后面两篇博客分析协程库(L...
    fooboo阅读 1,081评论 0 0
  • 目录 Linux高性能网络:协程系列01-前言 Linux高性能网络:协程系列02-协程的起源 Linux高性能网...
    C_GO流媒体后台开发阅读 1,822评论 0 0
  • go语言热门起来之后,goroutine 和 协程的概念 也开始流行起来。云风很早的时候在自己的github上面开...
    DayDayUpppppp阅读 1,413评论 0 2