开源项目Workflow中有一个非常重要的基础模块:代码仅300行的C语言线程池。
逻辑完备的三个特点在第3部分开始讲解,欢迎跳阅,或直接到Github主页上围观代码。
https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c
0 - Workflow的thrdpool
Workflow的大招:计算通信融为一体的异步调度模式,而计算的核心:Executor调度器,就是基于这个线程池实现的。可以说,一个通用而高效的线程池,是我们写C/C++代码时离不开的基础模块。
thrdpool代码位置在src/kernel/,不仅可以直接拿来使用,同时也适合阅读学习。
而更重要的,秉承Workflow项目本身一贯的严谨极简的作风,这个thrdpool代码极致简洁,实现逻辑上亦非常完备,结构精巧,处处严谨,不得不让我惊叹:
妙啊!!!🤩
你可能会很好奇,线程池还能写出什么别致的新思路吗?先列出一些,你们细品:
- 特点1:创建完线程池后,无需记录任何线程id或对象,线程池可以通过一个等一个的方式优雅地去结束所有线程;
- 特点2:线程任务可以由另一个线程任务调起;甚至线程池正在被销毁时也可以提交下一个任务;(这很重要,因为线程本身很可能是不知道线程池的状态的;
- 特点3:同理,线程任务也可以销毁这个线程池;(非常完整~
我真的迫不及待为大家深层解读一下,这个我愿称之为“逻辑完备”的线程池。
1 - 前置知识
第一部分我先从最基本的内容梳理一些个人理解,有基础的小伙伴可以直接跳过。如果有不准确的地方,欢迎大家指正交流~
为什么需要线程池?(其实思路不仅对线程池,对任何有限资源的调度管理都是类似的)
我们知道,通过系统提供的pthread或者std::thread创建线程,就可以实现多线程并发执行我们的代码。
但是CPU的核数是固定的,所以真正并发执行的最大值也是固定的,过多的线程创建除了频繁产生创建的overhead以外,还会导致对系统资源进行争抢,这些都是不必要的浪费。
因此我们可以管理有限个线程,循环且合理地利用它们。♻️
那么线程池一般包含哪些内容呢?
- 首先是管理若干个~~工具人~~线程;
- 其次是管理交给线程去执行的任务,这个一般会有一个队列;
- 再然后线程之间需要一些同步机制,比如mutex、condition等;
- 最后就是各线程池实现上自身需要的其他内容了;
好了,接下来我们看看Workflow的thrdpool是怎么做的。
2 - 代码概览
以下共7步常用思路,足以让我们把代码飞快过一遍。
第1步:先看头文件,模块提供什么接口。
我们打开thrdpool.h
,可以只关注三个接口:
// 创建线程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把任务交给线程池的入口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool);
第2步:接口上有什么数据结构。
也就是,我们如何描述一个交给线程池的任务。
struct thrdpool_task
{
void (*routine)(void *); // 一个函数指针
void *context; // 一个上下文
};
第3步:再看实现.c,有什么内部数据结构。
struct __thrdpool
{
struct list_head task_queue; // 任务队列
size_t nthreads; // 线程个数
size_t stacksize; // 构造线程时的参数
pthread_t tid; // 运行起来之后,pool上记录的这个是zero值
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_key_t key;
pthread_cond_t *terminate;
};
没有一个多余,每一个成员都很到位:
- tid:线程id,整个线程池只有一个,它不会奇怪地去记录任何一个线程的id,这样就不完美了,它平时运行的时候是空值,退出的时候,它是用来实现链式等待的关键。
- mutex 和 cond是常见的线程间同步的工具,其中这个cond是用来给生产者和消费者去操作任务队列用的。
- key:是线程池的key,然后会赋予给每个由线程池创建的线程作为他们的thread local,用于区分这个线程是否是线程池创建的。
- 我们还看到一个*pthread_cond_t terminate,这有两个用途:不仅是退出时的标记位 ,而且还是调用退出的那个人要等待的condition。
以上各个成员的用途,好像说了,又好像没说,🤔是因为几乎每一个成员都值得深挖一下,所以我们记住它们,后面看代码的时候就会豁然开朗!😃
第4步:接口都调用了什么核心函数。
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t *pool;
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
... // 去掉了其他代码,但是注意到刚才的tid和terminate的赋值
memset(&pool->tid, 0, sizeof (pthread_t));
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
...
这里可以看到__thrdpool_create_threads()
里边最关键的就是循环创建nthreads个线程。
while (pool->nthreads < nthreads)
{
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
...
第5步:略读核心函数的功能。
所以我们在上一步知道了,每个线程执行的是__thrdpool_routine()
。不难想象,它会不停从队列拿任务出来执行:
static void *__thrdpool_routine(void *arg)
{
...
while (1)
{
// 1. 从队列里拿一个任务出来,没有就等待
pthread_mutex_lock(&pool->mutex);
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
if (pool->terminate) // 2. 线程池结束的标志位,记住它,先跳过
break;
// 3. 如果能走到这里,恭喜你,拿到了任务~
entry = list_entry(*pos, struct __thrdpool_task_entry, list);
list_del(*pos);
pthread_mutex_unlock(&pool->mutex); // 4. 先解锁
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context); // 5. 再执行
// 6. 这里也先记住它,意思是线程池里的线程可以销毁线程池
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
... // 后面还有魔法,留下一章解读~~~
第6步:把函数之间的关系联系起来。
刚才看到的__thrdpool_routine()
就是线程的核心函数了,它可以和谁关联起来呢?
可以和接口thrdpool_schedule()
关联上。
我们说过,线程池上有个队列管理任务,
- 所以,每个执行routine的线程,都是消费者;
- 而每个发起schedule的线程,都是生产者;
我们已经看过消费者了,来看看生产者的代码:
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
entry->task = *task;
pthread_mutex_lock(&pool->mutex);
list_add_tail(&entry->list, &pool->task_queue); // 添加到队列里
pthread_cond_signal(&pool->cond); // 叫醒在等待的线程
pthread_mutex_unlock(&pool->mutex);
}
说到这里,特点2
就非常清晰了:
开篇说的特点2
是说,”线程任务可以由另一个线程任务调起”。
只要对队列的管理做得好,显然我们在消费者所执行的函数也可以做生产者。
第7步:看其他情况的处理,对于线程池来说就是比如销毁的情况。
只看我们接口thrdpool_destroy()的实现是非常简单的:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
...
// 1. 内部会设置pool->terminate,并叫醒所有等在队列拿任务的线程
__thrdpool_terminate(in_pool, pool);
// 2. 把队列里还没有执行的任务都拿出来,通过pending返回给用户
list_for_each_safe(pos, tmp, &pool->task_queue)
{
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
if (pending)
pending(&entry->task);
... // 后面就是销毁各种内存,同样有魔法~
在退出的时候,我们那些已经提交但是还没有被执行的任务是绝对不能就这么扔掉了的,于是我们可以传入一个pending()
函数,上层可以做自己的回收、回调、任何保证上层逻辑完备的事情。
设计的完整性,无处不在。
接下来我们就可以跟着我们的核心问题,针对性地看看每个特点都是怎么实现的。
3 - 特点1: 一个等待一个的优雅退出
这里提出一个问题:线程池要退出,如何结束所有线程?
一般线程池的实现都是需要记录下所有的线程id,或者thread对象,以便于我们去jion等待它们结束。
但是我们刚才看,pool里并没有记录所有的tid呀?正如开篇说的,pool上只有一个tid,而且还是个空的值。
所以特点1
给出了Workflow的thrdpool的答案:
无需记录所有线程,我可以让线程挨个自动退出、且一个等待一个,最终达到我调用完thrdpool_destroy()后内存可以回收干净的目的。
这里先给一个简单的图,假设发起destroy的人是main线程,我们如何做到一个等一个退出:
步骤如下:
- 线程的退出,由thrdpool_destroy()设置pool->terminate开始。
- 我们每个线程,在while(1)里会第一时间发现terminate,线程池要退出了,然后会break出这个while循环。
- 注意这个时候,还持有着mutex锁,我们拿出pool上唯一的那个tid,放到我的临时变量,我会根据拿出来的值做不同的处理。且我会把我自己的tid放上去,然后再解mutex锁。
- 那么很显然,第一个从pool上拿tid的人,会发现这是个0值,就可以直接结束了,不用负责等待任何其他人,但我在完全结束之前需要有人负责等待我的结束,所以我会把我的id放上去。
- 而如果发现自己从pool里拿到的tid不是0值,说明我要负责jion上一个人,并且把我的tid放上去,让下一个人负责我。
- 最后的那个人,是那个发现pool->nthreads为0的人,那么我就可以通过这个terminate(它本身是个condition)去通知发起destroy的人。
- 最后发起者就可以退了。🔚
是不是非常有意思!!!非常优雅的做法!!!
所以我们会发现,其实大家不太需要知道太多信息,只需要知道我要负责的上一个人。
当然每一步都是非常严谨的,我们结合刚才跳过的第一段魔法🔮感受一下:
static void *__thrdpool_routine(void *arg)
{
while (1)
{
pthread_mutex_lock(&pool->mutex); // 1.注意这里还持有锁
... // 等着队列拿任务出来
if (pool->terminate) // 2. 这既是标识位,也是发起销毁的那个人所等待的condition
break;
... // 执行拿到的任务
}
/* One thread joins another. Don't need to keep all thread IDs. */
tid = pool->tid; // 3. 把线程池上记录的那个tid拿下来,我来负责上一人
pool->tid = pthread_self(); // 4. 把我自己记录到线程池上,下一个人来负责我
if (--pool->nthreads == 0) // 5. 每个人都减1,最后一个人负责叫醒发起detroy的人
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex); // 6. 这里可以解锁进行等待了
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到0值
pthread_join(tid, NULL); // 8. 只要不0值,我就要负责等上一个结束才能退
return NULL; // 9. 退出,干干净净~
}
4 - 特点2:线程任务可以由另一个线程任务调起
在第二部分我们看过源码,只要队列管理得好,线程任务里提交下一个任务是完全OK的。
这很合理。👌
那么问题来了,特点1
又说,我们每个线程,是不太需要知道太多线程池的状态和信息的。而线程池的销毁是个过程,如果在这个过程间提交任务会怎么样呢?
因此特点2
的一个重要解读是:线程池被销毁时也可以提交下一个任务。而且刚才提过,还没有被执行的任务,可以通过我们传入的pending()函数拿回来。
简单看看销毁时的严谨做法:
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位
pool->terminate = &term; // 2. 之后的添加任务不会被执行,但可以pending拿到
pthread_cond_broadcast(&pool->cond); // 3. 广播所有等待的消费者
if (in_pool) // 4. 这里的魔法等下讲>_<~
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
while (pool->nthreads > 0) // 5. 如果还有线程没有退完,我会等,注意这里是while
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL); // 6.同样地等待打算退出的上一个人
}
5 - 特点3:同样可以在线程任务里销毁这个线程池
既然线程任务可以做任何事情,理论上,线程任务也可以销毁线程池❓
作为一个逻辑完备的线程池,大胆一点,我们把问号去掉。
而且,销毁并不会结束当前任务,它会等这个任务执行完。
想象一下,刚才的__thrdpool_routine()
,while里拿出来的那个任务,做的事情竟然是发起thrdpool_destroy()
...
我们来把上面的图改一下:
如果发起销毁的人,是我们自己内部的线程,那么我们就不是等n个,而是等n-1,少了一个外部线程等待我们。如何实现才能让这些逻辑都完美融合呢?我们把刚才跳过的三段魔法串起来看看。
第一段魔法,销毁的发起者。
如果发现发起销毁的人是线程池内部的线程,那么它具有较强的自我管理意识(因为前面说了,会等它这个任务执行完),而我们可以放心大胆地pthread_detach,无需任何人jion它等待它结束。
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
...
if (in_pool) // 每个由线程池创建的线程都设置了一个key,由此判断是否是in_pool
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
第二段魔法:线程池谁来free?
一定是发起销毁的那个人。所以这里用in_pool来控制main线程的回收:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
// 已经调用完第一段,且挨个pending(未执行的task)了
... // 销毁其他内部分配的内存
if (!in_pool) // 如果不是内部线程发起的销毁,要负责回收线程池内存
free(pool);
}
那现在不是main线程发起的销毁呢?发起的销毁的那个内部线程,怎么能保证我可以在最后关头把所有资源回收干净、调free(pool)、功成身退呢?
在前面阅读源码第5步,其实我们看过,__thrdpool_routine()里有free的地方。
于是现在三段魔法终于串起来了。
第三段魔法:严谨的并发。
static void *__thrdpool_routine(void *arg)
{
while (1)
{
... // 前面执行完一个任务,如果任务里做的事情,是销毁线程池...
// 注意这个时候,其他内存都已经被destroy的那个清掉了,万万不可以再用什么mutex、cond
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
...
非常重要的一点,由于并发,我们是不知道谁先操作的。假设我们稍微改一改这个顺序,就又是另一番逻辑。
比如我作为一个内部线程,在routine里调用destroy期间,发现还有线程没有执行完,我就要等在我的terminate上,待最后看到nthreads==0的那个人叫醒我。然后我的代码继续执行,函数栈就会从destroy回到routine,也就是上面那几行,然后,free(pool);,这时候我已经放飞自我detach了,可以顺利结束。
你看,无论如何,都可以完美地销毁线程池:
是不是太妙了!我写到这里已经要感动哭了!😭
6 - 简单的用法
这个线程池只有两个文件: thrdpool.h
和 thrdpool.c
,而且只依赖内核的数据结构list.h
。我们把它拿出来玩,自己写一段代码:
void my_routine(void *context) // 我们要执行的函数
{
printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
}
void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的任务会到这里
{
printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););
}
int main()
{
thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创建
struct thrdpool_task task;
unsigned long long I;
for (i = 0; i < 5; I++)
{
task.routine = &my_routine;
task.context = reinterpret_cast<void *>(i);
thrdpool_schedule(&task, thrd_pool); // 调用
}
getchar(); // 卡住主线程,按回车继续
thrdpool_destroy(&my_pending, thrd_pool); // 结束
return 0;
}
我们再打印几行log,直接编译就可以跑起来:
7 - 并发与结构之美
最后谈谈感受。
看完之后我有种很后悔为什么没有早点看的感觉,并且有一种,我肯定还没有完全理解到里边的精髓,毕竟我不能深刻地理解到设计者当时对并发的构思和模型上的选择。
我只能说,没有十多年顶级的系统调用和并发编程的功底写不出这样的代码,没有极致的审美与对品控的偏执也写不出这样的代码。
并发编程有很多说道,就正如退出这个这么简单的事情,想要做到退出时回收干净却很难。如果说你写业务逻辑自己管线程,退出什么的sleep(1)都无所谓,但做框架的人如果不能把自己的框架做得完美无暇逻辑自洽,就难免让人感觉差点意思。
而这个thrdpool,它作为一个线程池,是如此地逻辑完备。
再次让我深深地感到震撼:我们身边那些原始的、底层的、基础的代码,还有很多新思路,还可以写得如此美。
Workflow项目源码地址:GitHub - sogou/workflow: C++ Parallel Computing and Asynchronous Networking Engine