在 Kotlin 协程中,Flow 是一种异步数据流(Asynchronous Stream)API,专为处理顺序发射的多个值而设计,类似于 RxJava 的 Observable,但更轻量且深度集成 Kotlin 协程:
- Flow 的核心特性
冷流(Cold Stream):
只有在收集(collect)时才会触发数据发射,类似 Sequence。每次收集都会重新执行流。
非阻塞异步:
基于协程的挂起函数,避免回调地狱,支持结构化并发。
背压(Backpressure)支持:
通过 buffer()、conflate() 等操作符自动处理生产-消费速率不匹配问题。 - 基础用法
(1)创建 Flow
// 方式1: 使用 flow{} 构建器
fun fetchNumbers(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 模拟异步操作
emit(i) // 发射值
}
}
// 方式2: 转换集合为 Flow
listOf(1, 2, 3).asFlow()
(2)收集 Flow
viewModelScope.launch {
fetchNumbers()
.collect { value -> println(value) } // 1, 2, 3
}
- 关键操作符
(1)转换操作符
map / filter:同步转换或过滤。
transform:灵活发射任意值(可多次 emit)。
flowOf(1, 2, 3).transform { value ->
if (value % 2 == 0) emit(value * 2)
}
(2)异步操作符
flatMapConcat:顺序执行异步操作。
flatMapMerge:并发执行异步操作(默认限制并发数为 16)。
flatMapLatest:取消前一个未完成的任务,只处理最新值。
(3)背压处理
buffer():缓存发射的值,避免生产者被挂起。
conflate():丢弃中间值,只保留最新值。
collectLatest:取消并重启收集器处理新值。
- 异常处理
flow {
emit(1)
throw RuntimeException("Error")
}.catch { e -> println("Caught: $e") } // 捕获上游异常
.onCompletion { cause -> println("Flow completed: $cause") } // 类似 finally
- 与 LiveData/StateFlow 的对比
特性 Flow StateFlow (热流) LiveData
生命周期感知 需手动取消(通过协程作用域) 自动感知(与 ViewModel 绑定) 自动感知
多订阅支持 每次收集重新触发流 共享最新状态 共享最新状态
适用场景 一次性异步数据流 UI 状态管理 简单 UI 数据观察 - 高级场景
(1)Flow + Room 数据库
@Dao
interface UserDao {
@Query("SELECT * FROM users")
fun getUsers(): Flow<List<User>> // 自动监听数据库变化
}
(2)Flow + Retrofit
interface ApiService {
@GET("users")
suspend fun fetchUsers(): List<User>
}
fun getUsersFlow(): Flow<List<User>> = flow {
emit(apiService.fetchUsers())
}.flowOn(Dispatchers.IO) // 指定调度线程
- 最佳实践
避免在 flow{} 中阻塞线程:使用 delay 或 withContext 替代 Thread.sleep 。
合理使用 shareIn/stateIn:将冷流转换为热流,避免重复计算。
测试时用 test 扩展:
runTest {
val flow = flowOf(1, 2, 3)
flow.test {
assertEquals(1, awaitItem())
assertEquals(2, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
总结
何时用 Flow:需要处理异步数据流(如分页加载、WebSocket 消息、传感器数据)。
何时用 StateFlow:需要将状态暴露给 UI 层(如 ViewModel 中的 UI 状态)。
性能优化:注意操作符的同步/异步选择,合理处理背压。