Rust 基础知识21 - 并发

并发

  • Rust 称之为 fearless concurrency 据说可以写出没有诡异缺陷的代码,接下来就带着兴奋的心情了解一下吧。
  • 并发中需要讨论或者说需要关注的问题大致如下:

创建线程同时运行代码。
使用通道在线程间发送消息的"消息传递式并发"。
允许多个线程访问同一片数据的状态式并发。
Sync trait 与 Send trait,能够将Rust的并发保证从标准库中提供的类型扩展至用户自定义的类型。

知识汇总

创建线程同时运行代码可能遇到的问题

  • 多个线程一不一致的顺序访问数据或者资源时产生的竞争状态(race condition)。
  • 当两个线程同时尝试获取对方持有资源时产生的死锁(deadlock)。
  • 只会出现在特定情况下的且难以稳定重现和修复的bug。

使用spawn 创建新线程

  • 主线程运行结束子线程也就会提前终止,可以看下面的例子:

use std::thread;
use std::time::Duration;

// 如下代码只要main() 运行完毕,那么子线成也就被终止了
// 可以看大如下代码中 thread::spawn 闭包调用的次数多,而main函数中调用的次数少。
// 但是只要main 中的1..5打印结束,那么子线程循环就会被提前终止。
fn main() {
    thread::spawn(||{
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}

  • thread::spawn 的返回值类型是一个自持有所有权的JoinHandle,调用它的join方法可以阻塞当前线程直到对应的新线程运行结束。
  • 把上面的例子稍稍改进:
use std::time::Duration;

fn main() {
    // 返回自持的JoinHandler 所有权
    let handler = thread::spawn(||{
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // 调用join 方法来保证新线程可以执行完毕。
    handler.join().unwrap();
}

使用 move 关键字

  • move 会将环境变量的所有权转移到线程中,看下面的例子和注释就明白了。
use std::thread;
use std::time::Duration;

fn main() {

    let v = vec![1,2,3];
    // 这个里面如果使用外部的变量比如上面的 v:Vec<i32>
    // 那么这时候Rust 就遇到了一问题,他并不知道外面的这个v 的有效期有多长
    // 这是后就可以通过 move 关键字,强制闭包获得它所需值的所有权,而不仅仅是基于借用。
    let handler = thread::spawn(move ||{
        println!("Here's a vector : {:?}", v);
    });


    // 调用join 方法来保证新线程可以执行完毕。
    handler.join().unwrap();
}

使用消息传递在线程间转移数据

  • Go语言文档中的口号是"不要通过共享内存来通信,而是要通过通信来共享内存"。
  • Rust 在"标准库"中实现了一个名为通道(channel)的概念,他可以被用来实现基于消息传递的并发机制。
  • 通道由发送者(transmitter)和接受者(receiver)两部分组成,当丢弃了任何一方,通道就关闭了(closed)。
  • 下面是一个举例,接受者和发送者通过一个带模式的 let 语句进行元祖解构,获取变量:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    // tx 是发送者 ,rx 是接收者 , mpsc = (Multiple producer,single consumer.)
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    // 注意 recv() 方法会阻塞主线程,直到有值被传入。
    let received = rx.recv().unwrap();
    // 另外的 try_recv() 方法不会阻塞线程,他会立即返回Result<T,E>
    // 当通道中存在消息时,返回包含该消息的Ok变体,否则便返回Err变体。
    // 当某个线程需要一边等待消息一边工作是,可以编写一个不断调用try_recv方法的循环,并在有消息来时对其进行处理。
    println!("Got : {}", received);
}

通道和所有权转移

  • 所有权规则在消息传递的过程中扮演了至关重要的角色,因为它可以帮助你写出安全的并发代码。

发送多个值并观察接收者的等待过程

  • 通过一个例子进行展示,以英国可以看到接受了多条消息,而且每条消息发送后暂停了一面,充分感受这种美妙:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    
    // 这里并没有在主线程的for 循环中执行暂停或者延迟命令,这也就表明了主线程确实是在等待接收新线程中传递过来的值。
    for received in rx {
        println!("Got : {}", received);
    }
}

通过克隆发送者创建多个生产者。

  • 这里将尝试通过克隆通道的发送端来创建出多个能够发送值到同一个接收端的线程。
  • 注意克隆发送者需要用到 mpsc::Sender::clone(&tx) 不要搞错,参考下面的例子:
use std::thread;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            // 用Copy 的mpsc::Sender 发送
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("message"),
            String::from("for"),
            String::from("you"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        // 因为是两个线程启动的发送端,所以会插着接收两个tx 的值。
        println!("Got : {}", received);
    }
}

共享状态的并发

任何编程语言中的通道都有些类似于单一所有权的概念.
因为你不应该在值传递给通道后在使用它。
而基于共享内存的并发通信机制更类似于"多重所有权"的概念。
多个线程可以同时访问相同的内存地址。

互斥体(Mutex)一次只允许一个线程访问数据

  • 互斥体Mutex 是英文你 mutual exclusion 的缩写,也就是说一个互斥体在任意时刻只允许一个线程访问数据。
  • 使用时线程必须发出信号来获取互斥体的锁(lock)通过锁机制,互斥体守护(guarding)了它所持有的数据。
  • 你可以想象成只有一个话筒大会发言,其中一个人发言后,会把话筒传递给下一个人,话筒持有人如果在发言后忘记将话筒移交出去,整个大会就会停摆,这也就是为什么会有那么多通道机制的拥护者。
  • 但是还是有必要了解一下Rust 中这种结构的处理方式的。
  • 首先是 Mutex<T> 接口,为了便于演示先看下面的单线程代码:

use std::sync::Mutex;

fn main() {
    // 创建一个互斥体变量,如你所见 Mutex 也是一种智能指针。
    let m = Mutex::new(5);
    {
        // 为了访问Mutex<T>实例中的数据,我们首先需要调用它的lock方法来获取锁。
        // 注意这个调用会阻塞当前线程直到我们取得锁为止。
        // 当前lock()函数的调用会在其他某个持有锁的线程是发生panic
        // 返回值实际上是一种名字叫 MutexGuard<T>的智能指针,它通过 Deref 来指向存储在内容的数据,它还通过实现Drop来完成自己离开作用域时的自动解锁操作。
        let mut num = m.lock().unwrap();
        // 一旦获取了锁,便可以将它的返回值num视作一个指向内部数据的可变引用
        *num = 6; //因为Mutex<i32>并不是i32,所以必须获取锁才能使用i32的值。
    }// 一旦作用域离开,那么锁就会被自动释放,这个释放过程是自动的。

    // 打印后可以发现这个互斥体的变量值确实从5变成了6. :m = Mutex { data: 6 }
    println!("m = {:?}", m);
}

  • 了解了上面的概念后接下来看看在多个线程间共享Mutex<T>

事实上这个相对会复杂很多,这里涉及到 Arc<T> (原子引用计数)
它既拥有类似Rc<T>的行为,又保证了自己可以被安全地用于并发场景。
下面的例子如果你尝试用 Rc<T> 替代 Arc<T> 就会报错,可以试一下:

use std::thread;
use std::sync::{Mutex, Arc};
use std::rc::Rc;

fn main() {
    // 这里不能使用 Rc<T> 因为他不是跨线程安全的,因为它没有使用任何原语来保证修改计数的过程不会被另一个线程打断。
    // let counter = Rc::new(Mutex::new(0)) ;
    let counter = Arc::new(Mutex::new(0)) ;
    let mut handles = vec![];

    for _ in 0..10 {
        // Rc 和 Arc 的接口方法一致,直接对调就可以了,不能用Rc 多线程要用 Arc 直接记下结论也可以。
        // let counter = Rc::clone(&counter);
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result {}", *counter.lock().unwrap());
}

  • 事实上 Mutex<T> 与 RefCell<T> 系列有着很相似的功能,他们都提供了内部可变性。
  • 另外需要注意使用 Mutex<T> 也会有产生死锁的风险(deadlock),对此可以查看标准库API文档的Mutex<T>和MutexGuard的页面获取更多信息。

使用 Sync trait 和 Send trait 对并发进行扩展

允许线程间访问的 Send trait

  • 只有实现了Send trait 的类型才可以安全的在线程间转移所有权。出了Rc<T> 等极少数的类型,几乎所有的Rust 类型都实现了Send trait。
  • Rc<T> 只被设计在单场景中使用,它也无须为线程安全付出额外的性能开销。
  • 任何完全由Send 类型组成的复合类型都会被挨冻标记为Send,除了接下来讨论的"裸指针",几乎所有原生类型都满足Send需求。

允许多线程同时访问的Sync trait

  • 只有实现了Sync trait的类型才可以安全地被多个线程引用。
  • 换句话说对于任何T,如果&T满足约束Send,那么T就是满足Sync的。
  • 智能指针中Rc<T>,RefCell<T>,Cell<T>系列不满足Sync约束,RefCell<T>实现的运行时借用检查并没有提供有关的线程安全保证。

手动实现Send和Sync是不安全的

  • 当某个类型完全由实现了Send与Sync的类型组成时,它就自动实现Send与Sync因此我们并不需要手动的去实现这类相关的trait。
  • 当你需要使用多线程时,请不要忘记到网络上搜索最新的、最具最高水准的第三方包。

结束

  • 感谢阅读,See you at work.
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容