brpc源码解析之bthread概述

bthread概述

[TOC]

TaskControl

TaskControl用于管理brpc创建的worker pthread。

初始化

创建一个 TaskControl 对象后,调用 init 函数进行初始化,主要做:

  • 启动定时器线程
  • 创建指定数量的worker pthreads,用于执行bthread
  • 曝光一些bvar,用来统计状态
  • 等待至少一个worker pthread创建完成

Worker Pthread

每个worker pthread运行 worker_thread 函数,这个函数主要做:

  • 创建一个 TaskGroup 对象,调用 init 函数初始化完成后加入到 TaskControl 中。在brpc中,每个worker pthread有各自的TaskGroup。
  • 然后调用 run_main_task 函数,开始调用bthread。

worker pthread在任意时刻只会运行一个bthread。它优先运行本地队列,远程队列的bthread,如果没有,就从其它TaskGroup的本地队列或远程队列中偷取。如果仍然没有找到,就会睡眠直到有新的bthread可以运行时被唤醒。

管理TaskGroup

添加TaskGroup
worker pthread会在一开始就创建自己的TaskGroup结构,然后调用 TaskControl::_add_group 将该TaskGroup注册到TaskControl中。
删除TaskGroup
worker pthread在退出前,调用 TaskContol::_destroy_group 将对应的TaskGroup从TaskControl中删除。
线程安全
不管是添加还是删除接口,都有可能被多个线程同时调用,所以brpc使用一个互斥量 _modify_group_mutex 来保护。

偷任务

前面说到,worker pthread 如果没有bthread可以执行,就会尝试从其他worker pthread 偷取,调用的是 TaskControl::steal_task 接口,

bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
    // 1: Acquiring fence is paired with releasing fence in _add_group to
    // avoid accessing uninitialized slot of _groups.
    const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
    if (0 == ngroup) {
        return false;
    }
    // NOTE: Don't return inside `for' iteration since we need to update |seed|
    bool stolen = false;
    size_t s = *seed;
    for (size_t i = 0; i < ngroup; ++i, s += offset) {
        TaskGroup* g = _groups[s % ngroup];
        // g is possibly NULL because of concurrent _destroy_group
        if (g) {
            if (g->_rq.steal(tid)) {
                stolen = true;
                break;
            }
            if (g->_remote_rq.pop(tid)) {
                stolen = true;
                break;
            }
        }
    }
    *seed = s;
    return stolen;
}

从上面可以看到,偷任务的一些设计要点:

  • 尽量减少与其它也在偷任务的worker pthread的冲突,使得它们尽量分散开,采用参数 seedoffset 决定TaskGroup的索引,而不是轮询。
  • 避免没有bthread时的无限循环,限定了每次偷任务最多可查找 _ngroup 个TaskGroup。
  • 这个接口会与增加、删除TaskGroup接口并发,理论上应该使用锁保护。但是出于性能考虑,这里没有用锁。所以,我们需要更小心地分析多线程并发的竞争情况。

我们先来分析下增加一个TaskGroup对偷任务的影响。从上面的代码可知,读取_ngroup 的值时采用了butil::memory_order_acquire内存序,这保证了当我们看到_ngroup的最新值时,_groups 数组中的指针所指向的TaskGroup对象都已经初始化完成了。不会出现这种情况:指针不为空,但是指针所指的对象还没初始化完成,引起程序crash。如果我们看到的是_ngroup的旧值,我们会错过这个新加的TaskGroup。但是这个没有关系,这个TaskGroup多余的bthread也会很快得到调度。因为TaskControl::_add_group 函数在最后会调用 TaskControl::signal_task 来唤醒休眠的worker pthread来偷任务。

我们再来分析下删除一个TaskGroup对偷任务的影响。如果worker pthread找到自己的TaskGroup后,立刻将它从 _groups 中删除,就可能使得 TaskControl::steal_task 解引用一个已析构的TaskGroup。所以,在 TaskContol::_destroy_group 中找到对应的TaskGroup后没有立刻删除,而是注册一个定时器任务,在1秒后再删除。

综上所述,在 TaskControl::steal_task 中遍历 _groups 数组是可以不需要加锁的。

通知任务

当有新的bthread需要调度时,调用 TaskControl::signal_task 唤醒那些休眠的worker pthread。如果没有足够的休眠worker pthread并且当前worker pthread的数量小于设置的并发度,那么调用 TaskControl::add_workers 增加worker pthread。

TaskGroup

初始化

创建一个 TaskGroup 对象后,调用 init 函数进行初始化,主要做:

  • 初始化 _rq 队列,用来存储worker pthread创建的bthread。队列中的bthread可能被其他TaskGroup偷取。
  • 初始化 _remote_rq 队列,大小为 _rq 队列的一半,用来存储non-worker pthread创建的bthread。队列中的bthread可能被其他TaskGroup偷取。
  • 为worker pthread分配 TaskMeta 对象和栈。

偷任务

如果worker pthread执行完TaskGroup _rq 队列中的bthread后,会尝试:

  • 运行 _remote_rq 队列中的bthread。
  • 偷取其他 TaskGroup 对象 _rq 队列或_remote_rq 队列的bthread。

调度bthread

bthread定义了几种栈类型,分别代表不同的栈大小:

栈类型 说明 大小
STACK_TYPE_MAIN worker pthread的栈 默认大小是8MB
STACK_TYPE_PTHREAD 使用worker pthread的栈,不需要额外分配 默认大小是8MB
STACK_TYPE_SMALL 小型栈 32KB
STACK_TYPE_NORMAL 默认栈 1MB
STACK_TYPE_LARGE 大型栈 8MB

调用 bthread_start_* 创建的bthread默认情况下栈大小是1MB(STACK_TYPE_NORMAL),可通过 attr 参数来选择栈的大小。如果分配栈空间失败,将其栈类型改为STACK_TYPE_PTHREAD,即直接在worker pthread的栈上运行该bthread。另外,pthread task的栈类型也是STACK_TYPE_PTHREAD。注意,worker pthread的栈不是brpc分配的,不能释放它。至于STACK_TYPE_MAIN,是为了告诉 get_stack 函数只需要分配栈控制结构,不需要分配栈空间,因为worker pthread已经由系统线程库分配了栈空间。

这里要注意,创建bthread时没有立刻为其分配栈,直到第一次运行时才会分配。这个便于我们优化内存的使用,如果前一个bthread即将退出并且栈类型和下一个bhtread相同,我们可以直接转移栈而不需要重新分配。

任务运行函数

虽然调用 bthread_start_* 创建新的bthread传入了上下文函数及其参数,但是bthread上不是直接运行该上下文函数,实际上运行的是
TaskGroup::task_runner 函数,

  • 执行上一个bthread设置的 _last_context_remained 函数,可能是用来释放栈空间之类的。如果是在worker pthread的栈上运行task,这个步骤会跳过。
  • 运行上下文函数。执行过程中可能有多次跳出、跳回过程,所在的worker pthread可能也变了。
  • 执行完之后,执行一些清理动作,例如释放tls等。
  • 设置下一个bthread需要运行的_last_context_remained 函数。
  • 找到下一个可以运行的bthread,查找次序依次为本地队列,远程队列,其它TaskGroup的本地队列或远程队列。
  • 如果没有找到,就返回worker pthread。

要注意,有两个地方会调用到TaskGroup::task_runner 函数:

  • bthread执行时,实际执行的是TaskGroup::task_runner 函数,而在该函数里面调用上下文函数。
  • worker pthread 执行 pthread task时会调用 TaskGroup::task_runner 函数。

所以,两种情况下会回到worker pthread:

  • 没有需要执行的bthread。返回后,可以调用 TaskGroup::wait_task 从其它TaskGroup偷取任务。如果仍然没有,就休眠等待唤醒。
  • 下一个task使用的是worker-pthread的栈。

调度策略

常见的调度策略:

  • 星切:主线程 --> 协程1 --> 主线程 --> 协程2 --> ... -->主线程
  • 环切:主线程 --> 协程1 --> 协程2 --> 协程3 --> ... -->主线程

从上可以看出,环切比星切少了一半的切换次数,效率更高。bthread采用的是环切

TaskMeta

TaskMeta 是bthread的控制结构,管理bthread的相关信息,包括

  • 运行函数及其参数

待续。

tid

每个bthread都有一个唯一ID,大小为64bit,

  • 高32bit是版本号
  • 低32biit是bthread控制结构的起始地址

task_group_inl.h 文件中定义了tid相关函数,

// Utilities to manipulate bthread_t
inline bthread_t make_tid(uint32_t version, butil::ResourceId<TaskMeta> slot) {
    return (((bthread_t)version) << 32) | (bthread_t)slot.value;
}

inline butil::ResourceId<TaskMeta> get_slot(bthread_t tid) {
    butil::ResourceId<TaskMeta> id = { (tid & 0xFFFFFFFFul) };
    return id;
}
inline uint32_t get_version(bthread_t tid) {
    return (uint32_t)((tid >> 32) & 0xFFFFFFFFul);
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。