聊聊kotlin.coroutines【java协程】(1)

kotlin这个夏天java最有竞争力的语言。关于它的语法糖在这就不一一阐述了,毕竟它能甜死你。
先说说什么是协程吧,用户态的子线程,轻量级,进程->线程->协程。

进程、线程、协程的关系和区别:
进程拥有自己独立的堆和栈,既不共享堆,亦不共享栈,进程由操作系统调度。
线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,线程亦由操作系统调度(标准线程是的)。
协程和线程一样共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

协程的好处如下:
1.减少cpu线程上下文切换的开销
2.降低了内存消耗;
3.提高了cpu缓存命中率;
4.整体上提高了性能;
5.不提高硬件的前提下,提升了系统的负载能力。

只需要极少的栈内存(大概是4~5KB),默认情况下,线程栈的大小为1MB,一个线程可以开启数十万的协程,线程占用的内存开销远比协程要大得多。
golang原生就实现了协程,由runtime自行管理,一个go关键字就能开启goroutine。简直完美,但是今天要讲的不是golang。

总之,协程就是便宜,廉价,高效的代名词。

java里面要拥有这种高性能的协程,要通过第三方包来实现quasarcomsatkilim

上面这三位,就是目前所有java里面能快速实现coroutines的jar。
quasar:通过织入java字节码的方式,改变字节码结果,来使用,javaagent的方式
comsat:quasar的包装版本,提供轻量化的包装能快速使用。
kilim:和quasar一样,也要织入字节码来使用

但都有一个问题,必须预先给到注解,以上都能通过编译,但是到了linux环境,需要通过javaagent,因字节码被改写,无法追踪具体问题。协程管理是个大问题,会被线程kill,无故消失,笔者通过大半个月的实验,发现它们无法通过大部分环境,因而放弃。

kotlin.corouties

kotlin.corouties真是个非常好的api。语法简化,可以和golang的go关键字有得一拼。但在目前的kotlin api中是实验性质,不过已经具备上生产环境的能力,预计会在1.1.5中正式发布。因kotlin和java可以混编,所以coroutines是个下个高并发必备的知识点了。

kotlin.corouties调度器

CommonPool 调度器默认是通过fork/join的方式实现,目前还不提供接口,做自定义实现
launch(CommonPool)

Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
It uses[java.util.concurrent.ForkJoinPool]when available, which implements efficient work-stealing algorithm for its queues, so every coroutine resumption is dispatched as a separate task even when it already executes inside the pool.When available, it wraps ForkJoinPool.commonPool and provides a similar shared pool where not.

也就是说,kotlin的协程是并行调度的,关于fork/join也可以单独开一章讲了,暂不表。

Unconfined 调度器,默认是主线程调度 ,无限制启动协程,一旦协程睡了或者挂了,会启动新的协程

launch(Unconfined)

A coroutine dispatcher that is not confined to any specific thread.
It executes initial continuation of the coroutine right here in the current call-frame
and let the coroutine resume in whatever thread that is used by the corresponding suspending function, without
mandating any specific threading policy.

Note, that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
but still want to execute it in the current call-frame until its first suspension, then you can use
an optional [CoroutineStart] parameter in coroutine builders like [launch] and [async] setting it to the
the value of [CoroutineStart.UNDISPATCHED].

ThreadPoolDispatcher.newSingleThreadContext调度器,单个线程的调度器

launch(newSingleThreadContext("MyOwnThread"))

Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the thread of this context.
Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
The specified [name] defines the name of the new thread.
An optional [parent] job may be specified upon creation.

launch(newFixedThreadPoolContext(100,"MyOwnThread")) 调度器,指定线程数量的调度器

Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
All continuations are dispatched immediately when invoked inside the threads of this context.
Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
The specified [name] defines the names of the threads.
An optional [parent] job may be specified upon creation.

默认请全部使用launch(CommonPool),有特殊的限制问题,再考虑其他的调度器

launch(CommonPool) 异步协程开启

async(CommonPool) 同步协程开启

官方示例的Hello,World!,欢迎进入kotlin协程的世界

fun main(args: Array<String>) {
    launch(CommonPool) { // create new coroutine in common thread pool
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main function continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

kotlin中的sleep将不能暂停协程了,是个大坑,后面会讲到。

launch 启动协程,默认情况下直接开始执行,也可以显式执行

var job= launch(CommonPool) 
  if(job.isActive){
          job.cancel()
       }else{
            job.start()
     }

job任务可以根据需要什么时候开始执行,是否存活,取消等,提供了一系列api
有个小事,kotlin去掉了; 估计这个又可以引发一波大战

CommonPool 调度器
delay将会暂停1秒协程运行,
printlin是kotlin的打印方法,等同于System.out.printlin
Thread.sleep 这句只能暂停启动协程的线程

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) { // create new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // wait until child coroutine completes
}

runBlocking<Unit> 启动一个非阻塞并且无返回值的任务
job.join() 等待协程任务完成

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
            doWorld() 
    }
    println("Hello,")
    job.join()
}

// this is your first suspending function
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

这个讲suspend 关键字,为的是代码分离,不然就只能在 launch(CommonPool){}内部用delay来睡协程了,去掉了suspend是无法在其他方法调用delay睡协程了,直接编译错误。

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = List(100_000) { // create a lot of coroutines and list their jobs
        launch(CommonPool) {
            delay(1000L)
            print(".")
        }
    }
    jobs.forEach { it.join() } // wait for all jobs to complete
}

这个例子比较搞,启动100K的协程,如果你像作者一样,2G内存的渣机可能直接out-of-memory error,像笔者这样的8G大内存,是没有一点问题的。轻松愉快500ms执行完毕。

这个例子也是为了展示协程的轻量级和强悍,线程别说100K,就算10K,你的CPU和内存分分钟炸了,只能重启。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        var nextPrintTime = 0L
        var i = 0
        while (isActive) { // cancellable computation loop
            val currentTime = System.currentTimeMillis()
            if (currentTime >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime = currentTime + 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to see if it was cancelled....
    println("main: Now I can quit.")
}

delay的例子太多,单独讲一个。启动了一个协程任务去计算当前的时间,然后你会发现协程内置了一个isActive属性,这也是线程内部唯三的三大内置属性之一。其他的两个为context和coroutineContext,不过context已经被放弃了,大概是作者觉得context,词不达意吧,从这点也可以发现kotlin不会随意的删除api,而是通过重命名,重载的方式提供新的。

isActive:如果协程处于存活或任务未完成,状态就返回true,如果取消或已完成,则返回false

例子的意思也很明显告诉你如果任务在delay时间内未被cancel则一直计算下去并打印三次I'm sleeping,然后任务被cancel,协程取消。主线程输出main: Now I can quit

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch(CommonPool) {
        try {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            run(NonCancellable) {
                println("I'm running finally")
                delay(1000L)
                println("And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    delay(1300L) // delay a bit to ensure it was cancelled indeed
    println("main: Now I can quit.")
}

这个例子讲的不可取消, run(NonCancellable)+finally=绝对执行的代码
run(NonCancellable)协程内部启动一个新的协程,并且不能取消,霸道总裁般的代码
run...{}内可以使用coroutineContext,跟上一级的协程块代码做交互。

fun main(args: Array<String>) = runBlocking<Unit> {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

repeat(1000) :迭代器,输入要迭代的次数:1000次
withTimeout(1300L) 时间1.3秒。
这里讲这个wiathTimeout主要是为了控制协程的超时时间,避免协程,一直在活动。虽然便宜,不代表能让任务一直执行下去,到了超时的时间会直接抛出异常

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(CommonPool, CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

三个例子
measureTimeMillis :返回代码块的执行耗时,比起java,话就是少,就是这么屌

CoroutineStart:协程的执行模式(async和launch都可以用)

LAZY
懒加载

DEFAULT 默认的模式
默认 - 根据其上下文立即执行。

ATOMIC
根据其上下文原则(不可取消)计划协调执行。
跟[DEFAULT]类似,但协程在开始执行前无法取消。

UNDISPATCHED
未分派:暂不明白用途

println("The answer is ${one.await() + two.await()}")
kotlin执行计算可在字符串中一起计算
.await实际拿到的是协程返回的值,在例子中也就是13和29


suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 20
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 20
}


// The result type of asyncSomethingUsefulOne is Deferred<Int>
fun asyncSomethingUsefulOne() = async(CommonPool) {
    doSomethingUsefulOne()
}

// The result type of asyncSomethingUsefulTwo is Deferred<Int>
fun asyncSomethingUsefulTwo() = async(CommonPool)  {
    doSomethingUsefulTwo()
}


// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = asyncSomethingUsefulOne()
        val two = asyncSomethingUsefulTwo()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

runBlocking{}是个同步非阻塞的代码块执行器,能统一拿到coroutines的返回值,支持泛型和接受返回参,多个或单个协程一旦启动后我们要拿返回值不仅可以用await,也可以用runBlocking

      var result= runBlocking<Int> {
            var resultint = one.await() + two.await()
            println("The answer is resultint="+resultint)
            //基本类型直接这样写就可以
            resultint
        }
     println(result)

============================================================================

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
        println("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

介绍了多个调度器

launch(Unconfined) 
launch(coroutineContext):这个调度器只有在runBlocking内部才能用,严格来说不算调度器,内部协程的下上文中,继续启动协程

launch(CommonPool) 
launch(newSingleThreadContext("MyOwnThread")) 

具体解释看开篇的说明

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = arrayListOf<Job>()
    jobs += launch(Unconfined) { // not confined -- will work with main thread
        println("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("      'Unconfined': After delay in thread ${Thread.currentThread().name}")
    }
    jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
        println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
    }
    jobs.forEach { it.join() }
}

println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
这一句将会在新的协程中打印出来,因为协程本身被delay了

private val log = LoggerFactory.getLogger(X::class.java)

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking<Unit> {
    val a = async(coroutineContext) {
        log.info("I'm computing a piece of the answer")
        log("I'm computing a piece of the answer")
        6
    }
    val b = async(coroutineContext) {
        log.info("I'm computing another piece of the answer")
        log("I'm computing a piece of the answer")
        7
    }
    log.info("The answer is ${a.await() * b.await()}")
}

这里要讲的是日志:如果你是lombok的使用者,那么很遗憾,lombox现在暂不支持在kotlin使用@Slf4j或者@Log4j

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
这一句是官方示例给的,最好别用

private val log = LoggerFactory.getLogger(X::class.java)

跟以前一样用LoggerFactory拿就好了

fun main(args: Array<String>) = runBlocking<Unit> {
    println("My job is ${coroutineContext.get(Job.Key)}")
    println("My job is ${coroutineContext.get(Job)}")
    println("My job is ${coroutineContext[Job]}")
}

runBlocking<Unit> 这个老伙计了,老伙计本身其实也是coroutines启动的没想到吧,惊不惊喜,意不意外。这种设计就跟golang一样,有个统一的runtime管理器,但这里是显式的。
它被设计出来最大的原因就是阻塞执行了,在它内部可以启动多个async协程,然后共同计算出一个复杂的对象,然后统一返回给runBlocking,外部就可以直接接收

maven配置
其实可以直接引用kotlinx-coroutines-core,不过它的依赖项会晚于官方的发布版本所以我们先排除它的依赖自己引用最新版的kotlin

kotlin-stdlib-jre8或者kotlin-stdlib-jre7
或者直接就用kotlin-stdlib都是可以的。

          <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-stdlib</artifactId>
                <version>1.1.3-2</version>
            </dependency>

       
          <!--  <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-stdlib-jre8</artifactId>
                <version>1.1.3-2</version>
            </dependency>-->

               <!--  <     <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-stdlib-jre7</artifactId>
                <version>1.1.3-2</version>
            </dependency>-->


            <dependency>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-test</artifactId>
                <version>1.1.3-2</version>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <artifactId>kotlin-stdlib</artifactId>
                        <groupId>org.jetbrains.kotlin</groupId>
                    </exclusion>
                </exclusions>
            </dependency>

            <dependency>
                <groupId>org.jetbrains.kotlinx</groupId>
                <artifactId>kotlinx-coroutines-core</artifactId>
                <version>0.17</version>
                <exclusions>
                    <exclusion>
                        <artifactId>kotlin-stdlib</artifactId>
                        <groupId>org.jetbrains.kotlin</groupId>
                    </exclusion>
                </exclusions>
            </dependency>



plugin

           <plugin>
                    <groupId>org.jetbrains.kotlin</groupId>
                    <artifactId>kotlin-maven-plugin</artifactId>
                    <version>1.1.3-2</version>
                    <executions>
                        <execution>
                            <id>compile</id>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>test-compile</id>
                            <phase>test-compile</phase>
                            <goals>
                                <goal>test-compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <executions>
                        <execution>
                            <id>compile</id>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>testCompile</id>
                            <phase>test-compile</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

关于Thread.sleep,这个最好不要在使用了coroutines后继续用了,
如果是你配合spring-retry这种线程sleep的框架更要注意,高并发的情况下如果线程sleep,可能会导致线程无法唤醒,整个应用处理不了请求

今天就聊到这,delay一下。欢迎留言交流关于kotlin.coroutines的问题
kotlinx.coroutines

下一章,应该是下周

转载请联系我本人授权

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容