在并发编程领域,一个非常让程序员兴奋,感到有成就感的事情就是做性能优化,譬如发现某个线程成为了单点瓶颈,然后上多线程。
提到了上多线程,那自然就会引入 thread pool,也就是我们通常说的线程池,我们会将任务扔给线程池,然后线程池里面自己会负责将任务派发到不同的线程去执行,除开任务自身执行的开销,如何高效的派发也会决定一个线程池是否有足够好的性能。下面,我们就来聊聊几种常见的线程池的实现。
Mutex + channel
在 Rust 里面,我们可以通过标准库提供的 channel 进行通讯,但 channel 其实是一个 multi-producer,single-consumer 的结构,也就是我们俗称的 MPSC。但对于线程池来说,我们需要的是一个 MPMC 的 channel,也就是说,我们需要有一个队列,这个队列可以支持多个线程同时添加,同时获取任务。
虽然单独的 channel 没法支持,但如果我们给 channel 的 Receiver 套上一个 Mutex,在加上 Arc,其实就可以了。通过 Mutex 我们能保证多个线程同时只能有一个线程抢到 lock,然后从队列里面拿到数据。而加上 Arc 主要是能在多个线程共享了,这里就不说明了。
所以实现也就比较简单了,如下:
pub struct ThreadPool {
tx: Option<Sender<Task>>,
handlers: Option<Vec<thread::JoinHandle<()>>>,
}
impl ThreadPool {
pub fn new(number: usize) -> ThreadPool {
let (tx, rx) = channel::<Task>();
let mut handlers = vec![];
let arx = Arc::new(Mutex::new(rx));
for _ in 0..number {
let arx = arx.clone();
let handle = thread::spawn(move || {
while let Ok(task) = arx.lock().unwrap().recv() {
task.call_box();
}
});
handlers.push(handle);
}
ThreadPool {
tx: Some(tx),
handlers: Some(handlers),
}
}
}
Task 其实就是一个 FnBox,因为只有 nightly 版本支持 FnBox,所以我们自定义了一下
pub trait FnBox {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce()> FnBox for F {
fn call_box(self: Box<F>) {
(*self)()
}
}
pub type Task = Box<FnBox + Send>;
上面的代码逻辑非常的简单,创建一个 channel,然后使用 Arc + Mutex 包上 Receiver,创建多个线程,每个线程尝试去获取 channel 任务然后执行,如果 channel 里面没任务,recv 哪里就会等着,而其他的线程这时候因为没法拿到 lock 也会等着。
Condition Variable
抛开 channel,我们还有一种更通用的做法,可以用在不同的语言,譬如 C 上面,也就是使用 condition variable。关于 condition variable 的使用,大家可以 Google,因为在使用 condition variable 的时候,都会配套有一个 Mutex,所以我们可以通过这个 Mutex 同时控制 condition variable 以及任务队列。
首先我们定义一个 State,用来处理任务队列
struct State {
queue: VecDeque<Task>,
stopped: bool,
}
对于不同线程获取任务,我们可以通过
fn next_task(notifer: &Arc<(Mutex<State>, Condvar)>) -> Option<Task> {
let &(ref lock, ref cvar) = &**notifer;
let mut state = lock.lock().unwrap();
loop {
if state.stopped {
return None;
}
match state.queue.pop_front() {
Some(t) => {
return Some(t);
}
None => {
state = cvar.wait(state).unwrap();
}
}
}
}
首先就是尝试用 Mutex 拿到 State,如果外面没有结束,那么就尝试从队列里面获取任务,如果没有,就调用 Condition Variable 的 wait 进行等待了。
任务的添加也比较简单
let &(ref lock, ref cvar) = &*self.notifer;
{
let mut state = lock.lock().unwrap();
state.queue.push_back(task);
cvar.notify_one();
}
也是通过 lock 拿到 State,然后放到队列里面,在通知 Condition Variable。对于线程池的创建,也是比较容易的:
let s = State {
queue: VecDeque::with_capacity(1024),
stopped: false,
};
let notifer = Arc::new((Mutex::new(s), Condvar::new()));
for _ in 0..number {
let notifer = notifer.clone();
let handle = thread::spawn(move || {
while let Some(task) = next_task(¬ifer) {
task.call_box();
}
});
handlers.push(handle);
}
Crossbeam
上面提到的两种做法,虽然都非常的通用,但有一个明显的问题,就在于他是有全局 lock 的,在并发系统里面,lock 如果使用不当,会造成非常严重的性能开销,尤其是在出现 contention 的时候,所以多数时候,我们希望使用的是一个 lock-free 的数据结构。
幸运的是,在 Rust 里面,已经有一个非常稳定的库来提供相关的支持了,这个就是 crossbeam,关于 crossbeam 的相关知识,后面可以再开一篇文章来详细说明,这里我们直接使用 crossbeam 的 channel,不同于标准库的 channel,crossbeam 的 channel 是一个 MPMC 的实现,所以我们能非常方便的用到线程池上面,简单代码如下:
let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];
for _ in 0..number {
let rx = rx.clone();
let handle = thread::spawn(move || {
while let Some(task) = rx.recv() {
task.call_box();
}
});
handlers.push(handle);
}
可以看到,crossbeam 的 channel 使用比标准库的更简单,它甚至不需要 Arc 来包一层,而且还是 lock-free 的。
参考这个 benchmark,分别对不同的 ThreadPool 进行测试,在我的机器上面会发现 crossbeam 的性能会明显好很多,标准库 channel 其次,最后才是 condition variable。
test thread_pool::benchmark_condvar_thread_pool ... bench: 128,924,340 ns/iter (+/- 39,853,735)
test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench: 1,497,272 ns/iter (+/- 355,120)
test thread_pool::benchmark_std_channel_thread_pool ... bench: 50,925,087 ns/iter (+/- 6,753,377)
Channel Per-thread
可以看到,使用 crossbeam 的效果已经非常好了,但这种实现其实还有一个问题,主要在于它有一个全局的队列,当并发严重的时候,多个线程对这个全局队列的争抢,可能成为瓶颈。另外,还有一个问题在于,它的派发机制是任意的,也就是那个线程抢到了任务就执行,在某些时候,我们希望一些任务其实是在某个线程上面执行的,这样对于 CPU 的 cache 来说会更加友好,譬如有一个任务在执行的时候,又会产生一个后续任务,自然,我们希望这个后续任务在同一个线程执行。
为了解决上面的问题,最直观的做法就是每个线程一个队列,这样我们就能够显示的控制任务派发了。一个非常简单的例子
let mut handlers = vec![];
let mut txs = vec![];
for _ in 0..number {
let (tx, rx) = channel::unbounded::<Task>();
let handle = thread::spawn(move || {
while let Some(task) = rx.recv() {
task.call_box();
}
});
txs.push(tx);
handlers.push(handle);
}
上面我们为每个线程创建了一个 channel,这样每个线程就不用去争抢全局的 channel 了。
派发的时候我们也可以手动派发,譬如根据某个 ID hash 到一个对应的 thread 上面,通过 Sender 发送 消息。
Work Stealing
虽然每个线程一个 channel 解决了全局争抢问题,也提升了 CPU cache 的使用,但它引入了另一个问题,就是任务的不均衡。直观的来说,就是会导致某些线程一直忙碌,在不断的处理任务,而另一些线程则没有任务处理,一直很闲。为了解决这个问题,就有了 Work Stealing 的线程池。
Work Stealing 的原理其实很简单,当一个线程执行完自己线程队列里面的所有任务之后,它会尝试去其它线程的队列里面偷一点任务执行。
因为 Work Stealing 的实现过于复杂,这里就不描述了,Rust 的 tokio 库提供了一个 tokio-threadpool,就是基于 Work Stealing 来做的,不过现在只提供了 Future 的支持。
小结
上面简单的列举了一些线程池的实现方式,如果你只是单纯的想用一个比较简单的派发功能,基于 crossbeam 的就可以了,复杂一点的可以使用 Work Stealing 的。当然,这里只是大概列举了一些,如果有更好的实现,麻烦跟我联系讨论,我的邮箱 tl@pingcap.com。