import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.math.max
/**
* 使用mutex或者Semaphore来实现,释放的时候每个Coroutine都要执行一遍获取锁&释放锁的操作,比较慢
* 下面这个类实现了批量地恢复Coroutine
*
* Mutex issue: https://github.com/Kotlin/kotlinx.coroutines/issues/2371
*/
internal class SuspendCountDownLatch {
private val countDownNumber: AtomicInteger
//哨兵节点
private val firstContinuationNode = ContinuationNode(null)
@Volatile
private var lastContinuationNode = firstContinuationNode
constructor(count: Int) {
countDownNumber = AtomicInteger(count)
}
fun getLockedCount() = max(0, countDownNumber.get())
fun isLocked() = countDownNumber.get() > 0
fun countDown() {
if (!isLocked()) return
if (countDownNumber.decrementAndGet() == 0) {
val firstContinuableNode = releaseAllContinuableNodes()
firstContinuableNode?.let { resumeContinuableNodes(it) }
}
}
suspend fun await() {
if (isLocked()) awaitSlowPath()
}
//CancellableContinuationImpl 能够根据resumeMode+协程上下文进行恢复, 并且在协程已取消时不会造成Crash
private suspend fun awaitSlowPath() = suspendCancellableCoroutine<Unit> { cont ->
val newContinuationNode = ContinuationNode(cont)
//抢占链表尾
while (!tryAddToQueueTail(newContinuationNode)) {
if (!isLocked()) {
cont.resume(Unit)
return@suspendCancellableCoroutine
}
}
//确保其它线程可以接着插入节点
lastContinuationNode = newContinuationNode
}
private fun releaseAllContinuableNodes(): ContinuationNode? {
val emptyNode = ContinuationNode(null)
//占位,确保不会有新的节点进入链表
while (!tryAddToQueueTail(emptyNode)) {
//...
}
if (firstContinuationNode.nextRef.get() == emptyNode) {
//没有可恢复的节点
return null
}
//第一个是哨兵节点
val firstContinuableNode = firstContinuationNode.nextRef.get() ?: return null
//释放除了哨兵以外的所有对象,防止内存泄露
firstContinuationNode.nextRef.set(null)
return firstContinuableNode
}
private fun resumeContinuableNodes(first: ContinuationNode) {
var nextNode: ContinuationNode? = first
while (nextNode != null) {
nextNode.continuation?.resume(Unit)
nextNode = nextNode.nextRef.get()
}
}
private fun tryAddToQueueTail(node: ContinuationNode): Boolean =
lastContinuationNode.nextRef.compareAndSet(null, node)
override fun toString(): String {
return super.toString() + "[Count = " + getLockedCount() + "]"
}
private class ContinuationNode(val continuation: CancellableContinuation<Unit>?) {
val nextRef = AtomicReference<ContinuationNode?>(null)
}
}
Kotlin协程实现 CountDownLatch
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
相关阅读更多精彩内容
- 自从6.0加入动态权限之后,很多地方都要用到,开始的时候使用的原生代码写权限请求,代码格式如: 然后不知道在fra...
- 前言 最近一直在修炼Kotlin,说实话真香真好用,刚好公司准备交给我一个新项目,于是打算直接用Kotlin来构建...
- 众所周知在android中当执行程序的耗时超过5秒时就会引发ANR而导致程序崩溃。由于UI的更新操作是在UI主线程...
- 今天我们来聊聊Kotlin的协程Coroutine。 如果你还没有接触过协程,推荐你先阅读这篇入门级文章What?...
- 1.OKhttp:官方依赖及源码地址,移步github: https://github.com/square/ok...