Rust学习笔记7 多线程与线程通信

github地址:https://github.com/bradyjoestar/rustnotes(欢迎star!)
pdf下载链接:https://github.com/bradyjoestar/rustnotes/blob/master/Rust%E8%AF%AD%E8%A8%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0.pdf
参考:
https://rustcc.gitbooks.io/rustprimer/content/ 《RustPrimer》
https://kaisery.github.io/trpl-zh-cn/ 《Rust程序设计语言-简体中文版》

第七章 多线程与线程通信

从结论上来说,rust的编译器并不能防止所有的线程引起的问题,例如:

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();// mpsc 是多个发送者,一个接收者

    thread::spawn(move || {
        let val = String::from("hi");
//        tx.send(val).unwrap();
    });

    let received:String = rx.recv().unwrap(); // 阻塞等待,直到接收到一个消息
    println!("Got: {}", received);
}

例如上述代码编译通过,但是在运行过程中会死锁。而rust的编译器无法识别此类错误。Rust对安全的理解是死锁不属于内存安全问题。

Rust在并发上的改进是:

通过改进所有权和类型检查,Rust 很多并发错误都是 编译时 错误,而非运行时错误。

更强大的编译器和类型检查排除了很多并发问题,但并不能解决所有并发问题。所以在进行并发编程时仍然要小心谨慎。

7.1 线程

7.1.1 不同语言的线程实现

在大部分现代操作系统中,执行中程序的代码运行于一个 进程(process)中,操作系统则负责管理多个进程。在程序内部,也可以拥有多个同时运行的独立部分。这个运行这些独立部分的功能被称为 线程(threads)。

编程语言有一些不同的方法来实现线程。很多操作系统提供了创建新线程的 API。这种由编程语言调用操作系统 API 创建线程的模模型有时被称为 1:1,一个 OS 线程对应一个语言线程。

很多编程语言提供了自己特殊的线程实现。编程语言提供的线程被称为 绿色(green)线程,使用绿色线程的语言会在不同数量的 OS 线程的上下文中执行它们。为此,绿色线程模式被称为 M:N 模型:M 个绿色线程对应 N 个 OS 线程,这里 M 和 N 不必相同。

在当前上下文中,运行时 代表二进制文件中包含的由语言自身提供的代码。这些代码根据语言的不同可大可小,不过任何非汇编语言都会有一定数量的运行时代码。为此,通常人们说一个语言 “没有运行时”,一般意味着 “小运行时”。更小的运行时拥有更少的功能不过其优势在于更小的二进制输出,这使其易于在更多上下文中与其他语言相结合。虽然很多语言觉得增加运行时来换取更多功能没有什么问题,但是 Rust 需要做到几乎没有运行时,同时为了保持高性能必需能够调用 C 语言,这点也是不能妥协的。

绿色线程的 M:N 模型更大的语言运行时来管理这些线程。为此,Rust 标准库只提供了 1:1 线程模型实现。Rust 是足够底层的语言,所以有相应的 crate 实现了 M:N 线程模型,如果你宁愿牺牲性能来换取例如更好的线程运行控制和更低的上下文切换成本。

7.1.2 使用spawn创建新线程

为了创建一个新线程,需要调用 thread::spawn 函数并传递一个闭包(第十三章学习了闭包),其包含希望在新线程运行的代码。

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }
}
运行结果:
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

由于主线程结束,上面例子中的代码大部分时候不光会提早结束新建线程,甚至不能实际保证新建线程会被执行。其原因在于无法保证线程运行的顺序!

7.1.3 使用join等待所有线程结束

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

通过调用 handle 的 join 会阻塞当前线程直到 handle 所代表的线程结束。阻塞(Blocking) 线程意味着阻止该线程执行工作或退出。因为我们将join调用放在了主线程的 for 循环之后,运行示例 16-2 应该会产生类似这样的输出:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

这两个线程仍然会交替执行,不过主线程会由于 handle.join() 调用会等待直到新建线程执行完毕。

7.1.4 线程与move闭包

thread::spawn 的闭包并没有任何参数:并没有在新建线程代码中使用任何主线程的数据。为了在新建线程中使用来自于主线程的数据,需要新建线程的闭包获取它需要的值。

use std::thread;

fn main() {
    let v = vec![1, 2, 3];

    let handle = thread::spawn(move || {
        println!("Here's a vector: {:?}", v);
    });

    handle.join().unwrap();
}

线程中使用的闭包只能是move修饰。

7.2 消息传递

Rust的通道(channel)可以把一个线程的消息(数据)传递到另一个线程,从而让信息在不同的线程中流动,从而实现协作。详情请参见std::sync::mpsc。通道的两端分别是发送者(Sender)和接收者(Receiver),发送者负责从一个线程发送消息,接收者则在另一个线程中接收该消息。

简单的例子:

use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个通道
    let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = 
        mpsc::channel();

    // 创建线程用于发送消息
    thread::spawn(move || {
        // 发送一个消息,此处是数字id
        tx.send(1).unwrap();
    });

    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}

运行结果:

receive 1

结果表明main所在的主线程接收到了新建线程发送的消息,用Rust在线程间传递消息就是这么简单!
虽然简单,但使用过其他语言就会知道,通道有多种使用方式,且比较灵活,为此我们需要进一步考虑关于Rust的Channel的几个问题:

  1. 通道能保证消息的顺序吗?是否先发送的消息,先接收?
  2. 通道能缓存消息吗?如果能的话能缓存多少?
  3. 通道的发送者和接收者支持N:1,1:N,N:M模式吗?
  4. 通道能发送任何数据吗?
  5. 发送后的数据,在线程中继续使用没有问题吗?
    下面各小节会对上面各部分问题进行回答。

7.2.1 通道与所有权的转移

在并发编程中避免错误是在整个 Rust 程序中必须思考所有权所换来的一大优势。

现在让我们做一个试验来看看通道与所有权如何一同协作以避免产生问题:我们将尝试在新建线程中的通道中发送完 val 值 之后 再使用它。

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

这里尝试在通过 tx.send 发送 val 到通道中之后将其打印出来。允许这么做是一个坏主意:一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。
编译报错:

error[E0382]: use of moved value: `val`
  --> src/main.rs:10:31
   |
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value used here after move
   |
   = note: move occurs because `val` has type `std::string::String`, which does
not implement the `Copy` trait

7.2.2 通道保证发送的顺序

use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

这一次,在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个Duration值调用thread::sleep 函数来暂停一秒。
在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当通道被关闭时,迭代器也将结束。

打印结果:
Got: hi
Got: from
Got: the
Got: thread

7.2.3 通过克隆发送者来创建多个生产者

之前我们提到了mpsc是 multiple producer, single consumer 的缩写。可以运用 mpsc以创建都向同一接收者发送值的多个线程。这可以通过克隆通道的发送端在来做到。

let (tx, rx) = mpsc::channel();

let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];

    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];

    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});

for received in rx {
    println!("Got: {}", received);
}

这一次,在创建新线程之前,我们对通道的发送端调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的通道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向通道的接收端发送不同的消息。
如果运行这些代码,你可能会看到这样的输出(两次的运行结果):

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: you
Got: thread

虽然你可能会看到这些值以不同的顺序出现。在并发下,运行结果可能每次都不相同。以进入到channel的顺序为主。

7.2.4 异步通道与同步通道

Rust的标准库其实提供了两种类型的通道:异步通道和同步通道。在前面使用的都是异步通道。异步通道指的是:不管接收者是否正在接收消息,消息发送者在发送消息时都不会阻塞。为了验证这一点,我们尝试多增加个线程来发送消息:

use std::sync::mpsc;
use std::thread;

// 线程数量
const THREAD_COUNT :i32 = 6;

fn main() {
    // 创建一个通道
    let (tx, rx): (mpsc::Sender<i32>, mpsc::Receiver<i32>) = mpsc::channel();

    // 创建线程用于发送消息
    for id in 0..THREAD_COUNT {
        // 注意Sender是可以clone的,这样就可以支持多个发送者
        let thread_tx = tx.clone();
        thread::spawn(move || {
            // 发送一个消息,此处是数字id
            thread_tx.send(id + 1).unwrap();
            println!("send {}", id + 1);
        });
    }

    thread::sleep_ms(2000);
    println!("wake up");
    // 在主线程中接收子线程发送的消息并输出
    for _ in 0..THREAD_COUNT {
        println!("receive {}", rx.recv().unwrap());
    }
}

返回结果:

send 1
send 2
wake up
receive 1
receive 2

在代码中,我们故意让main所在的主线程睡眠2秒,从而让发送者所在线程优先执行,通过结果可以发现,发送者发送消息时确实没有阻塞。
异步通道具备消息缓存的功能,理论上是无穷的,直至内存耗光为止。
异步通道的具有良好的灵活性和扩展性,针对业务需要,可以灵活地应用于实际项目中。
同步通道:
同步通道在使用上同异步通道一样,接收端也是一样的,唯一的区别在于发送端,唯一不同的在于创建同步通道的那行代码。同步通道是sync_channel,对应的发送者也变成了SyncSender。为了显示出同步通道的区别,故意添加了一些打印。和异步通道相比,存在两点不同:

  1. 同步通道是需要指定缓存的消息个数的,但需要注意的是,最小可以是0,表示没有缓存。
    2.发送者是会被阻塞的。当通道的缓存队列不能再缓存消息时,发送者发送消息时,就会被阻塞。当缓存队列有空间存放时,会从阻塞状态唤醒。
use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建一个同步通道
    let (tx, rx): (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) = mpsc::sync_channel(1);

    let tx1 = tx.clone();
    // 创建线程用于发送消息
    let new_thread = thread::spawn(move || {
        // 发送一个消息,此处是数字id
        println!("before send");
        tx.send(1).unwrap();
        println!("after send");
    });

    let thread_two = thread::spawn(move || {
        // 发送一个消息,此处是数字id
        println!("before send two");
        tx1.send(2).unwrap();
        println!("after send two");
    });

    println!("before sleep");
    thread::sleep_ms(5000);
    println!("after sleep");
    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
    println!("receive {}", rx.recv().unwrap());
    thread_two.join().unwrap();
}

7.2.5 可发送的消息类型

之前例子中我们传递的消息类型大部分为i32,除了这种类型之外,是否还可以传递更多的原始类型,或者更复杂的类型,和自定义类型?下面我们尝试发送一个更复杂的Rc类型的消息:

use std::fmt;
use std::sync::mpsc;
use std::thread;
use std::rc::Rc;

pub struct Student {
    id: u32
}

impl fmt::Display for Student {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "student {}", self.id)
    }
}

fn main() {
    // 创建一个通道
    let (tx, rx): (mpsc::Sender<Rc<Student>>, mpsc::Receiver<Rc<Student>>) = 
        mpsc::channel();

    // 创建线程用于发送消息
    thread::spawn(move || {
        // 发送一个消息,此处是数字id
        tx.send(Rc::new(Student{
            id: 1,
        })).unwrap();
    });

    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}

编译报错:

error: the trait `core::marker::Send` is not 
implemented for the type `alloc::rc::Rc<Student>` [E0277]
note: `alloc::rc::Rc<Student>` cannot be sent between threads safely

将Rc改成Arc,代码如下:

use std::fmt;
use std::sync::{mpsc, Arc};
use std::thread;
use std::rc::Rc;

pub struct Student {
    id: u32
}

impl fmt::Display for Student {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "student {}", self.id)
    }
}

fn main() {
    // 创建一个通道
    let (tx, rx): (mpsc::Sender<Arc<Student>>, mpsc::Receiver<Arc<Student>>) =
        mpsc::channel();

    // 创建线程用于发送消息
    thread::spawn(move || {
        // 发送一个消息,此处是数字id
        tx.send(Arc::new(Student{
            id: 1,
        })).unwrap();
    });

    // 在主线程中接收子线程发送的消息并输出
    println!("receive {}", rx.recv().unwrap());
}

消息类型必须实现marker trait Send。Rust之所以这样强制要求,主要是为了解决并发安全的问题,再一次强调,安全是Rust考虑的重中之重。如果一个类型是Send,则表明它可以在线程间安全的转移所有权(ownership),当所有权从一个线程转移到另一个线程后,同一时间就只会存在一个线程能访问它,这样就避免了数据竞争,从而做到线程安全。

在上面两个例子中,Rc这一类型的智能指针没有实现trait Send,而 Arc实现了。关于send与sync在下一节详细说明。

7.3 send与sync

7.3.1 send

Send 标记 trait 表明类型的所有权可以在线程间传递。几乎所有的 Rust 类型都是Send 的,不过有一些例外,包括 Rc<T>:这是不能 Send 的,因为如果克隆了 Rc<T> 的值并尝试将克隆的所有权转移到另一个线程,这两个线程都可能同时更新引用计数。为此,Rc<T> 被实现为用于单线程场景,这时不需要为拥有线程安全的引用计数而付出性能代价。

因此,Rust 类型系统和 trait bound 确保永远也不会意外的将不安全的 Rc<T> 在线程间发送。当尝试这么做的时候,会得到错误 the trait Send is not implemented for Rc<Mutex<i32>>。而使用标记为 Send 的 Arc<T> 时,就没有问题了。

任何完全由 Send 的类型组成的类型也会自动被标记为 Send。几乎所有基本类型都是 Send 的,

7.3.2 sync

Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用。换一种方式来说,对于任意类型 T,如果 &T(T 的引用)是 Send 的话 T 就是 Sync 的,这意味着其引用就可以安全的发送到另一个线程。类似于 Send 的情况,基本类型是 Sync 的,完全由 Sync 的类型组成的类型也是 Sync 的。

智能指针 Rc<T> 也不是 Sync 的,出于其不是 Send 相同的原因。RefCell<T>和 Cell<T> 系列类型不是 Sync 的。RefCell<T> 在运行时所进行的借用检查也不是线程安全的。Mutex<T> 是 Sync 的,正如 “在线程间共享Mutex<T>” 部分所讲的它可以被用来在多线程中共享访问。

7.3.3 手动实现send和sync需要加上unsafe

通常并不需要手动实现 Send 和 Sync trait,因为由Send和 Sync 的类型组成的类型,自动就是 Send和 Sync 的。因为他们是标记 trait,甚至都不需要实现任何方法。他们只是用来加强并发相关的不可变性的。

7.4 共享内存

go中虽然提供了channel方式进行通信,但同样提供了共享内存的方式。主要原因在于所有的数据交互通过channel方式通信过于复杂。在rust中同样提供了共享内存的方式和相应的锁等机制。

在rust中,共享内存主要有两种方式:

static和堆。

7.4.1 static

Rust语言中也存在static变量,其生命周期是整个应用程序,并且在内存中某个固定地址处只存在一份实例。所有线程都能够访问到它。这种方式也是最简单和直接的共享方式。

use std::thread;

static mut VAR: i32 = 5;

fn main() {
    // 创建一个新线程
    let new_thread = thread::spawn(move|| {
        unsafe {
            println!("static value in new thread: {}", VAR);
            VAR = VAR + 1;
        }
    });

    // 等待新线程先运行
    new_thread.join().unwrap();

    // compile error
    //error[E0133]: use of mutable static is unsafe and requires unsafe function or block
    //        --> src/main.rs:16:49
    //        |
    //        16 |     println!("static value in main thread: {}", VAR);
    //    |                                                 ^^^ use of mutable static

    // println!("static value in main thread: {}", VAR);

    unsafe {
        println!("static value in main thread: {}", VAR);
    }

}

运行结果:

static value in new thread: 5
static value in main thread: 6

从结果来看VAR的值变了,从代码上来看,除了在VAR变量前面加了mut关键字外,更加明显的是在使用VAR的地方都添加了unsafe代码块。为什么?所有的线程都能访问VAR,且它是可以被修改的,自然就是不安全的。上面的代码比较简单,同一时间只会有一个线程读写VAR,不会有什么问题,所以用unsafe来标记就可以。如果是更多的线程,还是请使用接下来要介绍的同步机制来处理。
static如此,那const呢? const会在编译时内联到代码中,所以不会存在某个固定的内存地址上,也不存在可以修改的情况,并不是内存共享的。

7.4.2 堆

由于现代操作系统的设计,线程寄生于进程,可以共享进程的资源,如果要在各个线程中共享一个变量,那么除了上面的static,还有就是把变量保存在堆上了。

基于Arc创建资源就是使用了堆,可以从Arc::New查看到。

use std::thread;
use std::sync::Arc;

fn main() {
    let var : Arc<i32> = Arc::new(5);
    let share_var = var.clone();

    // 创建一个新线程
    let new_thread = thread::spawn(move|| {
        println!("share value in new thread: {}, address: {:p}", share_var, &*share_var);
    });

    // 等待新建线程先执行
    new_thread.join().unwrap();
    println!("share value in main thread: {}, address: {:p}", var, &*var);
}

运行结果:

share value in new thread: 5, address: 0x2825070
share value in main thread: 5, address: 0x2825070

可以看出,它们的内存地址相同。

7.5 同步

如果是要在多个线程中使用,就需要面临两个关键问题:

1.资源何时释放?

2.线程如何安全的并发修改和读取?

由于上面两个问题的存在,这就是为什么我们不能直接用Box变量在线程中共享的原因,可以看出来,共享内存比消息传递机制似乎要复杂许多。Rust用了引用计数的方式来解决第一个问题(即引入Arc。)。

关于上面的第二个问题,Rust语言及标准库提供了一系列的同步手段来解决。

同步指的是线程之间的协作配合,以共同完成某个任务。在整个过程中,需要注意两个关键点:一是共享资源的访问, 二是访问资源的顺序。

在前面的章节中描述了如何访问共享资源,在本节中重点说明访问资源的顺序。

7.5.1 控制访问顺序--等待与通知

等待有三种:

1.主动等待一段时间

2.主动放弃一段时间片

3.被动等待,被动唤醒

通知:

看是简单的通知,在编程时也需要注意以下几点:

1.通知必然是因为有等待,所以通知和等待几乎都是成对出现的,比如std::sync::Condvar::wait和std::sync::Condvar::notify_one,std::sync::Condvar::notify_all。

2.等待所使用的对象,与通知使用的对象是同一个对象,从而该对象需要在多个线程之间共享,参见下面的例子。

3.除了Condvar之外,其实锁也是具有自动通知功能的,当持有锁的线程释放锁的时候,等待锁的线程就会自动被唤醒,以抢占锁。关于锁的介绍,在下面有详解。

4.通过条件变量和锁,还可以构建更加复杂的自动通知方式,比如std::sync::Barrier。

5.通知也可以是1:1的,也可以是1:N的,Condvar可以控制通知一个还是N个,而锁则不能控制,只要释放锁,所有等待锁的其他线程都会同时醒来,而不是只有最先等待的线程。

例子:

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

fn main() {

    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = pair.clone();

    // 创建一个新线程
    thread::spawn(move|| {
        let &(ref lock, ref cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
        println!("notify main thread");
    });

    // 等待新线程先运行
    let &(ref lock, ref cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        println!("before wait");
        started = cvar.wait(started).unwrap();
        println!("after wait");
    }
}

输出结果:

before wait
notify main thread
after wait

这个例子展示了如何通过条件变量和锁来控制新建线程和主线程的同步,让主线程等待新建线程执行后,才能继续执行。从结果来看,功能上是实现了。对于上面这个例子,还有下面几点需要说明:

  1. Mutex是Rust中的一种锁。
    2.Condvar需要和Mutex一同使用,因为有Mutex保护,Condvar并发才是安全的。
    3.Mutex::lock方法返回的是一个MutexGuard,在离开作用域的时候,自动销毁,从而自动释放锁,从而避免锁没有释放的问题。
    4.Condvar在等待时,会释放锁的,被通知唤醒时,会重新获得锁,从而保证并发安全。

7.5.2控制访问顺序的机制-原子类型与锁

7.5.2.1 原子类型锁

原子类型是最简单的控制共享资源访问的一种机制,相比较于后面将介绍的锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,从硬件到操作系统,到各个语言,基本都支持。在标准库std::sync::atomic中,你将在里面看到Rust现有的原子类型,包括AtomicBool,AtomicIsize,AtomicPtr,AtomicUsize。这4个原子类型基本能满足百分之九十的共享资源安全访问的需要。下面我们就用原子类型,结合共享内存的知识,来展示一下一个线程修改,一个线程读取的情况:

use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let var : Arc<AtomicUsize> = Arc::new(AtomicUsize::new(5));
    let share_var = var.clone();

    // 创建一个新线程
    let new_thread = thread::spawn(move|| {
        println!("share value in new thread: {}", share_var.load(Ordering::SeqCst));
        // 修改值
        share_var.store(9, Ordering::SeqCst);
    });

    // 等待新建线程先执行
    new_thread.join().unwrap();
    println!("share value in main thread: {}", var.load(Ordering::SeqCst));
}

输出结果:

share value in new thread: 5
share value in main thread: 9

7.5.2.1 Mutex

为了保障锁使用的安全性问题,Rust做了很多工作,但从效率来看还不如原子类型,那么锁是否就没有存在的价值了?显然事实不可能是这样的,既然存在,那必然有其价值。它能解决原子类型锁不能解决的那百分之十的问题。

use std::sync::{Arc, Mutex, Condvar};
use std::thread;

fn main() {

    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = pair.clone();

    // 创建一个新线程
    thread::spawn(move|| {
        let &(ref lock, ref cvar) = &*pair2;
        let mut started = lock.lock().unwrap();
        *started = true;
        cvar.notify_one();
        println!("notify main thread");
    });

    // 等待新线程先运行
    let &(ref lock, ref cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        println!("before wait");
        started = cvar.wait(started).unwrap();
        println!("after wait");
    }
}

代码中的Condvar就是条件变量,它提供了wait方法可以主动让当前线程等待,同时提供了notify_one方法,让其他线程唤醒正在等待的线程。这样就能完美实现顺序控制了。看起来好像条件变量把事都做完了,要Mutex干嘛呢?为了防止多个线程同时执行条件变量的wait操作,因为条件变量本身也是需要被保护的,这就是锁能做,而原子类型做不到的地方。

在Rust中,Mutex是一种独占锁,同一时间只有一个线程能持有这个锁。这种锁会导致所有线程串行起来,这样虽然保证了安全,但效率并不高。对于写少读多的情况来说,如果在没有写的情况下,都是读取,那么应该是可以并发执行的,为了达到这个目的,几乎所有的编程语言都提供了一种叫读写锁的机制,Rust中也存在,叫std::sync::RwLock

7.6 并行

在rust中,并行借助第三方库实现rayon。rayon尽可能封装,但并行本身就比较复杂,这节略去不说。

目前的应用开发上应该很少会涉及。

7.7 总结

和像 Mutex<T> 和 Arc<T> 这样可以安全的用于并发上下文的智能指针。类型系统和借用检查器会确保这些场景中的代码,不会出现数据竞争和无效的引用。

所有权和生命周期+ Send 和 Sync(本质上为类型系统,向多线程发送的过程中首先进行检查,保证其在编译环境下多线程情况下绝对安全。)来为并发编程提供了安全可靠的基础设施。使得程序员可以放心在其上构建稳健的并发模型。

一旦代码可以编译了(除了RefCell和unsafe模块),我们就可以坚信这些代码可以正确的运行于多线程环境,而不会出现其他语言中经常出现的那些难以追踪的 bug。并发编程不再是什么可怕的概念:无所畏惧地并发吧!

无畏并发并不是保证没有bug,代码有问题还是会出现死锁。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容