1. 面向并发范式
真实世界都是并发的,C++、JAVA语言都提供了进程、线程方式解决并发,Go、Rust语言不仅提供了进程、线程,进一步提供了协程来解决并发问题。
在并发的世界,我们要解决的一类问题就是共享数据同步,rust数据同步的方法有Atomic、Mutex、Condvar、Channel ,下面我们一一探索一下。
2. Atomic
支持Atomic的类型有
AtomicBool A boolean type which can be safely shared between threads.
AtomicI8 An integer type which can be safely shared between threads.
AtomicI16 An integer type which can be safely shared between threads.
AtomicI32 An integer type which can be safely shared between threads.
AtomicI64 An integer type which can be safely shared between threads.
AtomicIsize An integer type which can be safely shared between threads.
AtomicPtr A raw pointer type which can be safely shared between threads.
AtomicU8 An integer type which can be safely shared between threads.
AtomicU16 An integer type which can be safely shared between threads.
AtomicU32 An integer type which can be safely shared between threads.
AtomicU64 An integer type which can be safely shared between threads.
AtomicUsize An integer type which can be safely shared between threads.
例如说声明一个变量AtomicBool,在两个rust线程之间共享,可以用如下代码进行数据同步
while self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err() {}
想想Java的锁机制,基本的是自旋锁,自旋锁升级为轻量级锁,就是上面这种原子锁,其底层是一个专门的CPU指令。
3. Mutex
互斥锁的概念相信大家比较熟悉,例如:
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(0));
let c_mutex = Arc::clone(&mutex);
thread::spawn(move || {
*c_mutex.lock().unwrap() = 10;
}).join().expect("thread::spawn failed");
assert_eq!(*mutex.lock().unwrap(), 10);
Mutex还有一种用法
use std::sync::{Arc, Mutex};
use std::thread;
let mutex = Arc::new(Mutex::new(0));
let c_mutex = Arc::clone(&mutex);
thread::spawn(move || {
let mut lock = c_mutex.try_lock();
if let Ok(ref mut mutex) = lock {
**mutex = 10;
} else {
println!("try_lock failed");
}
}).join().expect("thread::spawn failed");
assert_eq!(*mutex.lock().unwrap(), 10);
值得一提的是:
- lock的返回值是LockResult<MutexGuard>
- try_lock的返回值是 TryLockResult<MutexGuard>
其中MutexGuard实现了Drop Trait,在MutexGuard离开作用域时,会自动释放锁。
4. CondVar
信号量的用法和C++、Java语言也没有什么不同,CondVard的方法列表有:
- wait
- wait_timeout
- notify
- notify_one
是不是似曾相识,注意和其它语言原理一样,CondVar必须和Mutex配合使用,一个例子如下
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair2 = Arc::clone(&pair);
// Inside of our lock, spawn a new thread, and then wait for it to start.
thread::spawn(move|| {
let (lock, cvar) = &*pair2;
let mut started = lock.lock().unwrap();
*started = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
});
// Wait for the thread to start up.
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
5. Channel
一听名字就似曾相识,参考了go语言的channel机制,一个例子如下:
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Spawn off an expensive computation
thread::spawn(move|| {
sender.send(expensive_computation()).unwrap();
});
// Do some useful work for awhile
// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
一个channel有个sender和一个receiver,sender发送数据,receiver接收数据,如果通道为空,receiver会被阻塞。
6. 线程同步与操作系统
按照技术的本质,技术都只组装的,rust这些同步技术,也是基于操作系统封装的。
6.1 原子锁
原子锁的底层,是操作系统的lock指令,如下代码
//定义一个原子类型
typedef struct s_ATOMIC{
volatile s32_t a_count; //在变量前加上volatile,是为了禁止编译器优化,使其每次都从内存中加载变量
}atomic_t;
//原子加1
static inline void atomic_inc(atomic_t *v)
{
__asm__ __volatile__("lock;" "incl %0"
: "+m" (v->a_count));
}
在这个基础上封装了原子锁。
6.2 关中断
在单CPU下,可以通过关中断来实现同步, 它的底层是开关中断指令
//关闭中断
void hal_cli()
{
__asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
__asm__ __volatile__("sti": : :"memory");
}
6.3 信号量
信号量定义了运行队列和等待队列,就是将队列从运行队列放入等待队列,当notify的时候,就从等待队列取出一条等待线程放入运行队列,是一种重度操作,其数据结构如下
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{
spinlock_t wl_lock;
uint_t wl_tdnr;
list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
uint_t sem_flg;//信号量相关的标志
sint_t sem_count;//信号量计数值
kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;
6.4 自旋锁
信号量都是重度锁,我们可以使用轻度锁----自旋锁。自旋锁的意思就是如果未取到运行权,就死循环。
//自旋锁结构
typedef struct
{
volatile u32_t lock;//volatile可以防止编译器优化,保证其它代码始终从内存加载lock变量的值
} spinlock_t;
//加锁函数
static inline void x86_spin_lock(spinlock_t * lock)
{
__asm__ __volatile__ (
"1: \n"
"lock; xchg %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换
"cmpl $0, %0 \n" //用0和交换回来的值进行比较
"jnz 2f \n" //不等于0则跳转后面2标号处运行
"jmp 3f \n" //若等于0则跳转后面3标号处返回
"2: \n"
"cmpl $0, %1 \n"//用0和lock内存中的值进行比较
"jne 2b \n"//若不等于0则跳转到前面2标号处运行继续比较
"jmp 1b \n"//若等于0则跳转到前面1标号处运行,交换并加锁
"3: \n" :
: "r"(1), "m"(*lock));
}