Kotlin/Native 异步并发模型(1)—— Worker 与对象子图

Kotlin/Native 现状的一些讨论

Kotlin/Native 编写的程序作为一种原生二进制程序,没有强大的运行时虚拟机来提供各种运行时的保障,
因此它需要重新思考一套自己的异步并发模型。实际上 JVM 这一套机制是 C/C++
这种传统命令式编程语言的线程同步机制的延续,但 Kotlin 在编程范式上吸收了部分函数式编程的特性,因此 Kotlin/Native
的同步方案从设计思想上向函数式编程靠拢,即对象不可变,其宗旨就是如果对象本身不可变,那就不存在线程安全的问题。

Kotlin/Native 中,我们能实现的异步和并发方案有好几种,甚是混乱。第一种方式是,我们可以直接使用相关操作系统平台提供的 API
来自己开启线程,例如在 Linux 上,我们就可以像写 C 语言程序一样,自己手动调用 pthread_create
来创建线程,但是这样写出来的代码就违反了平台通用性的原则,例如如果你要将你的程序移植到 Windows 上,那异步并发方式就得全部改用
Windows 平台的机制,可移植性太差,在编写多平台程序的时候这种方式就很丑陋。

Kotlin/Native 自身提供给了我们两套异步并发的 API,首先是协程,但 Kotlin/Native 的协程与 Kotlin/JVM
的协程区别很大,Kotlin/Native 的协程是单线程的,也就是说它只能用来执行一些不占用 CPU 资源的并发任务,例如网络请求,如果要利用 CPU
多核的能力来进行并行计算,Native 版的协程就失去了作用,当然,官方说了要尽快解决这个问题,并且前几天(2019 年 12
月底)我发现官方已经发布了 Native 多线程版协程的预览版本,这个会在后文详细讨论。因为当前主分支版本的协程不能并行计算,因此官方在 Kotlin/Native
诞生之初就已经提供了另一套专门做并行任务的工具,即 WorkerWorker 与 Kotlin/Native 的异步并发模型紧密相连,做到了既能利用 CPU
多核能力,又能保障线程安全(虽然做法很粗暴)。这篇文章我们会讨论 Worker 与 Kotlin/Native 异步并发机制,而协程将在下一篇讨论。

Worker 初步使用

首先用 Intellij IDEA 创建一个基本的 Kotlin/Native 工程。我当前电脑的操作系统版本是
macOS 10.15.1,因此后面的一些示例和测试方案都基于该系统,作为类 Unix 系统,Linux 上的对应行为可能也相差无几,
但是这些示例不保证在 Windows 等系统上也全部可用,或行为完全一致。

先来看看 Worker 怎么用。然后我们在 main 函数中编写以下代码:

fun main() {
    val worker = Worker.start(true, "worker1")  // 1
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

使用 Worker.start 函数我们就可以创建一个新的 Worker,然后调用 Workerexecute
函数就可以在别的线程执行任务了。这个函数接收三个参数,第一个我们先不看,第二个参数,即示例中的 { 2 + 1 }
将扮演一个生产者的角色(为了简便,后文我们使用源码中的命名 producer 来称呼它),它会在外面的线程执行,producer
的返回值将在第三个参数(也是个 lambda 表达式,同样,后文我们用源码中的命名 job 来称呼它)中作为参数来提供。
而 job 中的代码会在别的线程中执行。
最后 execute 函数的返回结果是一个 Future<T> 类型的对象,调用它的成员函数 consume
即可获得在 job 执行的结果。运行代码验证一下,结果如下:

103

现在还要验证一个问题,producer 与 job 还有 consume 到底在哪个线程执行,虽然官方文档肯定不会骗我们,但是我们自己要掌握验证的方法:

fun main() {
    val worker = Worker.start(true, "worker1")
    println("位置 1 的线程 id:${pthread_self()!!.rawValue.toLong()}")
    worker.execute(TransferMode.SAFE, {
        println("位置 2 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        2 + 1
    }) {
        println("位置 3 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        (it + 100).toString()
    }.consume {
        println("位置 4 的线程 id:${pthread_self()!!.rawValue.toLong()}")
        // println(it)
    }
}

我们在 3 个位置上都使用 pthread_self() 函数来打印当前线程 id,输出如下:

位置 1 的线程 id:4484095424
位置 2 的线程 id:4484095424
位置 3 的线程 id:123145437896704
位置 4 的线程 id:4484095424

果然,官方文档诚不欺我(手动狗头)。

有了直观的认识之后,我们会发现 Worker 用起来和协程中的 async/await 有点像。但是我们发现它比 async/await
要麻烦,同样,我们先不看 execute 函数的第一个参数,我们可能会觉得 producer 有点多此一举,为什么在其他线程执行的 job
必须使用 producer 传递过来的参数,它直接捕捉上下文的变量不行吗?为了验证这一点,于是就有了如下代码:

fun main() {
    val worker = Worker.start(true, "worker1")
    val a = "第二个参数是干啥用的?"
    worker.execute(TransferMode.SAFE, { 2 + 1 }) {
        println(a)
        (it + 100).toString()
    }.consume {
        println(it)
    }
}

重新运行程序,直接编译报错:

e: kotlin.native.concurrent.Worker.execute must take an unbound, non-capturing function or lambda

为了让信息简洁一点,上面复制过来的报错信息省略了报错的文件以及行数。我们可以看到报错信息中说,在 Worker
中执行的函数或 lambda 表达式不能有变量捕捉。于是,这就代表着,producer 是 job 与外界线程进行数据传递的唯一入口,job
无法通过变量捕捉自由访问外界线程的对象。这么看起来 Worker 的实际太粗暴了,如果我要一次传递两个对象怎么办?用
Pair 包装一下,那一次要传递三个对象呢?用 Triple!四个呢?呃……F**k。

对象的传递

现在,我们在主线程创建了一个对象,我们想把它传递到 Worker 中,由于 producer 是在外部线程中运行的,
且对外部的对象进行变量捕捉不会失败,因此我们自然而然可能会写出如下代码。

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        it
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

然后理所当然的运行报错:

Uncaught Kotlin exception: kotlin.IllegalStateException: Illegal transfer state

然后我们去看看 execute 的第一个参数 TransferMode,这是一个枚举类型,共有两个枚举值,
我们去源码注释中看看这两个值的区别:

……

不复制粘贴了,有点长,大意就是:在 SAFE 模式下,如果传递到 Worker 的对象可被别的线程或 Worker 引用到,则直接抛出异常,而在
UNSAFE 模式下,不做检查,而是直接把对象传递过去,但是有可能会造成程序崩溃。接下来我们要验证两个事情:

第一,当主线程把对象传递给 Worker 后就不再持有对该对象的引用,SAFE 模式是否可以正常工作:

fun main() {
    val worker = Worker.start(true, "worker1")
    var testData: TestData? = TestData()
    val future = worker.execute(TransferMode.SAFE, {
        val data = testData!!
        testData = null
        data
    }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

程序正常打印输出 20000。这样来看 SAFE 模式这样设计的确是合理的,如果主线程将对象传递给 Worker
之后仍然可以继续访问对象,那就可能发生线程安全问题,因此 SAFE 模式直接拒绝了这种事情的发生而抛出异常,但是这样的写法太丑陋了,
如果要实现更优雅的写法,唯一的办法就是让 testData 的引用范围不超出 produce,也就是说把数据产生的过程都写到 produce
里面,虽然这样也没有那么优雅,但是还能接受。

官方提供了一套理论来解释上面示例程序所表现出来的行为:被 producer 传递的对象会被包装一个叫做对象子图(object
subgraph)的东西,对象子图生成之后,原线程就不能再访问对象子图,如果是在 SAFE
模式,就会使用图遍历算法检查对象子图的访问。以上都是目前官方文档的阐述,
关于 Worker 的更多资料我觉得官方在日后还会有更多补充,等到那时再详细分析。

再来看看 UNSAFE 模式:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = TestData()
    val future = worker.execute(TransferMode.UNSAFE, { testData }) { data ->
        repeat(20000) { data.index++ }
        data
    }
    repeat(20000) { testData.index++ }
    future.consume { println(it.index) }
}

data class TestData(var index: Int = 0)

如果线程访问是安全的,应该输出 40000,但是你每次运行这段代码得到的结果都会不同,反正都小于 40000。所以,果然 UNSAFE
模式简单粗暴,直接撒手不管了,我最初的预测是,当两个线程真正发生同一时刻访问同一个变量的时候会发生崩溃,
而在其他情况下,程序照常运行,就像源码注释里说的那样。但事实并非如此,所以我建议,千万不要靠"人"来保障线程安全,在
99.99% 的情况下都应该使用 SAFE 模式,如果使用 UNSAFE 模式,风险将直接暴露出来,且 Kotlin/Native
没有线程锁来帮你兜底。

对象子图冻结、全局变量以及单例

上面已经讨论了很多情况,但是跨线程访问都是在函数内部,也就是局部变量的跨线程访问。但如果是全局变量、
单例这种在多个函数内都可以访问的变量,情况则会有所不同。

先阐述一个对象子图冻结的概念,对于某些变量,我们确切知道其一定不可变,那对于这种变量,无论在多少个线程中同时访问它都是安全的,
既然如此,那 Kotlin/Native 也没必要对这种变量在访问的时候做子图校验,对于这样的变量,我们就可以称其为被冻结的变量,
官方文档关于这个地方有些前后矛盾,
先说冻结的变量只有枚举一种,但后面又阐述了两种变量冻结的情况(后文会介绍)。还有一种情况,也有可能一个变量一开始是非冻结的,
后面又被冻结了,但是有一点是不变的,那就是已冻结的对象不可解冻。关于在多个 Worker
中访问枚举变量的情况这里也就不举例了,很简单。

下面讲讲几个重要的注解和几种重要的情况

访问全局变量

val abc = "abc"

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(abc)
    }
    future.consume { println(abc) }
}

程序正常运行,打印输出:

abc
abc

这很奇怪,官方文档说全局变量(没有特殊标记)
只能在主线程访问,但是我们明明在子线程访问了它,程序却正常运行。我们把修饰变量 abcval 改成
var 再试一试,程序果然抛出异常:IncorrectDereferenceException

那如果是非 String 的引用类型呢?

val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(testData)
    }
    future.consume { println(testData) }
}

class TestData

程序抛出异常:IncorrectDereferenceException,多测试几次后,基本可以得出一个结论:

  • 结论 1:对于原生类型与 String 来说,如果这些变量是用 val 修饰的,则在多个线程中访问没有问题,如果是 var
    修饰的变量,则会抛出异常。对于其他引用类型的全局变量(不加特殊修饰)来说,无论用 val 还是 var
    修饰,都只能在主线程访问。

这条结论是官方文档中没有提到的,也算是踩坑的一个收获。

在这里有个插曲,既然 val 修饰的基本类型与 String 一定是不可变的,那对于局部变量这个结论是否也成立?
我们把对象的传递小节中的第一个示例修改一下:

fun main() {
    val worker = Worker.start(true, "worker1")
    val testData = "abc"
    val future = worker.execute(TransferMode.SAFE, { testData }) {
        println(it)
        it
    }
    future.consume { println(it) }
}

最主要的变化就是把 testData 换成了一个 String,程序正常,多测试几次,对原生类型也是成立的,因此结论 1对局部变量也成立。
其实仔细思考一下,对于 val 修饰的原生类型与 String,从逻辑上确实可以证明它们一定是不可变。

@ThreadLocal 与 @SharedImmutable 以及单例

修改上面的示例:

@ThreadLocal
val testData = TestData()

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(++testData.index)
    }
    future.consume { println(testData.index) }
}

data class TestData(var index: Int = 0)

输出如下:

1
0

结果与官方的相同,如果全局变量使用 @ThreadLocal 修饰,则该变量在每个线程都有不同的副本,即使修改,也在线程之间不可见。

再修改示例,仅仅把上一个示例中的 @ThreadLocal 改成 @SharedImmutable,然后程序抛出异常;再把 println(++testData.index)
改成 println(testData.index) 程序运行正常,根据官方的说法 @SharedImmutable 的作用是将变量冻结,这样的话该变量就可以共享了,
但它毕竟只是一个注解,如果你编写了修改该变量的代码,也只能在运行时才能发现问题。

最后看看单例:

object A {
    var index = 1
}

fun main() {
    val worker = Worker.start(true, "worker1")
    val future = worker.execute(TransferMode.UNSAFE, {}) {
        println(A.index)
    }
    future.consume { println(A.index) }
}

如果运行程序,我们就发现 object 修饰的单例与使用 @SharedImmutable 修饰的全局变量行为是一致的,不过,
单例也可以使用 @ThreadLocal 来修饰,这也就不多说了。

总结以及其他

如果说还有什么是我没有提到的,那应该就是对象子图分离和原始共享内存,不过这两部分内容主要是用于 C
程序与 Kotlin/Native 交互的情况,例如将 Kotlin/Native 对象保存到 C 结构体中,在真实的用例中,我们使用 Kotlin/Native
调用 C 代码的情况应该占绝大多数,而使用 C 调用 Kotlin/Native 应该极少发生,因此以后有机会再探讨这部分内容。

开篇讲过 Worker 是目前 Kotlin/Native 实现并行计算的主要工具,不过 Native 版的协程最近也推出了多线程版本的预览版,
关于这部分内容将是下一篇文章要重点探讨的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350