仓颉语言实现了M:N轻量线程模型,支持在少量系统线程之上创建海量用户线程,在实现层面用户线程对应协程,仓颉runtime会自动管理和调度这些协程。
当用户线程t做I/O等资源访问操作时,若资源尚未就绪,线程t就会被runtime挂起等待、并调入其他线程运行,当资源就绪后又会适时恢复t的执行,高效利用CPU资源,实现高并发能力。
创建线程
创建一个新的仓颉线程,可以使用关键字 spawn 并传递一个无形参的 lambda 表达式,该 lambda 表达式即为在新线程中执行的代码。
main() {
spawn { =>
println("New thread before sleeping")
sleep(100 * Duration.millisecond) // sleep for 100ms.
println("New thread after sleeping")
}
println("Main thread")
}
在上面的例子中,新线程会在主线程结束时一起停止,无论这个新线程是否已完成运行。
访问线程
spawn 表达式的返回类型是 Future<T>,T是线程函数的返回值类型。
public class Future<T> {
public func get(): T
public func get(timeout: Duration): T
public func tryGet(): Option<T>
public func cancel():Unit
public prop thread:Thread
}
get()
阻塞当前线程,等待并获取当前Future<T> 对象对应的线程的结果。
main() {
let fut: Future<Int64> = spawn {=>
//睡眠 1 秒
sleep(1000 * Duration.millisecond)
return 1
}
//等待线程完成
let result: Int64 = fut.get()
println(result) // 1
}
get(timeout: Duration)
阻塞当前线程,等待指定时长并获取当前Future<T> 对象对应的线程的返回值。如果相应的线程在指定时间内未完成执行,则该函数将抛出异常TimeoutException。
cancel()
给当前Future实例对应的仓颉线程发送取消请求。该方法不会立即停止线程执行,仅发送请求,相应地Future类的函数hasPendingCancellation可用于检查线程是否存在取消请求,开发者可以通过该检查来自行决定是否提前终止线程以及如何终止线程。
main(): Unit {
/* 创建线程 */
let future = spawn {
while (true) {
if (Thread.currentThread.hasPendingCancellation) {
return 0
}
}
return 1
}
/* 向线程发送取消请求 */
future.cancel()
let res = future.get()
println(res) // 0
}
同步机制
在并发编程中,如果缺少同步机制来保护多个线程共享的变量,很容易会出现数据竞争问题。
例如以下案例,当创建1000个线程对一个变量加1时,理想输出应该是1000,然而实际每次执行结果都不是1000并且值都不一样。
import std.collection.*
var count = 0
main() {
let list = ArrayList<Future<Int64>>()
for (_ in 0..1000) {
let fut= spawn {
sleep(Duration.millisecond)
count++
return count
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") // count = 993
}
仓颉编程语言提供三种常见的同步机制来确保数据的线程安全:原子操作、互斥锁和条件变量。
原子操作 Atomic
仓颉提供整数类型、Bool 类型和引用类型的原子操作。
整数类型的原子操作支持基本的读写、交换以及算术运算操作:
| 操作 | 功能 |
|---|---|
| load | 读取 |
| store | 写入 |
| swap | 交换,返回交换前的值 |
| compareAndSwap | 比较再交换,交换成功返回 true,否则返回 false |
| fetchAdd | 加法,返回执行加操作之前的值 |
| fetchSub | 减法,返回执行减操作之前的值 |
| fetchAnd | 与,返回执行与操作之前的值 |
| fetchOr | 或,返回执行或操作之前的值 |
| fetchXor | 异或,返回执行异或操作之前的值 |
Bool类型和引用类型的原子操作只提供读写和交换操作:
| 操作 | 功能 |
|---|---|
| load | 读取 |
| store | 写入 |
| swap | 交换,返回交换前的值 |
| compareAndSwap | 比较再交换,交换成功返回 true,否则返回 false |
将以上案例使用原子操作修改如下:
import std.collection.*
import std.sync.*
var count = AtomicInt64(0);
main() {
let list = ArrayList<Future<Int64>>()
for (_ in 0..1000) {
let fut= spawn {
sleep(Duration.millisecond)
return count.fetchAdd(1)
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count.load()}") //count = 1000
}
可重入互斥锁Mutex
可重入互斥锁的作用是对临界区加以保护,使得任意时刻最多只有一个线程能够执行临界区的代码。当一个线程试图获取一个已被其他线程持有的锁时,该线程会被阻塞,直到锁被释放,该线程才会被唤醒,可重入是指线程获取该锁后可再次获得该锁。
使用可重入互斥锁时,必须牢记两条规则:
1.在访问共享数据之前,必须尝试获取锁;
2.处理完共享数据后,必须释放锁,以便其他线程可以获得锁。
import std.sync.*
import std.collection.*
var count: Int64 = 0
let mtx = Mutex()
main() {
let list = ArrayList<Future<Unit>>()
for (_ in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond)
mtx.lock() // 获取锁
count++
mtx.unlock() // 释放锁
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") //count = 1000
}
Condition
Condition 实例由互斥锁创建,一个互斥锁可以创建多个 Condition 实例。Condition 可以使线程阻塞并等待来自另一个线程的信号以恢复执行。
调用 Condition 接口的 wait、notify 或 notifyAll 方法前,需要确保当前线程已经持有绑定的锁。
调用 Condition 接口的 wait方法包含如下动作:
1.添加当前线程到对应锁的等待队列中;
2.阻塞当前线程,同时完全释放该锁,并记录锁的重入次数;
3.等待某个其他线程使用同一个 Condition 实例的 notify 或 notifyAll 方法向该线程发出信号;
4.当前线程被唤醒后,会自动尝试重新获取锁,且持有锁的重入状态与第 2 步记录的重入次数相同;但是如果尝试获取锁失败,则当前线程会阻塞在该锁上。
import std.sync.*
let mtx = Mutex()
let condition = synchronized(mtx) {
mtx.condition()
}
var flag: Bool = true
main(): Int64 {
let fut = spawn {
mtx.lock()
while (flag) {
println("执行1")
condition.wait()
println("执行4")
}
mtx.unlock()
}
sleep(10 * Duration.millisecond)
mtx.lock()
println("执行2")
flag = false
mtx.unlock()
mtx.lock()
println("执行3")
condition.notifyAll()
mtx.unlock()
fut.get()
return 0
}
synchronized 关键字
仓颉编程语言提供一个 synchronized 关键字,搭配 Lock 一起使用,可以在其后跟随的作用域内自动进行加锁解锁操作,用来解决类似的问题。
一个线程在进入 synchronized 修饰的代码块之前,会自动获取 Lock 实例对应的锁,如果无法获取锁,则当前线程被阻塞;
一个线程在退出 synchronized 修饰的代码块之前,会自动释放该 Lock 实例的锁。
import std.sync.*
import std.collection.*
var count: Int64 = 0
var mtx: Mutex = Mutex()
main() {
let list = ArrayList<Future<Unit>>()
for (i in 0..10) {
let fut = spawn {
while (true) {
synchronized(mtx) {
count = count + 1
break
}
}
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") //count = 10
}
ThreadLocal
使用 core 包中的 ThreadLocal 可以创建并使用线程局部变量,每一个线程都有它独立的一个存储空间来保存这些线程局部变量。因此,在每个线程可以安全地访问他们各自的线程局部变量,而不受其他线程的影响。
import std.sync.*
main() {
let tl = ThreadLocal<Int64>()
let fut = spawn {
tl.set(0)
var t=0
for(_ in 0..1000){
t++
}
tl.set(t)
println("tl in spawn1 = ${tl.get().getOrThrow()}") //1000
}
let fut2 = spawn {
tl.set(0)
var t=0
for(_ in 0..10){
t++
}
tl.set(t)
println("tl in spawn2 = ${tl.get().getOrThrow()}") //10
}
fut.get()
fut2.get()
}