butex
butex是一种类似于futex的同步原语,由brpc实现 。但futex只能够用于同步pthread,而butex除了pthread,还可以同步bthread。
butex_wait
实现了等待语义的butex接口如下,
// Atomically wait on |butex| if *butex equals |expected_value|, until the
// butex is woken up by butex_wake*, or CLOCK_REALTIME reached |abstime| if
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
// Returns 0 on success, -1 otherwise and errno is set.
int butex_wait(void* butex, int expected_value, const timespec* abstime);
对于bthread来说,butex_wait工作流程为:
- 快速检查
butex
是否等于expected_value
,如果不相等,返回-1,错误码为EWOULDBLOCK
。有时候,我们需要在值不相等的情况下立刻执行某些动作。为了保证能够看到butex值改变前的某些动作,我们在返回前施加一个acquire屏障。 - 创建一个
ButexBthreadWaiter
结构,记录相关信息,它会被插入到 butex 的等待队列中。其成员如下所示:-
tid
:bthread的标识。 -
container
:记录了butex的值,用于判断该结构是否在butex的等待队列。 -
task_meta
:bthread的控制结构,包含了一个bthread的所有信息。 -
sleep_id
:定时器任务标识。如果abstime
不为NULL
,那么会添加一个定时器任务,在超时后调用相应的回调函数。 -
waiter_state
:等待者的状态,错误时会根据这个状态设置相应的错误码。 -
expected_value
:期望值,用于再次检查butex
是否等于expected_value
。 -
initial_butex
:用于记录butex的值。 -
control
:一个全局的TaskControl结构,用于在non-worker线程中调度bthread。
-
- 如果
abstime
不为NULL
,那就添加一个定时器任务,在超时后调用erase_from_butex_and_wakeup
函数,该函数会将ButexBthreadWaiter从butex的等待队列中删除。如果删除成功,将waiter_state
的值设置为WAITER_STATE_TIMEDOUT
并调度对应的bthread运行。注意,abstime是绝对时间,不是相对时间,而且距离当前时间要大于2微秒,否则接口返回-1,错误码设置为ETIMEDOUT
。这么做是因为少于2us的超时时间是低效且无用的。 - 在
task_meta.current_waiter
记录ButexBthreadWaiter
结构,用于bthread的中断。 - 设置剩余函数为
wait_for_butex
函数,它会在下一个task的栈中执行,主要用于将ButexBthreadWaiter
结构添加到butex的等待队列中,并设置container
的值来表示该结构已在butex的等待队列。 - 找到下一个运行的task并跳转。在该task的栈中执行
wait_for_butex
函数。
一段时间后,bthread可能因为超时或者被 butex_wake_*
系列函数唤醒,从切换点继续执行。
- 尝试删除定时任务。如果定时任务还在运行,等待。
- 如果
current_waiter
为空,说明TaskGroup::interrupt()
正在使用ButexBthreadWaiter
结构。自旋一段时间,等待中断函数执行完毕。否则,中断函数有可能访问到一个已析构的ButexBthreadWaiter
结构。 - 如果被中断、或超时、或值不匹配,返回-1并设置相应的errno,否则返回0。
butex_wake
实现了唤醒语义的butex接口如下,
// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void* butex);
对于bthread来说,butex_wake的工作流程如下:
- 获取butex等待队列的锁
- 如果队列为空,立即返回;否则从队列中取出一个正在等待的bthread
- 如果bthread还有定时任务,取消它
- 接下来,就是调度该bthread运行。
- 如果是在某个bthread中调用butex_wake函数,我们直接挂起当前bthread,调度等待的bthread运行
- 如果不是在worker pthread中,则随机取一个TaskGroup,将等待的bthread加入到它的远程队列。
butex_requeue
butex_requeue接口如下,
// Wake up at most 1 thread waiting on |butex1|, let all other threads wait
// on |butex2| instead.
// Returns # of threads woken up.
int butex_requeue(void* butex1, void* butex2);
唤醒butex1等待队列上的一个bthread,然后将butex1上剩余的bthread转而在butex2上等待。