work pool
内部维护一个队列,生产者调用work_pool_submit
将entry插入队列,有多个线程作为消费者,去处理队列中的entry。线程的数量会根据entry的个数自动调节。
pqh.qcount
pool->pqh.qcount
小于0时候,代表有几个entry在队列中等待处理。大于0时候,代表有几个没事干的线程在睡觉。
弹性的调节线程数量
当有所有线程都在干活且队列里还有entry没有处理,或者当有很少的线程睡觉时(流出一些裕量),创建更多的处理线程。当没事做睡觉的线程太多时,退出一些线程。
work_pool_submit的处理方法
当所有线程都干活的时候, work_pool_submit将entry插入队列。当有一些没事干而睡觉的线程时,唤醒其中一个,让它去处理这个entry。
svc_work_pool全局变量
struct work_pool svc_work_pool
(gdb) p svc_work_pool
$23 = {
pqh = {
qh = { //work pool entry处理队列
tqh_first = 0x7fffd80008c0, //队列的第一个元素(每个元素都有prev和next指针)
tqh_last = 0x7fffe00008c0 //队列的最后一个元素
},
qmutex = {... }, //mutex
qsize = 0,
qcount = 10 //此值大于0,说明有10个线程无事可做在睡觉。如果小于零,说明所有线程都在干活,队列中有多少entry等待处理
},
wptqh = { //working threads队列
tqh_first = 0x7e00d0,
tqh_last = 0x7fffe8000cb8
},
name = 0x7e00b0 "svc_",
attr = { ... },
params = {
thrd_max = 200,
thrd_min = 7
},
timeout_ms = 31000,
n_threads = 15,
worker_index = 15
}
函数
- work_pool_init: 初始化work pool
- work_pool_thread: 线程处理函数,个数等于svc_work_pool.n_threads
- work_pool_spawn: 为work pool创建新线程
- work_pool_submit: 将
work_pool_entry
插入到pool中处理 - work_pool_shutdown
work pool的应用场景
- 每种连接(TCP,UDP,RDMA)都对应一个channel,每个channel在创建时候,都会构造work pool entry,其处理函数是
svc_rqst_run_task
,将此entry插入到work pool。 - channel内部对epoll的处理函数
svc_rqst_epoll_events
中,如只有一个event,直接调用svc_rqst_xprt_task。如果有大于1的event,将多余的event构造相应的entry,并扔到work pool里处理。 -
svc_rqst_epoll_events
的退出,也将导致svc_rqst_run_task
的退出。所以在svc_rqst_epoll_events
退出前,重新将svc_rqst_run_task
对应的entry插入work pool中。 - 在一定时间内,epoll没有接到数据,将
svc_rqst_expire_task
对应的entry插入work pool中。
数据结构
struct work_pool {
struct poolq_head pqh; //work pool entry list
TAILQ_HEAD(work_pool_s, work_pool_thread) wptqh;//thread list
char *name;
pthread_attr_t attr;
struct work_pool_params params;
long timeout_ms;
uint32_t n_threads;
uint32_t worker_index;
};
//对worker thread的封装
struct work_pool_thread {
struct poolq_entry pqe; /*** 1st ***/
TAILQ_ENTRY(work_pool_thread) wptq;
pthread_cond_t pqcond;
struct work_pool *pool;
struct work_pool_entry *work;
char worker_name[16];
pthread_t pt;
uint32_t worker_index;
};
struct work_pool_entry {
struct poolq_entry pqe; /*** 1st ***/
struct work_pool_thread *wpt;
work_pool_fun_t fun;
void *arg;
};
struct poolq_entry {
TAILQ_ENTRY(poolq_entry) q; /*** 1st ***/
u_int qsize; /* allocated size of q entry,
* 0: default size */
uint16_t qflags;
};
struct poolq_head {
TAILQ_HEAD(poolq_head_s, poolq_entry) qh;
pthread_mutex_t qmutex;
u_int qsize; /* default size of q entries,
* 0: static size */
int qcount; /* number of entries,
* < 0: has waiting workers. */
};
代码注释
static void * work_pool_thread(void *arg)
{
struct work_pool_thread *wpt = arg;
struct work_pool *pool = wpt->pool;
struct poolq_entry *have;
struct timespec ts;
int rc;
bool spawn;
pthread_cond_init(&wpt->pqcond, NULL);
pthread_mutex_lock(&pool->pqh.qmutex);
TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq); //将当前线程插入pool->wptqh
wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);
do {
//如果当前线程有事做
if (wpt->work) {
wpt->work->wpt = wpt;
spawn = pool->pqh.qcount < pool->params.thrd_min
&& pool->n_threads < pool->params.thrd_max;
if (spawn)
pool->n_threads++;
pthread_mutex_unlock(&pool->pqh.qmutex);
if (spawn) {
//线程不够,需要创建新线程
(void)work_pool_spawn(pool);
}
wpt->work->fun(wpt->work);
wpt->work = NULL;
pthread_mutex_lock(&pool->pqh.qmutex);
}
//pool->pqh.qcount小于0说明所有线程都在干活,队列积攒了很多entry需要处理
if (0 > pool->pqh.qcount++) {
//从队列中取出entry
have = TAILQ_FIRST(&pool->pqh.qh);
TAILQ_REMOVE(&pool->pqh.qh, have, q);
//告诉当前线程去处理这个entry
wpt->work = (struct work_pool_entry *)have;
continue;
}
//小技巧,将wpt->pqe插入队列,等同于将当前线程插入队列尾部
TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);
clock_gettime(CLOCK_REALTIME_FAST, &ts);
timespec_addms(&ts, pool->timeout_ms);
//等待CLOCK_REALTIME_FAST时间,看是否被work_pool_submit唤醒
rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
&ts);
if (!wpt->work) {
//如果这期间没有发生work_pool_submit,wpt->work就还为NULL
//将刚才插入队列假的entry,从队列中删除
pool->pqh.qcount--;
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
}
} while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
//如果有太多无所事事的线程在睡觉,则退出当前线程
pool->n_threads--;
TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
pthread_mutex_unlock(&pool->pqh.qmutex);
cond_destroy(&wpt->pqcond);
mem_free(wpt, sizeof(*wpt));
return (NULL);
}
int work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
int rc = 0;
pthread_mutex_lock(&pool->pqh.qmutex);
//如果有没事做的线程在睡觉,此时队列里的元素都是睡觉的线程,而非需要处理的entry
if (0 < pool->pqh.qcount--) {
struct work_pool_thread *wpt = (struct work_pool_thread *)
TAILQ_FIRST(&pool->pqh.qh);
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
//告诉这个线程去做这件事情
wpt->work = work;
//唤醒这个线程
pthread_cond_signal(&wpt->pqcond);
} else {
//如果所有线程都在忙,就把entry插入队列尾部
TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
}
pthread_mutex_unlock(&pool->pqh.qmutex);
return rc;
}
- 最多同时几个线程可以同时处理epoll产生的数据,
RPC_Ioq_ThrdMax
Log分析
//svc_51 working thread 在等待事件
TRACE 0213 11:23:58.916742 10680 : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_51 waiting
//svc_85 正在处理0x387a9990指向的work_pool_entry
TRACE 0213 11:23:58.916872 3267 : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_85 task 0x387a9990
//接收了5440字节,但还有84192个字节没有读出来
TRACE 0213 11:23:58.921465 8398 : xxxxxxx : <no-file>:0 :rpc :svc_vc_recv: 0x3d60cc00 fd 274 recv 5440, need 84192, flags 2