Kotlin协程完全指南:从入门到精通

目录

  1. 什么是协程
  2. 协程的设计思想
  3. 协程基础用法
  4. 协程作用域和上下文
  5. 协程的调度器
  6. 协程的取消和异常处理
  7. 协程的通信机制
  8. 协程的使用场景
  9. 常见错误和最佳实践
  10. 总结

什么是协程

协程(Coroutines)是一种轻量级的并发编程模型,它允许我们以同步的方式编写异步代码。在Kotlin中,协程是一种用于简化异步编程的工具,它可以让我们避免回调地狱,使代码更加清晰和易于维护。

协程的特点

  • 轻量级:协程比线程更轻量,可以创建成千上万个协程而不会消耗太多资源
  • 非阻塞:协程可以挂起而不阻塞线程
  • 结构化并发:协程提供了结构化的并发模型,便于管理和控制
  • 异常安全:协程提供了良好的异常处理机制
// 简单的协程示例
import kotlinx.coroutines.*

fun main() {
    // 启动一个协程
    runBlocking {
        // 在协程中执行代码
        println("Hello")
        delay(1000L) // 挂起1秒,不阻塞线程
        println("World!")
    }
}

协程的设计思想

1. 挂起函数(Suspend Functions)

协程的核心概念是挂起函数。挂起函数是可以被暂停和恢复的函数,它们使用suspend关键字标记。

// 挂起函数的定义
suspend fun doSomething(): String {
    delay(1000L) // 模拟耗时操作
    return "Result"
}

// 挂起函数只能在协程或其他挂起函数中调用
fun main() {
    runBlocking {
        val result = doSomething() // 在协程中调用挂起函数
        println(result)
    }
}

2. 协程构建器(Coroutine Builders)

协程构建器用于创建和启动协程:

import kotlinx.coroutines.*

fun main() {
    // runBlocking: 阻塞当前线程直到协程完成
    runBlocking {
        println("runBlocking 开始")
        
        // launch: 启动一个新协程,不阻塞当前线程
        val job = launch {
            delay(1000L)
            println("launch 协程完成")
        }
        
        // async: 启动一个新协程并返回Deferred对象
        val deferred = async {
            delay(500L)
            "async 结果"
        }
        
        println("等待协程完成...")
        job.join() // 等待launch协程完成
        val result = deferred.await() // 等待async协程完成并获取结果
        println("结果: $result")
    }
}

3. 协程的状态机实现

协程在底层是通过状态机实现的,每个挂起点都是状态机的一个状态:

// 编译器会将这个挂起函数转换为状态机
suspend fun example() {
    println("开始") // 状态0
    delay(1000L)   // 挂起点1
    println("中间") // 状态1
    delay(1000L)   // 挂起点2
    println("结束") // 状态2
}

协程基础用法

1. 基本的协程启动

import kotlinx.coroutines.*

fun main() {
    println("程序开始")
    
    // 使用runBlocking启动协程
    runBlocking {
        println("协程开始")
        delay(2000L) // 挂起2秒
        println("协程结束")
    }
    
    println("程序结束")
}

2. 并发执行多个协程

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val time = measureTimeMillis {
        // 串行执行
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("串行结果: ${one + two}")
    }
    println("串行执行时间: $time ms")
    
    val time2 = measureTimeMillis {
        // 并发执行
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("并发结果: ${one.await() + two.await()}")
    }
    println("并发执行时间: $time2 ms")
}

// 模拟耗时操作
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 模拟1秒的工作
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 模拟1秒的工作
    return 29
}

3. 协程的懒启动

import kotlinx.coroutines.*

fun main() = runBlocking {
    // 懒启动的async
    val lazyDeferred = async(start = CoroutineStart.LAZY) {
        println("懒启动协程开始执行")
        delay(1000L)
        "懒启动结果"
    }
    
    println("协程已创建但未启动")
    delay(2000L)
    
    println("现在启动协程")
    val result = lazyDeferred.await() // 启动并等待结果
    println("结果: $result")
}

协程作用域和上下文

1. 协程作用域(CoroutineScope)

协程作用域定义了协程的生命周期和结构化并发的边界:

import kotlinx.coroutines.*

class MyClass {
    // 创建自定义协程作用域
    private val scope = CoroutineScope(Dispatchers.Default + Job())
    
    fun doSomething() {
        // 在自定义作用域中启动协程
        scope.launch {
            repeat(5) { i ->
                println("协程工作中: $i")
                delay(500L)
            }
        }
    }
    
    fun cleanup() {
        // 取消作用域中的所有协程
        scope.cancel()
    }
}

fun main() = runBlocking {
    val myClass = MyClass()
    myClass.doSomething()
    
    delay(2000L) // 让协程运行一段时间
    myClass.cleanup() // 清理资源
    
    delay(1000L) // 观察协程是否被取消
    println("程序结束")
}

2. 协程上下文(CoroutineContext)

协程上下文是一组元素的集合,包括调度器、Job、异常处理器等:

import kotlinx.coroutines.*

fun main() = runBlocking {
    // 使用不同的上下文元素
    val job = launch(
        Dispatchers.IO + // 调度器
        CoroutineName("MyCoroutine") + // 协程名称
        CoroutineExceptionHandler { _, exception -> // 异常处理器
            println("捕获到异常: $exception")
        }
    ) {
        println("协程名称: ${coroutineContext[CoroutineName]}")
        println("当前线程: ${Thread.currentThread().name}")
        
        // 模拟一些工作
        repeat(3) { i ->
            println("工作 $i")
            delay(1000L)
        }
    }
    
    job.join()
}

3. 父子协程关系

import kotlinx.coroutines.*

fun main() = runBlocking {
    println("父协程开始")
    
    val parentJob = launch {
        println("父协程工作中")
        
        // 启动子协程
        val child1 = launch {
            println("子协程1开始")
            delay(2000L)
            println("子协程1完成")
        }
        
        val child2 = launch {
            println("子协程2开始")
            delay(1000L)
            println("子协程2完成")
        }
        
        println("父协程等待子协程完成")
        // 父协程会自动等待所有子协程完成
    }
    
    parentJob.join()
    println("所有协程完成")
}

协程的调度器

调度器决定了协程在哪个线程或线程池中执行:

1. 内置调度器

import kotlinx.coroutines.*

fun main() = runBlocking {
    // Dispatchers.Default - 用于CPU密集型任务
    launch(Dispatchers.Default) {
        println("Default: ${Thread.currentThread().name}")
        // 适合CPU密集型计算
        val result = (1..1000000).sum()
        println("计算结果: $result")
    }
    
    // Dispatchers.IO - 用于IO密集型任务
    launch(Dispatchers.IO) {
        println("IO: ${Thread.currentThread().name}")
        // 适合文件读写、网络请求等
        delay(1000L) // 模拟IO操作
        println("IO操作完成")
    }
    
    // Dispatchers.Main - 用于UI更新(Android中)
    // 注意:在普通JVM程序中Main调度器不可用
    
    // Dispatchers.Unconfined - 不限制调度器
    launch(Dispatchers.Unconfined) {
        println("Unconfined 1: ${Thread.currentThread().name}")
        delay(100L)
        println("Unconfined 2: ${Thread.currentThread().name}")
    }
    
    delay(2000L) // 等待所有协程完成
}

2. 自定义调度器

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking {
    // 创建自定义线程池调度器
    val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
    
    try {
        // 使用自定义调度器
        val jobs = List(10) { i ->
            launch(customDispatcher) {
                println("任务 $i 在线程: ${Thread.currentThread().name}")
                delay(1000L)
                println("任务 $i 完成")
            }
        }
        
        // 等待所有任务完成
        jobs.forEach { it.join() }
    } finally {
        // 关闭自定义调度器
        customDispatcher.close()
    }
}

协程的取消和异常处理

1. 协程取消

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("任务 $i")
                delay(500L)
            }
        } catch (e: CancellationException) {
            println("协程被取消: ${e.message}")
        } finally {
            println("清理资源")
        }
    }
    
    delay(2000L) // 让协程运行2秒
    println("取消协程")
    job.cancel() // 取消协程
    job.join() // 等待协程完成清理
    println("协程已取消")
}

2. 协程取消的配合性

import kotlinx.coroutines.*

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        
        // 检查协程是否被取消
        while (isActive) { // isActive是协程的扩展属性
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("任务 ${i++}")
                nextPrintTime += 500L
            }
        }
    }
    
    delay(1300L)
    println("取消协程")
    job.cancelAndJoin() // 取消并等待协程完成
    println("协程已取消")
}

3. 异常处理

import kotlinx.coroutines.*

fun main() = runBlocking {
    // 使用CoroutineExceptionHandler处理未捕获的异常
    val handler = CoroutineExceptionHandler { _, exception ->
        println("捕获到异常: $exception")
    }
    
    val job = launch(handler) {
        // 模拟异常
        throw RuntimeException("协程中的异常")
    }
    
    job.join()
    
    // 使用try-catch处理async中的异常
    val deferred = async {
        delay(1000L)
        throw IllegalStateException("async中的异常")
    }
    
    try {
        deferred.await()
    } catch (e: Exception) {
        println("捕获async异常: $e")
    }
}

4. SupervisorJob的使用

import kotlinx.coroutines.*

fun main() = runBlocking {
    // 使用SupervisorJob,子协程的异常不会影响其他子协程
    val supervisor = SupervisorJob()
    
    with(CoroutineScope(coroutineContext + supervisor)) {
        // 第一个子协程会失败
        val firstChild = launch {
            println("第一个子协程开始")
            delay(1000L)
            throw RuntimeException("第一个子协程失败")
        }
        
        // 第二个子协程会成功完成
        val secondChild = launch {
            println("第二个子协程开始")
            delay(2000L)
            println("第二个子协程完成")
        }
        
        // 等待第一个子协程失败
        try {
            firstChild.join()
        } catch (e: Exception) {
            println("第一个子协程异常: $e")
        }
        
        // 第二个子协程仍然会完成
        secondChild.join()
    }
}

协程的通信机制

1. Channel(通道)

Channel是协程之间通信的管道:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // 创建一个Channel
    val channel = Channel<Int>()
    
    // 生产者协程
    launch {
        for (x in 1..5) {
            println("发送: $x")
            channel.send(x) // 发送数据到channel
            delay(1000L)
        }
        channel.close() // 关闭channel
    }
    
    // 消费者协程
    launch {
        // 从channel接收数据
        for (y in channel) {
            println("接收: $y")
        }
        println("消费者完成")
    }
    
    delay(6000L)
}

2. 不同类型的Channel

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // 无缓冲Channel(默认)
    val unbufferedChannel = Channel<String>()
    
    // 有缓冲Channel
    val bufferedChannel = Channel<String>(capacity = 4)
    
    // 无限缓冲Channel
    val unlimitedChannel = Channel<String>(Channel.UNLIMITED)
    
    // 冲突Channel(新值会覆盖旧值)
    val conflatedChannel = Channel<String>(Channel.CONFLATED)
    
    // 演示缓冲Channel
    launch {
        repeat(10) { i ->
            println("发送: $i")
            bufferedChannel.send("Item $i")
            println("已发送: $i")
        }
        bufferedChannel.close()
    }
    
    delay(2000L) // 让生产者先运行
    
    // 消费数据
    for (item in bufferedChannel) {
        println("接收: $item")
        delay(1000L) // 模拟处理时间
    }
}

3. produce构建器

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    // 使用produce构建器创建生产者
    val squares = produce {
        for (x in 1..5) {
            send(x * x) // 发送平方数
            delay(1000L)
        }
    }
    
    // 消费数据
    squares.consumeEach { value ->
        println("平方数: $value")
    }
    
    println("完成")
}

// 创建一个生产斐波那契数列的函数
fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) {
        send(x++)
        delay(100L)
    }
}

// 过滤素数
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) {
        if (x % prime != 0) send(x)
    }
}

// 使用管道模式
fun main2() = runBlocking {
    var cur = produceNumbers() // 从2开始的数字
    repeat(10) {
        val prime = cur.receive()
        println("素数: $prime")
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // 取消所有子协程
}

4. Select表达式

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

fun main() = runBlocking {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()
    
    // 生产者1
    launch {
        delay(1000L)
        channel1.send("来自channel1")
    }
    
    // 生产者2
    launch {
        delay(1500L)
        channel2.send("来自channel2")
    }
    
    // 使用select等待第一个可用的结果
    repeat(2) {
        val result = select<String> {
            channel1.onReceive { value ->
                "channel1: $value"
            }
            channel2.onReceive { value ->
                "channel2: $value"
            }
        }
        println(result)
    }
    
    coroutineContext.cancelChildren()
}

协程的使用场景

1. 网络请求

import kotlinx.coroutines.*
import java.net.URL

// 模拟网络请求
suspend fun fetchUserData(userId: Int): String {
    delay(1000L) // 模拟网络延迟
    return "用户$userId 的数据"
}

suspend fun fetchUserPosts(userId: Int): List<String> {
    delay(800L) // 模拟网络延迟
    return listOf("帖子1", "帖子2", "帖子3")
}

fun main() = runBlocking {
    val userId = 123
    
    // 并发获取用户数据和帖子
    val userData = async { fetchUserData(userId) }
    val userPosts = async { fetchUserPosts(userId) }
    
    // 等待所有数据
    val user = userData.await()
    val posts = userPosts.await()
    
    println("用户信息: $user")
    println("用户帖子: $posts")
}

2. 文件处理

import kotlinx.coroutines.*
import java.io.File

// 模拟文件读取
suspend fun readFile(filename: String): String {
    return withContext(Dispatchers.IO) {
        delay(500L) // 模拟IO操作
        "文件 $filename 的内容"
    }
}

// 模拟文件写入
suspend fun writeFile(filename: String, content: String) {
    withContext(Dispatchers.IO) {
        delay(300L) // 模拟IO操作
        println("已写入文件 $filename: $content")
    }
}

fun main() = runBlocking {
    val files = listOf("file1.txt", "file2.txt", "file3.txt")
    
    // 并发读取多个文件
    val contents = files.map { filename ->
        async { readFile(filename) }
    }.awaitAll()
    
    // 处理文件内容
    contents.forEachIndexed { index, content ->
        val processedContent = "处理后的: $content"
        writeFile("output${index + 1}.txt", processedContent)
    }
}

3. 数据库操作

import kotlinx.coroutines.*

// 模拟数据库查询
suspend fun queryDatabase(sql: String): List<String> {
    return withContext(Dispatchers.IO) {
        delay(200L) // 模拟数据库查询时间
        listOf("结果1", "结果2", "结果3")
    }
}

// 模拟数据库更新
suspend fun updateDatabase(sql: String): Boolean {
    return withContext(Dispatchers.IO) {
        delay(150L) // 模拟数据库更新时间
        true
    }
}

fun main() = runBlocking {
    // 批量数据库操作
    val queries = listOf(
        "SELECT * FROM users",
        "SELECT * FROM posts",
        "SELECT * FROM comments"
    )
    
    // 并发执行查询
    val results = queries.map { query ->
        async { queryDatabase(query) }
    }.awaitAll()
    
    results.forEachIndexed { index, result ->
        println("查询 ${index + 1} 结果: $result")
    }
    
    // 执行更新操作
    val updateSuccess = updateDatabase("UPDATE users SET status = 'active'")
    println("更新成功: $updateSuccess")
}

4. UI更新(Android示例概念)

// 注意:这是概念性代码,实际Android开发中需要相应的依赖
import kotlinx.coroutines.*

class ViewModel {
    private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
    
    fun loadData() {
        scope.launch {
            try {
                // 显示加载状态
                updateUI("加载中...")
                
                // 在IO线程执行网络请求
                val data = withContext(Dispatchers.IO) {
                    fetchDataFromNetwork()
                }
                
                // 回到主线程更新UI
                updateUI("数据: $data")
            } catch (e: Exception) {
                updateUI("错误: ${e.message}")
            }
        }
    }
    
    private suspend fun fetchDataFromNetwork(): String {
        delay(2000L) // 模拟网络请求
        return "网络数据"
    }
    
    private fun updateUI(message: String) {
        println("UI更新: $message")
    }
    
    fun cleanup() {
        scope.cancel()
    }
}

fun main() = runBlocking {
    val viewModel = ViewModel()
    viewModel.loadData()
    
    delay(3000L) // 等待操作完成
    viewModel.cleanup()
}

常见错误和最佳实践

1. 常见错误

错误1:阻塞协程

// ❌ 错误:在协程中使用阻塞操作
fun badExample() = runBlocking {
    launch {
        Thread.sleep(1000L) // 阻塞线程,不要这样做!
        println("这是错误的做法")
    }
}

// ✅ 正确:使用挂起函数
fun goodExample() = runBlocking {
    launch {
        delay(1000L) // 挂起协程,不阻塞线程
        println("这是正确的做法")
    }
}

错误2:忘记处理协程取消

// ❌ 错误:不检查取消状态
suspend fun badLongRunningTask() {
    repeat(1000000) { i ->
        // 长时间运行的任务,不检查取消状态
        println("处理 $i")
        Thread.sleep(1) // 还使用了阻塞调用
    }
}

// ✅ 正确:检查取消状态
suspend fun goodLongRunningTask() {
    repeat(1000000) { i ->
        ensureActive() // 检查协程是否被取消
        println("处理 $i")
        delay(1) // 使用挂起函数
    }
}

错误3:不正确的异常处理

// ❌ 错误:吞掉异常
fun badExceptionHandling() = runBlocking {
    launch {
        try {
            throw RuntimeException("出错了")
        } catch (e: Exception) {
            // 静默忽略异常,这是不好的做法
        }
    }
}

// ✅ 正确:适当处理异常
fun goodExceptionHandling() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("处理异常: $exception")
        // 记录日志、上报错误等
    }
    
    launch(handler) {
        throw RuntimeException("出错了")
    }
}

错误4:内存泄漏

class BadActivity {
    // ❌ 错误:使用GlobalScope可能导致内存泄漏
    fun startWork() {
        GlobalScope.launch {
            // 长时间运行的任务
            repeat(1000) {
                delay(1000L)
                // 可能持有Activity的引用
            }
        }
    }
}

class GoodActivity {
    // ✅ 正确:使用生命周期感知的作用域
    private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
    
    fun startWork() {
        scope.launch {
            repeat(1000) {
                delay(1000L)
                // 当Activity销毁时,协程会被取消
            }
        }
    }
    
    fun onDestroy() {
        scope.cancel() // 清理协程
    }
}

2. 最佳实践

实践1:使用结构化并发

// ✅ 好的做法:使用结构化并发
class DataRepository {
    private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    
    suspend fun loadUserData(userId: Int): UserData {
        return coroutineScope { // 创建子作用域
            val userInfo = async { fetchUserInfo(userId) }
            val userPosts = async { fetchUserPosts(userId) }
            val userFriends = async { fetchUserFriends(userId) }
            
            UserData(
                info = userInfo.await(),
                posts = userPosts.await(),
                friends = userFriends.await()
            )
        }
    }
    
    private suspend fun fetchUserInfo(userId: Int): String {
        delay(1000L)
        return "用户信息"
    }
    
    private suspend fun fetchUserPosts(userId: Int): List<String> {
        delay(800L)
        return listOf("帖子1", "帖子2")
    }
    
    private suspend fun fetchUserFriends(userId: Int): List<String> {
        delay(600L)
        return listOf("朋友1", "朋友2")
    }
    
    fun cleanup() {
        scope.cancel()
    }
}

data class UserData(
    val info: String,
    val posts: List<String>,
    val friends: List<String>
)

实践2:合理使用调度器

class FileProcessor {
    // CPU密集型任务使用Default调度器
    suspend fun processData(data: List<Int>): List<Int> {
        return withContext(Dispatchers.Default) {
            data.map { it * it }.filter { it > 100 }
        }
    }
    
    // IO操作使用IO调度器
    suspend fun saveToFile(data: List<Int>, filename: String) {
        withContext(Dispatchers.IO) {
            // 模拟文件写入
            delay(500L)
            println("数据已保存到 $filename")
        }
    }
    
    // 组合使用
    suspend fun processAndSave(data: List<Int>, filename: String) {
        val processedData = processData(data) // CPU密集型
        saveToFile(processedData, filename)   // IO密集型
    }
}

实践3:超时处理

import kotlinx.coroutines.*

suspend fun fetchDataWithTimeout(): String? {
    return try {
        withTimeout(5000L) { // 5秒超时
            // 模拟可能很慢的网络请求
            delay(3000L)
            "获取到的数据"
        }
    } catch (e: TimeoutCancellationException) {
        println("请求超时")
        null
    }
}

// 使用withTimeoutOrNull避免异常
suspend fun fetchDataWithTimeoutOrNull(): String? {
    return withTimeoutOrNull(5000L) {
        delay(3000L)
        "获取到的数据"
    } // 超时时返回null而不是抛出异常
}

fun main() = runBlocking {
    val result1 = fetchDataWithTimeout()
    println("结果1: $result1")
    
    val result2 = fetchDataWithTimeoutOrNull()
    println("结果2: $result2")
}

实践4:资源管理

import kotlinx.coroutines.*
import java.io.Closeable

class DatabaseConnection : Closeable {
    fun query(sql: String): String {
        return "查询结果: $sql"
    }
    
    override fun close() {
        println("数据库连接已关闭")
    }
}

// 使用use函数确保资源被正确关闭
suspend fun queryWithResource(): String {
    return DatabaseConnection().use { connection ->
        delay(1000L) // 模拟查询时间
        connection.query("SELECT * FROM users")
    } // connection会自动关闭
}

// 在协程中管理多个资源
suspend fun complexOperation() {
    coroutineScope {
        val connection1 = DatabaseConnection()
        val connection2 = DatabaseConnection()
        
        try {
            val result1 = async {
                delay(1000L)
                connection1.query("SELECT * FROM table1")
            }
            
            val result2 = async {
                delay(1200L)
                connection2.query("SELECT * FROM table2")
            }
            
            println("结果1: ${result1.await()}")
            println("结果2: ${result2.await()}")
        } finally {
            connection1.close()
            connection2.close()
        }
    }
}

3. 性能优化建议

// 1. 避免创建过多的协程
fun inefficientApproach(items: List<String>) = runBlocking {
    // ❌ 为每个item创建一个协程
    items.map { item ->
        async {
            processItem(item)
        }
    }.awaitAll()
}

fun efficientApproach(items: List<String>) = runBlocking {
    // ✅ 使用有限的并发数
    val semaphore = Semaphore(10) // 最多10个并发
    items.map { item ->
        async {
            semaphore.withPermit {
                processItem(item)
            }
        }
    }.awaitAll()
}

suspend fun processItem(item: String): String {
    delay(100L)
    return "处理后的 $item"
}

// 2. 使用Channel进行流式处理
fun streamProcessing() = runBlocking {
    val channel = Channel<String>(capacity = 100)
    
    // 生产者
    launch {
        repeat(1000) { i ->
            channel.send("Item $i")
        }
        channel.close()
    }
    
    // 多个消费者
    repeat(5) { workerId ->
        launch {
            for (item in channel) {
                println("Worker $workerId 处理: $item")
                delay(10L)
            }
        }
    }
}

总结

Kotlin协程是一个强大的并发编程工具,它提供了以下主要优势:

核心概念回顾

  1. 挂起函数:使用suspend关键字,可以暂停和恢复执行
  2. 协程构建器launchasyncrunBlocking等用于创建协程
  3. 协程作用域:管理协程的生命周期和结构化并发
  4. 调度器:控制协程在哪个线程执行
  5. 取消和异常处理:提供了完善的错误处理机制

主要优势

  • 轻量级:比线程更高效,可以创建大量协程
  • 非阻塞:避免阻塞线程,提高应用响应性
  • 结构化并发:提供清晰的并发模型
  • 异常安全:完善的异常处理和传播机制
  • 易于使用:简洁的API,降低并发编程复杂度

使用建议

  1. 选择合适的调度器:CPU密集型用Default,IO操作用IO
  2. 正确处理取消:使用ensureActive()或检查isActive
  3. 避免内存泄漏:使用生命周期感知的协程作用域
  4. 合理使用超时:为可能长时间运行的操作设置超时
  5. 结构化并发:使用coroutineScope确保所有子协程完成

适用场景

  • 网络请求和API调用
  • 文件I/O操作
  • 数据库访问
  • UI更新(Android开发)
  • 并发数据处理
  • 实时数据流处理

通过掌握这些概念和最佳实践,你可以有效地使用Kotlin协程来构建高效、可维护的并发应用程序。记住,协程的核心是让异步代码看起来像同步代码,同时保持非阻塞的特性。


本文涵盖了Kotlin协程的核心概念和实用技巧。随着你对协程的深入使用,你会发现更多高级特性和优化技巧。持续学习和实践是掌握协程的关键。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容