Kotlin 的锁和多线程同步

Synchronized.kt 的源码:

/**
 * Executes the given function [block] while holding the monitor of the given object [lock].
 */
@kotlin.internal.InlineOnly
public inline fun <R> synchronized(lock: Any, block: () -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }

    @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
    monitorEnter(lock)
    try {
        return block()
    }
    finally {
        @Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")
        monitorExit(lock)
    }
}

JvmFlagAnnotations.kt 的源码:


/**
 * Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field
 * are immediately made visible to other threads.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Volatile

/**
 * Marks the JVM backing field of the annotated property as `transient`, meaning that it is not
 * part of the default serialized form of the object.
 */
@Target(FIELD)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Transient

/**
 * Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision
 * of floating point operations performed inside the method needs to be restricted in order to
 * achieve better portability.
 */
@Target(FUNCTION, CONSTRUCTOR, PROPERTY_GETTER, PROPERTY_SETTER, CLASS)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Strictfp

/**
 * Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method
 * will be protected from concurrent execution by multiple threads by the monitor of the instance (or,
 * for static methods, the class) on which the method is defined.
 */
@Target(FUNCTION, PROPERTY_GETTER, PROPERTY_SETTER)
@Retention(AnnotationRetention.SOURCE)
@MustBeDocumented
public actual annotation class Synchronized

如何使用 Synchronized 同步锁:
在Java中,给一个方法加锁 ,需要给方法加 synchronized 关键字

public synchronized void doSomething() {

}

kotlin 中没有 synchronized 关键之,取而代之的是 @Synchronized 注解

class MyUtil {
    @Synchronized
    fun doSomething() {

    }
}

Kotlin 在方法内,可以使用 block 块

class Util {

    val lock = Any()

    fun main() {
        synchronized(lock) {

        }
    }
}

Volatile 关键字

有时仅仅为了读写一个或者两个实例域就使用同步的话,显得开销过大;而Volatile关键字为实例域的同步访问提供了免锁的机制。

当一个共享变量被vlatile修饰时,其就具备两个含义:
1、一个线程修改了变量的值,变量的新值对其他线程是立即可见的。
2、禁止使用指令重排序。
什么是指令重排序呢?
重排序通常是编译器或者运行环境为了优化程序性能而采取的对指令重新排序执行的一种手段。重排序分为两类:编译期重排序和运行期重排序,分别对应着编译时和运行时环境。

在 kotlin 中没有 volatile 关键字,但是有 @Volatile 注解

class Util {

    @Volatile
    var lock = Any()
}

在 kotlin 中的 Any 和 java 中的 Object 相似,每一个类都是从 Any 继承过来的,但是 Any 并没有声明 wait() , notify() 和 notifyAll() 方法,这就意味着,你不能在kotlin类中调用这些方法。但是你仍然能够使用java.lang.Object的实例作为lock,并且调用相关的方法。下面将会展示一个使用 Object 做为 lock 解决生产者和消费者的问题。

    private val lock = Object()

    fun produce() = synchronized(lock) {
          while(items>=maxItems) {
            lock.wait()
          }
          Thread.sleep(Random.nextInt(100).toLong())
          items++
          println("Produced, count is$items:${Thread.currentThread()}")
          lock.notifyAll()
    }

    fun consume() = synchronized(lock) {
        while(items<=0) {
            lock.wait()
        }
        Thread.sleep(Random.nextInt(100).toLong())
        items--
        println("Consumed, count is$items:${Thread.currentThread()}")
        lock.notifyAll()
    }
  • volatile变量具有可见性
  • volatile不保证原子性
  • volatile保证有序性
    volatile能禁止指令的重排序,因此volatile保证有序性。
    volatile禁止指令重排序的含义:当程序执行到volatile变量的时候,在其前面的语句已经全部执行完成,并且结果对后面可见,而且该变量后面的语句都没有执行。

正确使用volatile

synchronized可以防止多个线程同时执行一段代码,这会阻塞一部分线程的执行,这样就会影响程序的执行效率。
而volatile在某些情况下的性能是优于synchronized的。
但是volatile无法替代synchronized,因为volatile无法保证操作的原子性。


通常情况下,我们使用volatile关键字要避开两种场景:
1、对变量的写操作依赖当前值。
2、该变量包含在具有其他变量的不变式中。

使用volatile的场景有很多,这里介绍两种常见的场景:

场景1:状态标志
当多线程执行该类的时候,我们需要对状态标志stop保持可见性,这样我们的运行才能实时保持正确的执行。这种情况如果使用sychronized的话显然要复杂的多。

class TestVolatile1 {

    @Volatile
    var stop = false;

    fun onInit(){
        if(stop){

        }
    }

    fun onStop(){
        stop = true;
    }

}

场景2: 双重检查模式(DCL)

在 Java 中,我们的单例模式经常会这样写,第一次判空是为了不必要的同步操作,第二次判断是只有在MyLock实例==null的时候才会去new一个实例出来,当多线程调用时,当进行这两次判空时,我们需要保证instance的可见性。

public class MyLock {
    private static volatile MyLock instance = null;

    public static MyLock getInstance() {
        if(instance == null){
            synchronized (MyLock.class){
                if(instance == null){
                    instance = new MyLock();

                }
            }
        }
        return instance;
    }

}

在 Kotlin 中实现多线程同步的方式

“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”
在 Kotlin 中我们有多种实现方式:

  1. Thread.join
  2. Synchronized
  3. ReentrantLock
  4. BlockingQueue
  5. CountDownLatch
  6. CyclicBarrier
  7. CAS
  8. Future
  9. CompletableFuture
  10. Rxjava
  11. Coroutine
  12. Flow

我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:

val task1: () -> String = {
    sleep(2000)
    "Hello".also { println("task1 finished: $it") }
}

val task2: () -> String = {
    sleep(2000)
    "World".also { println("task2 finished: $it") }
}

val task3: (String, String) -> String = { p1, p2 ->
    sleep(2000)
    "$p1 $p2".also { println("task3 finished: $it") }
}

1. Thread.join()

Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :

@Test
fun test_join() {
    lateinit var s1: String
    lateinit var s2: String

    val t1 = Thread { s1 = task1() }
    val t2 = Thread { s2 = task2() }
    t1.start()
    t2.start()

    t1.join()
    t2.join()
    
    task3(s1, s2)
}

2. Synchronized

使用 synchronized 锁进行同步

    @Test
    fun test_synchrnoized() {
        lateinit var s1: String

        Thread {
            synchronized(Unit) {
                s1 = task1()
            }
        }.start()
        val s2: String = task2()

        synchronized(Unit) {
            task3(s1, s2)
        }

    }

但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。

3. ReentrantLock

ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用

    @Test
    fun test_ReentrantLock() {

        lateinit var s1: String

        val lock = ReentrantLock()
        Thread {
            lock.lock()
            s1 = task1()
            lock.unlock()
        }.start()
        val s2: String = task2()

        lock.lock()
        task3(s1, s2)
        lock.unlock()

    }

ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务。

4. BlockingQueue

阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果

    @Test
    fun test_blockingQueue() {

        lateinit var s1: String

        val queue = SynchronousQueue<Unit>()

        Thread {
            s1 = task1()
            queue.put(Unit)
        }.start()

        val s2: String = task2()

        queue.take()
        task3(s1, s2)
    }

当然,阻塞队列更多是使用在生产/消费场景中的同步。

5. CountDownLatch

UC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:

    @Test
    fun test_countdownlatch() {

        lateinit var s1: String
        lateinit var s2: String
        val cd = CountDownLatch(2)
        Thread() {
            s1 = task1()
            cd.countDown()
        }.start()

        Thread() {
            s2 = task2()
            cd.countDown()
        }.start()

        cd.await()
        task3(s1, s2)
    }

共享锁的好处是不必为了每个任务都创建单独的锁,即使再多并行任务写起来也很轻松。

6. CyclicBarrier

CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。
与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用。

    @Test
    fun test_CyclicBarrier() {

        lateinit var s1: String
        lateinit var s2: String
        val cb = CyclicBarrier(3)

        Thread {
            s1 = task1()
            cb.await()
        }.start()

        Thread() {
            s2 = task1()
            cb.await()
        }.start()

        cb.await()
        task3(s1, s2)

    }

7. CAS

AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。

    @Test
    fun test_cas() {

        lateinit var s1: String
        lateinit var s2: String

        val cas = AtomicInteger(2)

        Thread {
            s1 = task1()
            cas.getAndDecrement()
        }.start()

        Thread {
            s2 = task2()
            cas.getAndDecrement()
        }.start()

        while (cas.get() != 0) {}

        task3(s1, s2)

    }

While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。

8. Future

上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。 Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。

    @Test
    fun test_future() {

        val future1 = FutureTask(Callable(task1))
        val future2 = FutureTask(Callable(task2))

        Executors.newCachedThreadPool().execute(future1)
        Executors.newCachedThreadPool().execute(future2)

        task3(future1.get(), future2.get())

    }

通过 future.get(),可以同步等待结果返回,写起来非常方便。

9. CompletableFuture

future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:

    @Test
    fun test_CompletableFuture() {
        CompletableFuture.supplyAsync(task1)
            .thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
                task3(p1, p2)
            }.join()
    }

10. RxJava

RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务

    @Test
    fun test_Rxjava() {

        Observable.zip(
            Observable.fromCallable(Callable(task1))
                .subscribeOn(Schedulers.newThread()),
            Observable.fromCallable(Callable(task2))
                .subscribeOn(Schedulers.newThread()),
            BiFunction(task3)
        ).test().await()

    }

11. Coroutine

前面那么多方式,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:

    @Test
    fun test_coroutine() {

        runBlocking {
            val c1 = async(Dispatchers.IO) {
                task1()
            }

            val c2 = async(Dispatchers.IO) {
                task2()
            }

            task3(c1.await(), c2.await())
        }
    }

12. Flow

Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:

    @Test
    fun test_flow() {

        val flow1 = flow<String> { emit(task1()) }
        val flow2 = flow<String> { emit(task2()) }

        runBlocking {
            flow1.zip(flow2) { t1, t2 ->
                task3(t1, t2)
            }.flowOn(Dispatchers.IO).collect()

        }

    }

FlowOn 使得 Task 在异步计算并发射结果。

总结

作为结论,在 Kotlin 上最好用的线程同步方案首推协程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容