目录
什么是协程
协程(Coroutines)是一种轻量级的并发编程模型,它允许我们以同步的方式编写异步代码。在Kotlin中,协程是一种用于简化异步编程的工具,它可以让我们避免回调地狱,使代码更加清晰和易于维护。
协程的特点
- 轻量级:协程比线程更轻量,可以创建成千上万个协程而不会消耗太多资源
- 非阻塞:协程可以挂起而不阻塞线程
- 结构化并发:协程提供了结构化的并发模型,便于管理和控制
- 异常安全:协程提供了良好的异常处理机制
// 简单的协程示例
import kotlinx.coroutines.*
fun main() {
// 启动一个协程
runBlocking {
// 在协程中执行代码
println("Hello")
delay(1000L) // 挂起1秒,不阻塞线程
println("World!")
}
}
协程的设计思想
1. 挂起函数(Suspend Functions)
协程的核心概念是挂起函数。挂起函数是可以被暂停和恢复的函数,它们使用suspend关键字标记。
// 挂起函数的定义
suspend fun doSomething(): String {
delay(1000L) // 模拟耗时操作
return "Result"
}
// 挂起函数只能在协程或其他挂起函数中调用
fun main() {
runBlocking {
val result = doSomething() // 在协程中调用挂起函数
println(result)
}
}
2. 协程构建器(Coroutine Builders)
协程构建器用于创建和启动协程:
import kotlinx.coroutines.*
fun main() {
// runBlocking: 阻塞当前线程直到协程完成
runBlocking {
println("runBlocking 开始")
// launch: 启动一个新协程,不阻塞当前线程
val job = launch {
delay(1000L)
println("launch 协程完成")
}
// async: 启动一个新协程并返回Deferred对象
val deferred = async {
delay(500L)
"async 结果"
}
println("等待协程完成...")
job.join() // 等待launch协程完成
val result = deferred.await() // 等待async协程完成并获取结果
println("结果: $result")
}
}
3. 协程的状态机实现
协程在底层是通过状态机实现的,每个挂起点都是状态机的一个状态:
// 编译器会将这个挂起函数转换为状态机
suspend fun example() {
println("开始") // 状态0
delay(1000L) // 挂起点1
println("中间") // 状态1
delay(1000L) // 挂起点2
println("结束") // 状态2
}
协程基础用法
1. 基本的协程启动
import kotlinx.coroutines.*
fun main() {
println("程序开始")
// 使用runBlocking启动协程
runBlocking {
println("协程开始")
delay(2000L) // 挂起2秒
println("协程结束")
}
println("程序结束")
}
2. 并发执行多个协程
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking {
val time = measureTimeMillis {
// 串行执行
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("串行结果: ${one + two}")
}
println("串行执行时间: $time ms")
val time2 = measureTimeMillis {
// 并发执行
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("并发结果: ${one.await() + two.await()}")
}
println("并发执行时间: $time2 ms")
}
// 模拟耗时操作
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // 模拟1秒的工作
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // 模拟1秒的工作
return 29
}
3. 协程的懒启动
import kotlinx.coroutines.*
fun main() = runBlocking {
// 懒启动的async
val lazyDeferred = async(start = CoroutineStart.LAZY) {
println("懒启动协程开始执行")
delay(1000L)
"懒启动结果"
}
println("协程已创建但未启动")
delay(2000L)
println("现在启动协程")
val result = lazyDeferred.await() // 启动并等待结果
println("结果: $result")
}
协程作用域和上下文
1. 协程作用域(CoroutineScope)
协程作用域定义了协程的生命周期和结构化并发的边界:
import kotlinx.coroutines.*
class MyClass {
// 创建自定义协程作用域
private val scope = CoroutineScope(Dispatchers.Default + Job())
fun doSomething() {
// 在自定义作用域中启动协程
scope.launch {
repeat(5) { i ->
println("协程工作中: $i")
delay(500L)
}
}
}
fun cleanup() {
// 取消作用域中的所有协程
scope.cancel()
}
}
fun main() = runBlocking {
val myClass = MyClass()
myClass.doSomething()
delay(2000L) // 让协程运行一段时间
myClass.cleanup() // 清理资源
delay(1000L) // 观察协程是否被取消
println("程序结束")
}
2. 协程上下文(CoroutineContext)
协程上下文是一组元素的集合,包括调度器、Job、异常处理器等:
import kotlinx.coroutines.*
fun main() = runBlocking {
// 使用不同的上下文元素
val job = launch(
Dispatchers.IO + // 调度器
CoroutineName("MyCoroutine") + // 协程名称
CoroutineExceptionHandler { _, exception -> // 异常处理器
println("捕获到异常: $exception")
}
) {
println("协程名称: ${coroutineContext[CoroutineName]}")
println("当前线程: ${Thread.currentThread().name}")
// 模拟一些工作
repeat(3) { i ->
println("工作 $i")
delay(1000L)
}
}
job.join()
}
3. 父子协程关系
import kotlinx.coroutines.*
fun main() = runBlocking {
println("父协程开始")
val parentJob = launch {
println("父协程工作中")
// 启动子协程
val child1 = launch {
println("子协程1开始")
delay(2000L)
println("子协程1完成")
}
val child2 = launch {
println("子协程2开始")
delay(1000L)
println("子协程2完成")
}
println("父协程等待子协程完成")
// 父协程会自动等待所有子协程完成
}
parentJob.join()
println("所有协程完成")
}
协程的调度器
调度器决定了协程在哪个线程或线程池中执行:
1. 内置调度器
import kotlinx.coroutines.*
fun main() = runBlocking {
// Dispatchers.Default - 用于CPU密集型任务
launch(Dispatchers.Default) {
println("Default: ${Thread.currentThread().name}")
// 适合CPU密集型计算
val result = (1..1000000).sum()
println("计算结果: $result")
}
// Dispatchers.IO - 用于IO密集型任务
launch(Dispatchers.IO) {
println("IO: ${Thread.currentThread().name}")
// 适合文件读写、网络请求等
delay(1000L) // 模拟IO操作
println("IO操作完成")
}
// Dispatchers.Main - 用于UI更新(Android中)
// 注意:在普通JVM程序中Main调度器不可用
// Dispatchers.Unconfined - 不限制调度器
launch(Dispatchers.Unconfined) {
println("Unconfined 1: ${Thread.currentThread().name}")
delay(100L)
println("Unconfined 2: ${Thread.currentThread().name}")
}
delay(2000L) // 等待所有协程完成
}
2. 自定义调度器
import kotlinx.coroutines.*
import java.util.concurrent.Executors
fun main() = runBlocking {
// 创建自定义线程池调度器
val customDispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
try {
// 使用自定义调度器
val jobs = List(10) { i ->
launch(customDispatcher) {
println("任务 $i 在线程: ${Thread.currentThread().name}")
delay(1000L)
println("任务 $i 完成")
}
}
// 等待所有任务完成
jobs.forEach { it.join() }
} finally {
// 关闭自定义调度器
customDispatcher.close()
}
}
协程的取消和异常处理
1. 协程取消
import kotlinx.coroutines.*
fun main() = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("任务 $i")
delay(500L)
}
} catch (e: CancellationException) {
println("协程被取消: ${e.message}")
} finally {
println("清理资源")
}
}
delay(2000L) // 让协程运行2秒
println("取消协程")
job.cancel() // 取消协程
job.join() // 等待协程完成清理
println("协程已取消")
}
2. 协程取消的配合性
import kotlinx.coroutines.*
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
// 检查协程是否被取消
while (isActive) { // isActive是协程的扩展属性
if (System.currentTimeMillis() >= nextPrintTime) {
println("任务 ${i++}")
nextPrintTime += 500L
}
}
}
delay(1300L)
println("取消协程")
job.cancelAndJoin() // 取消并等待协程完成
println("协程已取消")
}
3. 异常处理
import kotlinx.coroutines.*
fun main() = runBlocking {
// 使用CoroutineExceptionHandler处理未捕获的异常
val handler = CoroutineExceptionHandler { _, exception ->
println("捕获到异常: $exception")
}
val job = launch(handler) {
// 模拟异常
throw RuntimeException("协程中的异常")
}
job.join()
// 使用try-catch处理async中的异常
val deferred = async {
delay(1000L)
throw IllegalStateException("async中的异常")
}
try {
deferred.await()
} catch (e: Exception) {
println("捕获async异常: $e")
}
}
4. SupervisorJob的使用
import kotlinx.coroutines.*
fun main() = runBlocking {
// 使用SupervisorJob,子协程的异常不会影响其他子协程
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// 第一个子协程会失败
val firstChild = launch {
println("第一个子协程开始")
delay(1000L)
throw RuntimeException("第一个子协程失败")
}
// 第二个子协程会成功完成
val secondChild = launch {
println("第二个子协程开始")
delay(2000L)
println("第二个子协程完成")
}
// 等待第一个子协程失败
try {
firstChild.join()
} catch (e: Exception) {
println("第一个子协程异常: $e")
}
// 第二个子协程仍然会完成
secondChild.join()
}
}
协程的通信机制
1. Channel(通道)
Channel是协程之间通信的管道:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// 创建一个Channel
val channel = Channel<Int>()
// 生产者协程
launch {
for (x in 1..5) {
println("发送: $x")
channel.send(x) // 发送数据到channel
delay(1000L)
}
channel.close() // 关闭channel
}
// 消费者协程
launch {
// 从channel接收数据
for (y in channel) {
println("接收: $y")
}
println("消费者完成")
}
delay(6000L)
}
2. 不同类型的Channel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// 无缓冲Channel(默认)
val unbufferedChannel = Channel<String>()
// 有缓冲Channel
val bufferedChannel = Channel<String>(capacity = 4)
// 无限缓冲Channel
val unlimitedChannel = Channel<String>(Channel.UNLIMITED)
// 冲突Channel(新值会覆盖旧值)
val conflatedChannel = Channel<String>(Channel.CONFLATED)
// 演示缓冲Channel
launch {
repeat(10) { i ->
println("发送: $i")
bufferedChannel.send("Item $i")
println("已发送: $i")
}
bufferedChannel.close()
}
delay(2000L) // 让生产者先运行
// 消费数据
for (item in bufferedChannel) {
println("接收: $item")
delay(1000L) // 模拟处理时间
}
}
3. produce构建器
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// 使用produce构建器创建生产者
val squares = produce {
for (x in 1..5) {
send(x * x) // 发送平方数
delay(1000L)
}
}
// 消费数据
squares.consumeEach { value ->
println("平方数: $value")
}
println("完成")
}
// 创建一个生产斐波那契数列的函数
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) {
send(x++)
delay(100L)
}
}
// 过滤素数
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) {
if (x % prime != 0) send(x)
}
}
// 使用管道模式
fun main2() = runBlocking {
var cur = produceNumbers() // 从2开始的数字
repeat(10) {
val prime = cur.receive()
println("素数: $prime")
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // 取消所有子协程
}
4. Select表达式
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
fun main() = runBlocking {
val channel1 = Channel<String>()
val channel2 = Channel<String>()
// 生产者1
launch {
delay(1000L)
channel1.send("来自channel1")
}
// 生产者2
launch {
delay(1500L)
channel2.send("来自channel2")
}
// 使用select等待第一个可用的结果
repeat(2) {
val result = select<String> {
channel1.onReceive { value ->
"channel1: $value"
}
channel2.onReceive { value ->
"channel2: $value"
}
}
println(result)
}
coroutineContext.cancelChildren()
}
协程的使用场景
1. 网络请求
import kotlinx.coroutines.*
import java.net.URL
// 模拟网络请求
suspend fun fetchUserData(userId: Int): String {
delay(1000L) // 模拟网络延迟
return "用户$userId 的数据"
}
suspend fun fetchUserPosts(userId: Int): List<String> {
delay(800L) // 模拟网络延迟
return listOf("帖子1", "帖子2", "帖子3")
}
fun main() = runBlocking {
val userId = 123
// 并发获取用户数据和帖子
val userData = async { fetchUserData(userId) }
val userPosts = async { fetchUserPosts(userId) }
// 等待所有数据
val user = userData.await()
val posts = userPosts.await()
println("用户信息: $user")
println("用户帖子: $posts")
}
2. 文件处理
import kotlinx.coroutines.*
import java.io.File
// 模拟文件读取
suspend fun readFile(filename: String): String {
return withContext(Dispatchers.IO) {
delay(500L) // 模拟IO操作
"文件 $filename 的内容"
}
}
// 模拟文件写入
suspend fun writeFile(filename: String, content: String) {
withContext(Dispatchers.IO) {
delay(300L) // 模拟IO操作
println("已写入文件 $filename: $content")
}
}
fun main() = runBlocking {
val files = listOf("file1.txt", "file2.txt", "file3.txt")
// 并发读取多个文件
val contents = files.map { filename ->
async { readFile(filename) }
}.awaitAll()
// 处理文件内容
contents.forEachIndexed { index, content ->
val processedContent = "处理后的: $content"
writeFile("output${index + 1}.txt", processedContent)
}
}
3. 数据库操作
import kotlinx.coroutines.*
// 模拟数据库查询
suspend fun queryDatabase(sql: String): List<String> {
return withContext(Dispatchers.IO) {
delay(200L) // 模拟数据库查询时间
listOf("结果1", "结果2", "结果3")
}
}
// 模拟数据库更新
suspend fun updateDatabase(sql: String): Boolean {
return withContext(Dispatchers.IO) {
delay(150L) // 模拟数据库更新时间
true
}
}
fun main() = runBlocking {
// 批量数据库操作
val queries = listOf(
"SELECT * FROM users",
"SELECT * FROM posts",
"SELECT * FROM comments"
)
// 并发执行查询
val results = queries.map { query ->
async { queryDatabase(query) }
}.awaitAll()
results.forEachIndexed { index, result ->
println("查询 ${index + 1} 结果: $result")
}
// 执行更新操作
val updateSuccess = updateDatabase("UPDATE users SET status = 'active'")
println("更新成功: $updateSuccess")
}
4. UI更新(Android示例概念)
// 注意:这是概念性代码,实际Android开发中需要相应的依赖
import kotlinx.coroutines.*
class ViewModel {
private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
fun loadData() {
scope.launch {
try {
// 显示加载状态
updateUI("加载中...")
// 在IO线程执行网络请求
val data = withContext(Dispatchers.IO) {
fetchDataFromNetwork()
}
// 回到主线程更新UI
updateUI("数据: $data")
} catch (e: Exception) {
updateUI("错误: ${e.message}")
}
}
}
private suspend fun fetchDataFromNetwork(): String {
delay(2000L) // 模拟网络请求
return "网络数据"
}
private fun updateUI(message: String) {
println("UI更新: $message")
}
fun cleanup() {
scope.cancel()
}
}
fun main() = runBlocking {
val viewModel = ViewModel()
viewModel.loadData()
delay(3000L) // 等待操作完成
viewModel.cleanup()
}
常见错误和最佳实践
1. 常见错误
错误1:阻塞协程
// ❌ 错误:在协程中使用阻塞操作
fun badExample() = runBlocking {
launch {
Thread.sleep(1000L) // 阻塞线程,不要这样做!
println("这是错误的做法")
}
}
// ✅ 正确:使用挂起函数
fun goodExample() = runBlocking {
launch {
delay(1000L) // 挂起协程,不阻塞线程
println("这是正确的做法")
}
}
错误2:忘记处理协程取消
// ❌ 错误:不检查取消状态
suspend fun badLongRunningTask() {
repeat(1000000) { i ->
// 长时间运行的任务,不检查取消状态
println("处理 $i")
Thread.sleep(1) // 还使用了阻塞调用
}
}
// ✅ 正确:检查取消状态
suspend fun goodLongRunningTask() {
repeat(1000000) { i ->
ensureActive() // 检查协程是否被取消
println("处理 $i")
delay(1) // 使用挂起函数
}
}
错误3:不正确的异常处理
// ❌ 错误:吞掉异常
fun badExceptionHandling() = runBlocking {
launch {
try {
throw RuntimeException("出错了")
} catch (e: Exception) {
// 静默忽略异常,这是不好的做法
}
}
}
// ✅ 正确:适当处理异常
fun goodExceptionHandling() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("处理异常: $exception")
// 记录日志、上报错误等
}
launch(handler) {
throw RuntimeException("出错了")
}
}
错误4:内存泄漏
class BadActivity {
// ❌ 错误:使用GlobalScope可能导致内存泄漏
fun startWork() {
GlobalScope.launch {
// 长时间运行的任务
repeat(1000) {
delay(1000L)
// 可能持有Activity的引用
}
}
}
}
class GoodActivity {
// ✅ 正确:使用生命周期感知的作用域
private val scope = CoroutineScope(Dispatchers.Main + SupervisorJob())
fun startWork() {
scope.launch {
repeat(1000) {
delay(1000L)
// 当Activity销毁时,协程会被取消
}
}
}
fun onDestroy() {
scope.cancel() // 清理协程
}
}
2. 最佳实践
实践1:使用结构化并发
// ✅ 好的做法:使用结构化并发
class DataRepository {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
suspend fun loadUserData(userId: Int): UserData {
return coroutineScope { // 创建子作用域
val userInfo = async { fetchUserInfo(userId) }
val userPosts = async { fetchUserPosts(userId) }
val userFriends = async { fetchUserFriends(userId) }
UserData(
info = userInfo.await(),
posts = userPosts.await(),
friends = userFriends.await()
)
}
}
private suspend fun fetchUserInfo(userId: Int): String {
delay(1000L)
return "用户信息"
}
private suspend fun fetchUserPosts(userId: Int): List<String> {
delay(800L)
return listOf("帖子1", "帖子2")
}
private suspend fun fetchUserFriends(userId: Int): List<String> {
delay(600L)
return listOf("朋友1", "朋友2")
}
fun cleanup() {
scope.cancel()
}
}
data class UserData(
val info: String,
val posts: List<String>,
val friends: List<String>
)
实践2:合理使用调度器
class FileProcessor {
// CPU密集型任务使用Default调度器
suspend fun processData(data: List<Int>): List<Int> {
return withContext(Dispatchers.Default) {
data.map { it * it }.filter { it > 100 }
}
}
// IO操作使用IO调度器
suspend fun saveToFile(data: List<Int>, filename: String) {
withContext(Dispatchers.IO) {
// 模拟文件写入
delay(500L)
println("数据已保存到 $filename")
}
}
// 组合使用
suspend fun processAndSave(data: List<Int>, filename: String) {
val processedData = processData(data) // CPU密集型
saveToFile(processedData, filename) // IO密集型
}
}
实践3:超时处理
import kotlinx.coroutines.*
suspend fun fetchDataWithTimeout(): String? {
return try {
withTimeout(5000L) { // 5秒超时
// 模拟可能很慢的网络请求
delay(3000L)
"获取到的数据"
}
} catch (e: TimeoutCancellationException) {
println("请求超时")
null
}
}
// 使用withTimeoutOrNull避免异常
suspend fun fetchDataWithTimeoutOrNull(): String? {
return withTimeoutOrNull(5000L) {
delay(3000L)
"获取到的数据"
} // 超时时返回null而不是抛出异常
}
fun main() = runBlocking {
val result1 = fetchDataWithTimeout()
println("结果1: $result1")
val result2 = fetchDataWithTimeoutOrNull()
println("结果2: $result2")
}
实践4:资源管理
import kotlinx.coroutines.*
import java.io.Closeable
class DatabaseConnection : Closeable {
fun query(sql: String): String {
return "查询结果: $sql"
}
override fun close() {
println("数据库连接已关闭")
}
}
// 使用use函数确保资源被正确关闭
suspend fun queryWithResource(): String {
return DatabaseConnection().use { connection ->
delay(1000L) // 模拟查询时间
connection.query("SELECT * FROM users")
} // connection会自动关闭
}
// 在协程中管理多个资源
suspend fun complexOperation() {
coroutineScope {
val connection1 = DatabaseConnection()
val connection2 = DatabaseConnection()
try {
val result1 = async {
delay(1000L)
connection1.query("SELECT * FROM table1")
}
val result2 = async {
delay(1200L)
connection2.query("SELECT * FROM table2")
}
println("结果1: ${result1.await()}")
println("结果2: ${result2.await()}")
} finally {
connection1.close()
connection2.close()
}
}
}
3. 性能优化建议
// 1. 避免创建过多的协程
fun inefficientApproach(items: List<String>) = runBlocking {
// ❌ 为每个item创建一个协程
items.map { item ->
async {
processItem(item)
}
}.awaitAll()
}
fun efficientApproach(items: List<String>) = runBlocking {
// ✅ 使用有限的并发数
val semaphore = Semaphore(10) // 最多10个并发
items.map { item ->
async {
semaphore.withPermit {
processItem(item)
}
}
}.awaitAll()
}
suspend fun processItem(item: String): String {
delay(100L)
return "处理后的 $item"
}
// 2. 使用Channel进行流式处理
fun streamProcessing() = runBlocking {
val channel = Channel<String>(capacity = 100)
// 生产者
launch {
repeat(1000) { i ->
channel.send("Item $i")
}
channel.close()
}
// 多个消费者
repeat(5) { workerId ->
launch {
for (item in channel) {
println("Worker $workerId 处理: $item")
delay(10L)
}
}
}
}
总结
Kotlin协程是一个强大的并发编程工具,它提供了以下主要优势:
核心概念回顾
-
挂起函数:使用
suspend关键字,可以暂停和恢复执行 -
协程构建器:
launch、async、runBlocking等用于创建协程 - 协程作用域:管理协程的生命周期和结构化并发
- 调度器:控制协程在哪个线程执行
- 取消和异常处理:提供了完善的错误处理机制
主要优势
- 轻量级:比线程更高效,可以创建大量协程
- 非阻塞:避免阻塞线程,提高应用响应性
- 结构化并发:提供清晰的并发模型
- 异常安全:完善的异常处理和传播机制
- 易于使用:简洁的API,降低并发编程复杂度
使用建议
- 选择合适的调度器:CPU密集型用Default,IO操作用IO
-
正确处理取消:使用
ensureActive()或检查isActive - 避免内存泄漏:使用生命周期感知的协程作用域
- 合理使用超时:为可能长时间运行的操作设置超时
-
结构化并发:使用
coroutineScope确保所有子协程完成
适用场景
- 网络请求和API调用
- 文件I/O操作
- 数据库访问
- UI更新(Android开发)
- 并发数据处理
- 实时数据流处理
通过掌握这些概念和最佳实践,你可以有效地使用Kotlin协程来构建高效、可维护的并发应用程序。记住,协程的核心是让异步代码看起来像同步代码,同时保持非阻塞的特性。
本文涵盖了Kotlin协程的核心概念和实用技巧。随着你对协程的深入使用,你会发现更多高级特性和优化技巧。持续学习和实践是掌握协程的关键。