nfs-ganesha - thread model - work pool

work pool

内部维护一个队列,生产者调用work_pool_submit将entry插入队列,有多个线程作为消费者,去处理队列中的entry。线程的数量会根据entry的个数自动调节。

pqh.qcount

pool->pqh.qcount小于0时候,代表有几个entry在队列中等待处理。大于0时候,代表有几个没事干的线程在睡觉。

弹性的调节线程数量

当有所有线程都在干活且队列里还有entry没有处理,或者当有很少的线程睡觉时(流出一些裕量),创建更多的处理线程。当没事做睡觉的线程太多时,退出一些线程。

work_pool_submit的处理方法

当所有线程都干活的时候, work_pool_submit将entry插入队列。当有一些没事干而睡觉的线程时,唤醒其中一个,让它去处理这个entry。

svc_work_pool全局变量

struct work_pool svc_work_pool

(gdb) p svc_work_pool
$23 = {
  pqh = {
    qh = { //work pool entry处理队列
      tqh_first = 0x7fffd80008c0, //队列的第一个元素(每个元素都有prev和next指针)
      tqh_last = 0x7fffe00008c0 //队列的最后一个元素
    },
    qmutex = {... }, //mutex
    qsize = 0,
    qcount = 10  //此值大于0,说明有10个线程无事可做在睡觉。如果小于零,说明所有线程都在干活,队列中有多少entry等待处理
  },
  wptqh = { //working threads队列
    tqh_first = 0x7e00d0,
    tqh_last = 0x7fffe8000cb8
  },
  name = 0x7e00b0 "svc_",
  attr = { ...  },
  params = {
    thrd_max = 200,
    thrd_min = 7
  },
  timeout_ms = 31000,
  n_threads = 15,
  worker_index = 15
}

函数

  • work_pool_init: 初始化work pool
  • work_pool_thread: 线程处理函数,个数等于svc_work_pool.n_threads
  • work_pool_spawn: 为work pool创建新线程
  • work_pool_submit: 将work_pool_entry插入到pool中处理
  • work_pool_shutdown

work pool的应用场景

  1. 每种连接(TCP,UDP,RDMA)都对应一个channel,每个channel在创建时候,都会构造work pool entry,其处理函数是svc_rqst_run_task,将此entry插入到work pool。
  2. channel内部对epoll的处理函数svc_rqst_epoll_events中,如只有一个event,直接调用svc_rqst_xprt_task。如果有大于1的event,将多余的event构造相应的entry,并扔到work pool里处理。
  3. svc_rqst_epoll_events的退出,也将导致svc_rqst_run_task的退出。所以在svc_rqst_epoll_events退出前,重新将svc_rqst_run_task对应的entry插入work pool中。
  4. 在一定时间内,epoll没有接到数据,将svc_rqst_expire_task对应的entry插入work pool中。

数据结构

struct work_pool {
    struct poolq_head pqh; //work pool entry list
    TAILQ_HEAD(work_pool_s, work_pool_thread) wptqh;//thread list
    char *name;
    pthread_attr_t attr;
    struct work_pool_params params;
    long timeout_ms;
    uint32_t n_threads;
    uint32_t worker_index;
};

//对worker thread的封装
struct work_pool_thread {
    struct poolq_entry pqe;     /*** 1st ***/
    TAILQ_ENTRY(work_pool_thread) wptq;
    pthread_cond_t pqcond;

    struct work_pool *pool;
    struct work_pool_entry *work;
    char worker_name[16];
    pthread_t pt;
    uint32_t worker_index;
};

struct work_pool_entry {
    struct poolq_entry pqe;     /*** 1st ***/
    struct work_pool_thread *wpt;
    work_pool_fun_t fun;
    void *arg;
};

struct poolq_entry {
    TAILQ_ENTRY(poolq_entry) q; /*** 1st ***/
    u_int qsize;            /* allocated size of q entry,
                     * 0: default size */
    uint16_t qflags;
};

struct poolq_head {
    TAILQ_HEAD(poolq_head_s, poolq_entry) qh;
    pthread_mutex_t qmutex;

    u_int qsize;            /* default size of q entries,
                     * 0: static size */
    int qcount;         /* number of entries,
                     * < 0: has waiting workers. */
};

代码注释

static void * work_pool_thread(void *arg)
{
    struct work_pool_thread *wpt = arg;
    struct work_pool *pool = wpt->pool;
    struct poolq_entry *have;
    struct timespec ts;
    int rc;
    bool spawn;

    pthread_cond_init(&wpt->pqcond, NULL);
    pthread_mutex_lock(&pool->pqh.qmutex); 
    TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq); //将当前线程插入pool->wptqh

    wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);

    do {
                //如果当前线程有事做
        if (wpt->work) {
            wpt->work->wpt = wpt;
            spawn = pool->pqh.qcount < pool->params.thrd_min
                  && pool->n_threads < pool->params.thrd_max;
            if (spawn)
                pool->n_threads++;
            pthread_mutex_unlock(&pool->pqh.qmutex);

            if (spawn) {
                //线程不够,需要创建新线程
                (void)work_pool_spawn(pool);
            }

            wpt->work->fun(wpt->work);
            wpt->work = NULL;
            pthread_mutex_lock(&pool->pqh.qmutex);
        }

                //pool->pqh.qcount小于0说明所有线程都在干活,队列积攒了很多entry需要处理
        if (0 > pool->pqh.qcount++) {
                        //从队列中取出entry
            have = TAILQ_FIRST(&pool->pqh.qh);
            TAILQ_REMOVE(&pool->pqh.qh, have, q);
                        //告诉当前线程去处理这个entry
            wpt->work = (struct work_pool_entry *)have;
            continue;
        }
                //小技巧,将wpt->pqe插入队列,等同于将当前线程插入队列尾部
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);

        clock_gettime(CLOCK_REALTIME_FAST, &ts);
        timespec_addms(&ts, pool->timeout_ms);
                //等待CLOCK_REALTIME_FAST时间,看是否被work_pool_submit唤醒
        rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
                        &ts);
        if (!wpt->work) {
                        //如果这期间没有发生work_pool_submit,wpt->work就还为NULL
                        //将刚才插入队列假的entry,从队列中删除
            pool->pqh.qcount--;
            TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
        }
    } while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
        //如果有太多无所事事的线程在睡觉,则退出当前线程

    pool->n_threads--;
    TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
    pthread_mutex_unlock(&pool->pqh.qmutex);

    cond_destroy(&wpt->pqcond);
    mem_free(wpt, sizeof(*wpt));

    return (NULL);
}

int work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
{
    int rc = 0;

    pthread_mutex_lock(&pool->pqh.qmutex);

        //如果有没事做的线程在睡觉,此时队列里的元素都是睡觉的线程,而非需要处理的entry
    if (0 < pool->pqh.qcount--) {
        struct work_pool_thread *wpt = (struct work_pool_thread *)
            TAILQ_FIRST(&pool->pqh.qh);

        TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
                //告诉这个线程去做这件事情 
        wpt->work = work;

                //唤醒这个线程
        pthread_cond_signal(&wpt->pqcond);
    } else {
                 //如果所有线程都在忙,就把entry插入队列尾部
        TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
    }

    pthread_mutex_unlock(&pool->pqh.qmutex);
    return rc;
}
  1. 最多同时几个线程可以同时处理epoll产生的数据, RPC_Ioq_ThrdMax

Log分析

//svc_51 working thread 在等待事件
TRACE 0213 11:23:58.916742 10680  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_51 waiting

//svc_85 正在处理0x387a9990指向的work_pool_entry
TRACE 0213 11:23:58.916872  3267  : xxxxxxx : <no-file>:0 :rpc :work_pool_thread() svc_85 task 0x387a9990

//接收了5440字节,但还有84192个字节没有读出来
TRACE 0213 11:23:58.921465  8398  : xxxxxxx : <no-file>:0 :rpc :svc_vc_recv: 0x3d60cc00 fd 274 recv 5440, need 84192, flags 2
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容