并发
- Rust 称之为 fearless concurrency 据说可以写出没有诡异缺陷的代码,接下来就带着兴奋的心情了解一下吧。
- 并发中需要讨论或者说需要关注的问题大致如下:
创建线程同时运行代码。
使用通道在线程间发送消息的"消息传递式并发"。
允许多个线程访问同一片数据的状态式并发。
Sync trait 与 Send trait,能够将Rust的并发保证从标准库中提供的类型扩展至用户自定义的类型。
知识汇总
创建线程同时运行代码可能遇到的问题
- 多个线程一不一致的顺序访问数据或者资源时产生的竞争状态(race condition)。
- 当两个线程同时尝试获取对方持有资源时产生的死锁(deadlock)。
- 只会出现在特定情况下的且难以稳定重现和修复的bug。
使用spawn 创建新线程
- 主线程运行结束子线程也就会提前终止,可以看下面的例子:
use std::thread;
use std::time::Duration;
// 如下代码只要main() 运行完毕,那么子线成也就被终止了
// 可以看大如下代码中 thread::spawn 闭包调用的次数多,而main函数中调用的次数少。
// 但是只要main 中的1..5打印结束,那么子线程循环就会被提前终止。
fn main() {
thread::spawn(||{
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
- thread::spawn 的返回值类型是一个自持有所有权的JoinHandle,调用它的join方法可以阻塞当前线程直到对应的新线程运行结束。
- 把上面的例子稍稍改进:
use std::time::Duration;
fn main() {
// 返回自持的JoinHandler 所有权
let handler = thread::spawn(||{
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
// 调用join 方法来保证新线程可以执行完毕。
handler.join().unwrap();
}
使用 move 关键字
- move 会将环境变量的所有权转移到线程中,看下面的例子和注释就明白了。
use std::thread;
use std::time::Duration;
fn main() {
let v = vec![1,2,3];
// 这个里面如果使用外部的变量比如上面的 v:Vec<i32>
// 那么这时候Rust 就遇到了一问题,他并不知道外面的这个v 的有效期有多长
// 这是后就可以通过 move 关键字,强制闭包获得它所需值的所有权,而不仅仅是基于借用。
let handler = thread::spawn(move ||{
println!("Here's a vector : {:?}", v);
});
// 调用join 方法来保证新线程可以执行完毕。
handler.join().unwrap();
}
使用消息传递在线程间转移数据
- Go语言文档中的口号是"不要通过共享内存来通信,而是要通过通信来共享内存"。
- Rust 在"标准库"中实现了一个名为通道(channel)的概念,他可以被用来实现基于消息传递的并发机制。
- 通道由发送者(transmitter)和接受者(receiver)两部分组成,当丢弃了任何一方,通道就关闭了(closed)。
- 下面是一个举例,接受者和发送者通过一个带模式的 let 语句进行元祖解构,获取变量:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
// tx 是发送者 ,rx 是接收者 , mpsc = (Multiple producer,single consumer.)
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
// 注意 recv() 方法会阻塞主线程,直到有值被传入。
let received = rx.recv().unwrap();
// 另外的 try_recv() 方法不会阻塞线程,他会立即返回Result<T,E>
// 当通道中存在消息时,返回包含该消息的Ok变体,否则便返回Err变体。
// 当某个线程需要一边等待消息一边工作是,可以编写一个不断调用try_recv方法的循环,并在有消息来时对其进行处理。
println!("Got : {}", received);
}
通道和所有权转移
- 所有权规则在消息传递的过程中扮演了至关重要的角色,因为它可以帮助你写出安全的并发代码。
发送多个值并观察接收者的等待过程
- 通过一个例子进行展示,以英国可以看到接受了多条消息,而且每条消息发送后暂停了一面,充分感受这种美妙:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 这里并没有在主线程的for 循环中执行暂停或者延迟命令,这也就表明了主线程确实是在等待接收新线程中传递过来的值。
for received in rx {
println!("Got : {}", received);
}
}
通过克隆发送者创建多个生产者。
- 这里将尝试通过克隆通道的发送端来创建出多个能够发送值到同一个接收端的线程。
- 注意克隆发送者需要用到
mpsc::Sender::clone(&tx)
不要搞错,参考下面的例子:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
// 用Copy 的mpsc::Sender 发送
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("message"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
// 因为是两个线程启动的发送端,所以会插着接收两个tx 的值。
println!("Got : {}", received);
}
}
共享状态的并发
任何编程语言中的通道都有些类似于单一所有权的概念.
因为你不应该在值传递给通道后在使用它。
而基于共享内存的并发通信机制更类似于"多重所有权"的概念。
多个线程可以同时访问相同的内存地址。
互斥体(Mutex)一次只允许一个线程访问数据
- 互斥体Mutex 是英文你 mutual exclusion 的缩写,也就是说一个互斥体在任意时刻只允许一个线程访问数据。
- 使用时线程必须发出信号来获取互斥体的锁(lock)通过锁机制,互斥体守护(guarding)了它所持有的数据。
- 你可以想象成只有一个话筒大会发言,其中一个人发言后,会把话筒传递给下一个人,话筒持有人如果在发言后忘记将话筒移交出去,整个大会就会停摆,这也就是为什么会有那么多通道机制的拥护者。
- 但是还是有必要了解一下Rust 中这种结构的处理方式的。
- 首先是 Mutex<T> 接口,为了便于演示先看下面的单线程代码:
use std::sync::Mutex;
fn main() {
// 创建一个互斥体变量,如你所见 Mutex 也是一种智能指针。
let m = Mutex::new(5);
{
// 为了访问Mutex<T>实例中的数据,我们首先需要调用它的lock方法来获取锁。
// 注意这个调用会阻塞当前线程直到我们取得锁为止。
// 当前lock()函数的调用会在其他某个持有锁的线程是发生panic
// 返回值实际上是一种名字叫 MutexGuard<T>的智能指针,它通过 Deref 来指向存储在内容的数据,它还通过实现Drop来完成自己离开作用域时的自动解锁操作。
let mut num = m.lock().unwrap();
// 一旦获取了锁,便可以将它的返回值num视作一个指向内部数据的可变引用
*num = 6; //因为Mutex<i32>并不是i32,所以必须获取锁才能使用i32的值。
}// 一旦作用域离开,那么锁就会被自动释放,这个释放过程是自动的。
// 打印后可以发现这个互斥体的变量值确实从5变成了6. :m = Mutex { data: 6 }
println!("m = {:?}", m);
}
- 了解了上面的概念后接下来看看在多个线程间共享Mutex<T>
事实上这个相对会复杂很多,这里涉及到 Arc<T> (原子引用计数)
它既拥有类似Rc<T>的行为,又保证了自己可以被安全地用于并发场景。
下面的例子如果你尝试用 Rc<T> 替代 Arc<T> 就会报错,可以试一下:
use std::thread;
use std::sync::{Mutex, Arc};
use std::rc::Rc;
fn main() {
// 这里不能使用 Rc<T> 因为他不是跨线程安全的,因为它没有使用任何原语来保证修改计数的过程不会被另一个线程打断。
// let counter = Rc::new(Mutex::new(0)) ;
let counter = Arc::new(Mutex::new(0)) ;
let mut handles = vec![];
for _ in 0..10 {
// Rc 和 Arc 的接口方法一致,直接对调就可以了,不能用Rc 多线程要用 Arc 直接记下结论也可以。
// let counter = Rc::clone(&counter);
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result {}", *counter.lock().unwrap());
}
- 事实上 Mutex<T> 与 RefCell<T> 系列有着很相似的功能,他们都提供了内部可变性。
- 另外需要注意使用 Mutex<T> 也会有产生死锁的风险(deadlock),对此可以查看标准库API文档的Mutex<T>和MutexGuard的页面获取更多信息。
使用 Sync trait 和 Send trait 对并发进行扩展
允许线程间访问的 Send trait
- 只有实现了Send trait 的类型才可以安全的在线程间转移所有权。出了Rc<T> 等极少数的类型,几乎所有的Rust 类型都实现了Send trait。
- Rc<T> 只被设计在单场景中使用,它也无须为线程安全付出额外的性能开销。
- 任何完全由Send 类型组成的复合类型都会被挨冻标记为Send,除了接下来讨论的"裸指针",几乎所有原生类型都满足Send需求。
允许多线程同时访问的Sync trait
- 只有实现了Sync trait的类型才可以安全地被多个线程引用。
- 换句话说对于任何T,如果&T满足约束Send,那么T就是满足Sync的。
- 智能指针中Rc<T>,RefCell<T>,Cell<T>系列不满足Sync约束,RefCell<T>实现的运行时借用检查并没有提供有关的线程安全保证。
手动实现Send和Sync是不安全的
- 当某个类型完全由实现了Send与Sync的类型组成时,它就自动实现Send与Sync因此我们并不需要手动的去实现这类相关的trait。
- 当你需要使用多线程时,请不要忘记到网络上搜索最新的、最具最高水准的第三方包。
结束
- 感谢阅读,See you at work.