Rust 并发编程 - Thread Pool

在并发编程领域,一个非常让程序员兴奋,感到有成就感的事情就是做性能优化,譬如发现某个线程成为了单点瓶颈,然后上多线程。

提到了上多线程,那自然就会引入 thread pool,也就是我们通常说的线程池,我们会将任务扔给线程池,然后线程池里面自己会负责将任务派发到不同的线程去执行,除开任务自身执行的开销,如何高效的派发也会决定一个线程池是否有足够好的性能。下面,我们就来聊聊几种常见的线程池的实现。

Mutex + channel

在 Rust 里面,我们可以通过标准库提供的 channel 进行通讯,但 channel 其实是一个 multi-producer,single-consumer 的结构,也就是我们俗称的 MPSC。但对于线程池来说,我们需要的是一个 MPMC 的 channel,也就是说,我们需要有一个队列,这个队列可以支持多个线程同时添加,同时获取任务。

虽然单独的 channel 没法支持,但如果我们给 channel 的 Receiver 套上一个 Mutex,在加上 Arc,其实就可以了。通过 Mutex 我们能保证多个线程同时只能有一个线程抢到 lock,然后从队列里面拿到数据。而加上 Arc 主要是能在多个线程共享了,这里就不说明了。

所以实现也就比较简单了,如下:

pub struct ThreadPool {
    tx: Option<Sender<Task>>,
    handlers: Option<Vec<thread::JoinHandle<()>>>,
}
impl ThreadPool {
    pub fn new(number: usize) -> ThreadPool {
        let (tx, rx) = channel::<Task>();
        let mut handlers = vec![];

        let arx = Arc::new(Mutex::new(rx));
        for _ in 0..number {
            let arx = arx.clone();
            let handle = thread::spawn(move || {
                while let Ok(task) = arx.lock().unwrap().recv() {
                    task.call_box();
                }
            });

            handlers.push(handle);
        }

        ThreadPool {
            tx: Some(tx),
            handlers: Some(handlers),
        }
    }
}

Task 其实就是一个 FnBox,因为只有 nightly 版本支持 FnBox,所以我们自定义了一下

pub trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F: FnOnce()> FnBox for F {
    fn call_box(self: Box<F>) {
        (*self)()
    }
}

pub type Task = Box<FnBox + Send>;

上面的代码逻辑非常的简单,创建一个 channel,然后使用 Arc + Mutex 包上 Receiver,创建多个线程,每个线程尝试去获取 channel 任务然后执行,如果 channel 里面没任务,recv 哪里就会等着,而其他的线程这时候因为没法拿到 lock 也会等着。

Condition Variable

抛开 channel,我们还有一种更通用的做法,可以用在不同的语言,譬如 C 上面,也就是使用 condition variable。关于 condition variable 的使用,大家可以 Google,因为在使用 condition variable 的时候,都会配套有一个 Mutex,所以我们可以通过这个 Mutex 同时控制 condition variable 以及任务队列。

首先我们定义一个 State,用来处理任务队列

struct State {
    queue: VecDeque<Task>,
    stopped: bool,
}

对于不同线程获取任务,我们可以通过

fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> {
    let &(ref lock, ref cvar) = &**notifer;
    let mut state = lock.lock().unwrap();
    loop {
        if state.stopped {
            return None;
        }
        match state.queue.pop_front() {
            Some(t) => {
                return Some(t);
            }
            None => {
                state = cvar.wait(state).unwrap();
            }
        }
    }
}

首先就是尝试用 Mutex 拿到 State,如果外面没有结束,那么就尝试从队列里面获取任务,如果没有,就调用 Condition Variable 的 wait 进行等待了。

任务的添加也比较简单

let &(ref lock, ref cvar) = &*self.notifer;
{
    let mut state = lock.lock().unwrap();
    state.queue.push_back(task);
    cvar.notify_one();
}

也是通过 lock 拿到 State,然后放到队列里面,在通知 Condition Variable。对于线程池的创建,也是比较容易的:

let s = State {
    queue: VecDeque::with_capacity(1024),
    stopped: false,
};
let notifer = Arc::new((Mutex::new(s), Condvar::new()));
for _ in 0..number {
    let notifer = notifer.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = next_task(&notifer) {
            task.call_box();
        }
    });

    handlers.push(handle);
}

Crossbeam

上面提到的两种做法,虽然都非常的通用,但有一个明显的问题,就在于他是有全局 lock 的,在并发系统里面,lock 如果使用不当,会造成非常严重的性能开销,尤其是在出现 contention 的时候,所以多数时候,我们希望使用的是一个 lock-free 的数据结构。

幸运的是,在 Rust 里面,已经有一个非常稳定的库来提供相关的支持了,这个就是 crossbeam,关于 crossbeam 的相关知识,后面可以再开一篇文章来详细说明,这里我们直接使用 crossbeam 的 channel,不同于标准库的 channel,crossbeam 的 channel 是一个 MPMC 的实现,所以我们能非常方便的用到线程池上面,简单代码如下:

let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];

for _ in 0..number {
    let rx = rx.clone();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    handlers.push(handle);
}

可以看到,crossbeam 的 channel 使用比标准库的更简单,它甚至不需要 Arc 来包一层,而且还是 lock-free 的。

参考这个 benchmark,分别对不同的 ThreadPool 进行测试,在我的机器上面会发现 crossbeam 的性能会明显好很多,标准库 channel 其次,最后才是 condition variable。

test thread_pool::benchmark_condvar_thread_pool           ... bench: 128,924,340 ns/iter (+/- 39,853,735)
test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench:   1,497,272 ns/iter (+/- 355,120)
test thread_pool::benchmark_std_channel_thread_pool       ... bench:  50,925,087 ns/iter (+/- 6,753,377)

Channel Per-thread

可以看到,使用 crossbeam 的效果已经非常好了,但这种实现其实还有一个问题,主要在于它有一个全局的队列,当并发严重的时候,多个线程对这个全局队列的争抢,可能成为瓶颈。另外,还有一个问题在于,它的派发机制是任意的,也就是那个线程抢到了任务就执行,在某些时候,我们希望一些任务其实是在某个线程上面执行的,这样对于 CPU 的 cache 来说会更加友好,譬如有一个任务在执行的时候,又会产生一个后续任务,自然,我们希望这个后续任务在同一个线程执行。

为了解决上面的问题,最直观的做法就是每个线程一个队列,这样我们就能够显示的控制任务派发了。一个非常简单的例子

let mut handlers = vec![];
let mut txs = vec![];

for _ in 0..number {
    let (tx, rx) = channel::unbounded::<Task>();
    let handle = thread::spawn(move || {
        while let Some(task) = rx.recv() {
            task.call_box();
        }
    });

    txs.push(tx);
    handlers.push(handle);
}

上面我们为每个线程创建了一个 channel,这样每个线程就不用去争抢全局的 channel 了。

派发的时候我们也可以手动派发,譬如根据某个 ID hash 到一个对应的 thread 上面,通过 Sender 发送 消息。

Work Stealing

虽然每个线程一个 channel 解决了全局争抢问题,也提升了 CPU cache 的使用,但它引入了另一个问题,就是任务的不均衡。直观的来说,就是会导致某些线程一直忙碌,在不断的处理任务,而另一些线程则没有任务处理,一直很闲。为了解决这个问题,就有了 Work Stealing 的线程池。

Work Stealing 的原理其实很简单,当一个线程执行完自己线程队列里面的所有任务之后,它会尝试去其它线程的队列里面偷一点任务执行。

因为 Work Stealing 的实现过于复杂,这里就不描述了,Rust 的 tokio 库提供了一个 tokio-threadpool,就是基于 Work Stealing 来做的,不过现在只提供了 Future 的支持。

小结

上面简单的列举了一些线程池的实现方式,如果你只是单纯的想用一个比较简单的派发功能,基于 crossbeam 的就可以了,复杂一点的可以使用 Work Stealing 的。当然,这里只是大概列举了一些,如果有更好的实现,麻烦跟我联系讨论,我的邮箱 tl@pingcap.com

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,806评论 1 19
  • 接着上节 atomic,本节主要介绍condition_varible的内容,练习代码地址。本文参考http://...
    jorion阅读 8,477评论 0 7
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,664评论 0 12
  • 一个简单的单例示例 单例模式可能是大家经常接触和使用的一个设计模式,你可能会这么写 publicclassUnsa...
    Martin说阅读 2,215评论 0 6
  • 你用你有限的新绿 吸引着远山无限的苍翠, 有限是无限的眺望, 并于无限互为证据, 尘世的秋省亲欲归, 雁群捎回了一...
    质蕙阅读 365评论 0 1