bthread概述
[TOC]
TaskControl
TaskControl用于管理brpc创建的worker pthread。
初始化
创建一个 TaskControl
对象后,调用 init
函数进行初始化,主要做:
- 启动定时器线程
- 创建指定数量的worker pthreads,用于执行bthread
- 曝光一些bvar,用来统计状态
- 等待至少一个worker pthread创建完成
Worker Pthread
每个worker pthread运行 worker_thread
函数,这个函数主要做:
- 创建一个
TaskGroup
对象,调用init
函数初始化完成后加入到TaskControl
中。在brpc中,每个worker pthread有各自的TaskGroup。 - 然后调用
run_main_task
函数,开始调用bthread。
worker pthread在任意时刻只会运行一个bthread。它优先运行本地队列,远程队列的bthread,如果没有,就从其它TaskGroup的本地队列或远程队列中偷取。如果仍然没有找到,就会睡眠直到有新的bthread可以运行时被唤醒。
管理TaskGroup
添加TaskGroup
worker pthread会在一开始就创建自己的TaskGroup结构,然后调用 TaskControl::_add_group
将该TaskGroup注册到TaskControl中。
删除TaskGroup
worker pthread在退出前,调用 TaskContol::_destroy_group
将对应的TaskGroup从TaskControl中删除。
线程安全
不管是添加还是删除接口,都有可能被多个线程同时调用,所以brpc使用一个互斥量 _modify_group_mutex
来保护。
偷任务
前面说到,worker pthread 如果没有bthread可以执行,就会尝试从其他worker pthread 偷取,调用的是 TaskControl::steal_task
接口,
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
for (size_t i = 0; i < ngroup; ++i, s += offset) {
TaskGroup* g = _groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
stolen = true;
break;
}
if (g->_remote_rq.pop(tid)) {
stolen = true;
break;
}
}
}
*seed = s;
return stolen;
}
从上面可以看到,偷任务的一些设计要点:
- 尽量减少与其它也在偷任务的worker pthread的冲突,使得它们尽量分散开,采用参数
seed
和offset
决定TaskGroup的索引,而不是轮询。 - 避免没有bthread时的无限循环,限定了每次偷任务最多可查找
_ngroup
个TaskGroup。 - 这个接口会与增加、删除TaskGroup接口并发,理论上应该使用锁保护。但是出于性能考虑,这里没有用锁。所以,我们需要更小心地分析多线程并发的竞争情况。
我们先来分析下增加一个TaskGroup对偷任务的影响。从上面的代码可知,读取_ngroup
的值时采用了butil::memory_order_acquire
内存序,这保证了当我们看到_ngroup
的最新值时,_groups
数组中的指针所指向的TaskGroup对象都已经初始化完成了。不会出现这种情况:指针不为空,但是指针所指的对象还没初始化完成,引起程序crash。如果我们看到的是_ngroup
的旧值,我们会错过这个新加的TaskGroup。但是这个没有关系,这个TaskGroup多余的bthread也会很快得到调度。因为TaskControl::_add_group
函数在最后会调用 TaskControl::signal_task
来唤醒休眠的worker pthread来偷任务。
我们再来分析下删除一个TaskGroup对偷任务的影响。如果worker pthread找到自己的TaskGroup后,立刻将它从 _groups
中删除,就可能使得 TaskControl::steal_task
解引用一个已析构的TaskGroup。所以,在 TaskContol::_destroy_group
中找到对应的TaskGroup后没有立刻删除,而是注册一个定时器任务,在1秒后再删除。
综上所述,在 TaskControl::steal_task
中遍历 _groups
数组是可以不需要加锁的。
通知任务
当有新的bthread需要调度时,调用 TaskControl::signal_task
唤醒那些休眠的worker pthread。如果没有足够的休眠worker pthread并且当前worker pthread的数量小于设置的并发度,那么调用 TaskControl::add_workers
增加worker pthread。
TaskGroup
初始化
创建一个 TaskGroup
对象后,调用 init
函数进行初始化,主要做:
- 初始化
_rq
队列,用来存储worker pthread创建的bthread。队列中的bthread可能被其他TaskGroup偷取。 - 初始化
_remote_rq
队列,大小为_rq
队列的一半,用来存储non-worker pthread创建的bthread。队列中的bthread可能被其他TaskGroup偷取。 - 为worker pthread分配
TaskMeta
对象和栈。
偷任务
如果worker pthread执行完TaskGroup _rq
队列中的bthread后,会尝试:
- 运行
_remote_rq
队列中的bthread。 - 偷取其他 TaskGroup 对象
_rq
队列或_remote_rq
队列的bthread。
调度bthread
栈
bthread定义了几种栈类型,分别代表不同的栈大小:
栈类型 | 说明 | 大小 |
---|---|---|
STACK_TYPE_MAIN | worker pthread的栈 | 默认大小是8MB |
STACK_TYPE_PTHREAD | 使用worker pthread的栈,不需要额外分配 | 默认大小是8MB |
STACK_TYPE_SMALL | 小型栈 | 32KB |
STACK_TYPE_NORMAL | 默认栈 | 1MB |
STACK_TYPE_LARGE | 大型栈 | 8MB |
调用 bthread_start_*
创建的bthread默认情况下栈大小是1MB(STACK_TYPE_NORMAL),可通过 attr
参数来选择栈的大小。如果分配栈空间失败,将其栈类型改为STACK_TYPE_PTHREAD,即直接在worker pthread的栈上运行该bthread。另外,pthread task的栈类型也是STACK_TYPE_PTHREAD。注意,worker pthread的栈不是brpc分配的,不能释放它。至于STACK_TYPE_MAIN,是为了告诉 get_stack
函数只需要分配栈控制结构,不需要分配栈空间,因为worker pthread已经由系统线程库分配了栈空间。
这里要注意,创建bthread时没有立刻为其分配栈,直到第一次运行时才会分配。这个便于我们优化内存的使用,如果前一个bthread即将退出并且栈类型和下一个bhtread相同,我们可以直接转移栈而不需要重新分配。
任务运行函数
虽然调用 bthread_start_*
创建新的bthread传入了上下文函数及其参数,但是bthread上不是直接运行该上下文函数,实际上运行的是
TaskGroup::task_runner
函数,
- 执行上一个bthread设置的
_last_context_remained
函数,可能是用来释放栈空间之类的。如果是在worker pthread的栈上运行task,这个步骤会跳过。 - 运行上下文函数。执行过程中可能有多次跳出、跳回过程,所在的worker pthread可能也变了。
- 执行完之后,执行一些清理动作,例如释放tls等。
- 设置下一个bthread需要运行的
_last_context_remained
函数。 - 找到下一个可以运行的bthread,查找次序依次为本地队列,远程队列,其它TaskGroup的本地队列或远程队列。
- 如果没有找到,就返回worker pthread。
要注意,有两个地方会调用到TaskGroup::task_runner
函数:
- bthread执行时,实际执行的是
TaskGroup::task_runner
函数,而在该函数里面调用上下文函数。 - worker pthread 执行 pthread task时会调用
TaskGroup::task_runner
函数。
所以,两种情况下会回到worker pthread:
- 没有需要执行的bthread。返回后,可以调用
TaskGroup::wait_task
从其它TaskGroup偷取任务。如果仍然没有,就休眠等待唤醒。 - 下一个task使用的是worker-pthread的栈。
调度策略
常见的调度策略:
- 星切:主线程 --> 协程1 --> 主线程 --> 协程2 --> ... -->主线程
- 环切:主线程 --> 协程1 --> 协程2 --> 协程3 --> ... -->主线程
从上可以看出,环切比星切少了一半的切换次数,效率更高。bthread采用的是环切。
TaskMeta
TaskMeta
是bthread的控制结构,管理bthread的相关信息,包括
- 栈
- 运行函数及其参数
待续。
tid
每个bthread都有一个唯一ID,大小为64bit,
- 高32bit是版本号
- 低32biit是bthread控制结构的起始地址
在 task_group_inl.h
文件中定义了tid相关函数,
// Utilities to manipulate bthread_t
inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}
inline butil::ResourceId<TaskMeta> get_slot(bthread_t tid) {
butil::ResourceId<TaskMeta> id = { (tid & 0xFFFFFFFFul) };
return id;
}
inline uint32_t get_version(bthread_t tid) {
return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}