在协程中,Flow 是一种可以顺序发出多个值的类型,而不是只返回单个值的挂起函数。例如,你可以使用 Flow 从数据库接收实时更新。
数据流建立在协程之上,可以提供多个值。Flow 在概念上是可以异步计算的数据流。发出的值必须是同一类型。例如, Flow<Int>
是一个发出整数值的流。
数据流与生成一组序列值的 Iterator
非常相似,但它使用挂起函数来异步生成和使用值。这意味着,例如,Flow 可以安全地发出网络请求以生成下一个值,而不会阻塞主线程。
数据流涉及三个实体:
- 提供方会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
- (可选)中介可以修改发送到数据流的值,或修正数据流本身。
- 使用方则使用数据流中的值。
在 Android 中,代码库通常是界面数据的提供方,其将界面用作最终显示数据的使用方。 而其他时候,UI 层是用户输入事件的生产者,而层次结构的其他层使用它们。 提供方和使用方之间的层通常充当中介,修改数据流以使其适应下一层的要求。
创建 Flow
要创建流,请使用flow
构建器 API。 流构建器函数创建一个新的 Flow,你可以在其中使用 emit 函数手动将新值发送到数据流中。
在以下示例中,数据源以固定的时间间隔自动获取最新新闻资讯。 由于挂起函数不能返回多个连续值,数据源创建并返回一个数据流来满足这个要求。 在这种情况下,数据源充当提供方。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // 将请求的结果发送到数据流
delay(refreshIntervalMs) // 暂停协程一段时间
}
}
}
// 提供一种通过挂起功能发出网络请求的方法的接口
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
构建器在协程中执行。 因此,它受益于相同的异步 API,但有一些限制:
- 数据流是有顺序的。当协程内的提供方调用挂起函数时,提供方会挂起,直到挂起函数返回。 在示例中,提供方会挂起,直到
fetchLatestNews
网络请求完成为止。只有这样,请求结果才会发送到数据流中。 - 使用
flow
构建器,生产者不能从不同的 CoroutineContext 发出值。 因此,不要通过创建新的协程或使用 withContext 代码块在不同的 CoroutineContext 中调用 emit。 在这些情况下,您可以使用其他流构建器,例如 callbackFlow。
修改数据流
中介可以使用中间运算符来修改数据流,而无需使用这些值。 这些操作符是函数,当应用于数据流时,会设置一系列暂不执行的链式运算,直到将来使用这些值时才会执行这些操作。 在 Flow 参考文档中了解有关中间运算符的更多信息。
在下面的示例中,存储库层使用中间运算符 map 来转换要在 View
上显示的数据:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* 返回对应流转换的最喜欢的最新新闻资讯。这些操作是惰性的,不会触发流程。
* 它们只是转换流在该时间点发出的当前值。
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// 过滤收藏主题列表的中间操作
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// 将最新消息保存在缓存中的中间操作
.onEach { news -> saveInCache(news) }
}
中间运算符可以接连应用,形成链式运算,在数据项被发送到数据流时延迟执行。 请注意,仅将一个中间运算符应用于数据流不会启动数据流收集。
从 Flow(数据流) 中收集
使用终端运算符可触发数据流开始监听值。如需获取数据流中的所有发出值,请使用 collect。 你可以在官方 Flow 文档中了解更多关于终端运算符的信息。
因为 collect
是一个挂起函数,所以它需要在协程中执行。 它接受 lambda 作为在每个新值上调用的参数。 由于它是一个挂起函数,调用 collect
的协程可能会挂起,直到流程关闭。
继续前面的示例,这里是一个 ViewModel 的简单实现,它使用来自存储库层的数据:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// 使用 collect 触发流并使用其元素
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// 使用最新喜欢的新闻资讯更新视图
}
}
}
}
收集数据流会触发提供方刷新最新消息,并以固定时间间隔发出网络请求的结果。 由于提供方在 while(true) 循环中始终保持活动状态,因此当 ViewModel
被清除并 viewModelScope
被取消时,数据流将被关闭。
收集数据流可能会因以下原因停止:
- 如上例所示,协程收集被取消。此操作也会让底层提供方停止活动。
- 提供方完成发出数据项。在这种情况下,数据流将关闭,调用
collect
的协程则继续执行。
除非使用其他中间运算符指定流,否则数据流始终为冷数据并延迟执行。 这意味着每次在流上调用终端操作符时都会执行提供方的代码。 在前面的示例中,拥有多个流收集器会导致数据源在不同的固定时间间隔内多次获取最新消息。 要在多个消费者同时收集时优化和共享数据流,请使用 shareIn 运算符。
捕捉意外的异常
提供方的数据实现可以来自第三方库。 这意味着它可能会抛出意外的异常。 要处理这些异常,请使用 catch 中间运算符。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// 中介捕获操作员。 如果抛出异常,
// 捕获并更新 UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// 使用最新喜欢的新闻资讯更新视图
}
}
}
}
在前面的示例中,当发生异常时,不会调用 collect
lambda,因为尚未收到新数据项。
catch
还可执行 emit
操作,向数据流发出数据项。示例存储库层可以改为对缓存值执行 emit
操作:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// 如果发生错误,则发出最后缓存的值
.catch { exception -> emit(lastCachedNews()) }
}
在此示例中,当发生异常时,将调用 collect
lambda,因为由于异常而将新数据项发送到数据流中。
在不同的 CoroutineContext 中执行
默认情况下,flow
构建器的提供方在从它收集的协程的 CoroutineContext
中执行,并且如前所述,它不能从不同的 CoroutineContext
对值执行 emit
操作。 在某些情况下,这种行为可能是不可取的。 例如,在本文章中使用的示例中,存储库层不应在 viewModelScope
使用的 Dispatchers.Main
上执行操作。
要更改流的 CoroutineContext
,请使用中间运算符 flowOn
。 flowOn
改变了上游流的 CoroutineContext
,这意味提供方和任何在 flowOn
之前(或之上)应用的中间操作符。 下游数据流(晚于 flowOn
的中间运算符和使用方)不受影响,并会在 CoroutineContext 上执行以从数据流执行 collect
操作。 如果有多个 flowOn
操作符,每个操作符都会从其当前位置更改上游数据流。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // 在默认调度程序上执行
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // 在默认调度程序上执行
saveInCache(news)
}
// flowOn 影响上游流 ↑
.flowOn(defaultDispatcher)
// 下游流 ↓ 不受影响
.catch { exception -> // 在消费者的上下文中执行
emit(lastCachedNews())
}
}
使用此代码,·
onEach
和 map
运算符使用 defaultDispatcher
,而 catch
运算符和使用者在 viewModelScope
使用的 Dispatchers.Main
上执行。
由于数据源层正在执行 I/O 工作,因此你应该使用针对 I/O 操作进行了优化的调度程序:
class NewsRemoteDataSource(
...,
private val ioDispatcher: CoroutineDispatcher
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
// 在 IO 调度程序上执行
...
}
.flowOn(ioDispatcher)
}
Jetpack 库中的数据流
Flow 已集成到许多 Jetpack 库中,并且在 Android 第三方库中很受欢迎。 Flow 非常适合实时数据更新和无限的数据流。
你可以将 Flow 与 Room 结合使用,以便在数据库发生更改时收到通知。 使用数据访问对象 (DAO) 时,返回 Flow 类型以获取实时更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每当 Example
数据表发生更改时,系统都会发出包含数据库新数据项的新列表。
将 callback-based APIs 转换为数据流
callbackFlow 是一个数据流构建器,可让你将 callback-based APIs转换为 数据流。
与 flow
构建器不同,callbackFlow
允许使用 send 函数从不同的 CoroutineContext
或使用 offer
函数在协程外部发出值。
在协程内部,callbackFlow
使用一个 channel,它在概念上与阻塞队列非常相似。 通道都有容量配置,限定了可缓冲元素数的上限。 callbackFlow
中创建的通道的默认容量为 64 个元素。 当你尝试将新元素添加到完整频道时,send
会将数据提供方挂起,直到有新元素的空间,而 offer
不会将相关元素添加到通道中,并会立即返回 false。
Kotlin Channel和阻塞队列很类似,区别在于Channel用挂起的send操作代替了阻塞的put,用挂起的receive操作代替了阻塞的take。
使用lifecycle-runtime-ktx库中的launchWhenX方法,对Channel的收集协程会在组件生命周期 < X时挂起,从而避免异常。也可以使用repeatOnLifecycle(State) 来在UI层收集,当生命周期 < State时,会取消协程,恢复时再重新启动协程。
看起来使用Channel承载事件是个不错的选择,并且一般来说事件分发都是一对一,因此并不需要支持一对多的BroadcastChannel(后者已经逐渐被废弃,被SharedFlow替代)
如何创建Channel?看一下Channel对外暴露可供使用的构造方法,考虑传入合适的参数。
public fun <E> Channel(
// 缓冲区容量,当超出容量时会触发onBufferOverflow指定的策略
capacity: Int = RENDEZVOUS,
// 缓冲区溢出策略,默认为挂起,还有DROP_OLDEST和DROP_LATEST
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
// 处理元素未能成功送达处理的情况,如订阅者被取消或者抛异常
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
首先Channel是热的,即任意时刻发送元素到Channel即使没有订阅者也会执行。所以考虑到存在订阅者协程被取消时发送事件的情况,即存在Channel处在无订阅者时的空档期收到事件情况。例如当Activity使用repeatOnLifecycle方法启动协程去消费ViewModel持有的Channel里的事件消息,当前Activity因为处于STOPED状态而取消了协程。
StateFlow(状态流) 和 SharedFlow(共享流)
StateFlow
和 SharedFlow
是 Flow API,允许数据流以最优方式发出状态更新并向多个使用方发出值。
StateFlow和SharedFlow,两者拥有Channel的很多特性,可以看作是将Flow推向台前,将Channel雪藏幕后的一手重要操作。
首先二者都是热流,并支持在构造器外发射数据。简单看下它们的构造方法
public fun <T> MutableSharedFlow(
// 每个新的订阅者订阅时收到的回放的数目,默认0
replay: Int = 0,
// 除了replay数目之外,缓存的容量,默认0
extraBufferCapacity: Int = 0,
// 缓存区溢出时的策略,默认为挂起。只有当至少有一个订阅者时,onBufferOverflow才会生效。当无订阅者时,只有最近replay数目的值会保存,并且onBufferOverflow无效。
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
)
//MutableStateFlow等价于使用如下构造参数的SharedFlow
MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
StateFlow
StateFlow
是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value
属性读取当前状态值。如需更新状态并将其发送到数据流,请为 MutableStateFlow
类的 value
属性分配一个新值。
在 Android 中,StateFlow
非常适合需要让可变状态保持可观察的类。
按照 Kotlin 数据流中的示例,可以从 LatestNewsViewModel
公开 StateFlow
,以便 View
能够监听界面状态更新,并自行使屏幕状态在配置更改后继续有效。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
// Backing property to avoid state updates from other classes
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// The UI collects from this StateFlow to get its state updates
val uiState: StateFlow<LatestNewsUiState> = _uiState
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Update View with the latest favorite news
// Writes to the value property of MutableStateFlow,
// adding a new element to the flow and updating all
// of its collectors
.collect { favoriteNews ->
_uiState.value = LatestNewsUiState.Success(favoriteNews)
}
}
}
}
// Represents different states for the LatestNews screen
sealed class LatestNewsUiState {
data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
data class Error(exception: Throwable): LatestNewsUiState()
}
负责更新 MutableStateFlow
的类是提供方,从 StateFlow
收集的所有类都是使用方。与使用 flow
构建器构建的冷数据流不同,StateFlow
是热数据流:从此类数据流收集数据不会触发任何提供方代码。StateFlow
始终处于活跃状态并存于内存中,而且只有在垃圾回收根中未涉及对它的其他引用时,它才符合垃圾回收条件。
当新使用方开始从数据流中收集数据时,它将接收信息流中的最近一个状态及任何后续状态。您可在 LiveData
等其他可观察类中找到此操作行为。
与处理任何其他数据流一样,View
会监听 StateFlow
:
class LatestNewsActivity : AppCompatActivity() {
private val latestNewsViewModel = // getViewModel()
override fun onCreate(savedInstanceState: Bundle?) {
...
// 在生命周期范围内启动协程
lifecycleScope.launch {
// 每次 ifecycle 处于 STARTED 状态(或更高)时,repeatOnLifecycle 在新的协程中启动块,
// 并在它停止时取消它。
repeatOnLifecycle(Lifecycle.State.STARTED) {
// 触发流程并开始监听值。
// 请注意,当生命周期开始时会发生这种情况,当生命周期停止时会停止收集
latestNewsViewModel.uiState.collect { uiState ->
// 收到的新值
when (uiState) {
is LatestNewsUiState.Success -> showFavoriteNews(uiState.news)
is LatestNewsUiState.Error -> showError(uiState.exception)
}
}
}
}
}
}
警告:如果需要更新界面,切勿使用 launch
或 launchIn
扩展函数从界面直接收集数据流。即使 View 不可见,这些函数也会处理事件。此行为可能会导致应用崩溃。 为避免这种情况,请使用 repeatOnLifecycle
API(如上所示)。
注意:repeatOnLifecycle
API 仅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0-alpha01
库及更高版本中提供。
如需将任何数据流转换为 StateFlow
,请使用 stateIn
中间运算符。
StateFlow、Flow 和 LiveData
StateFlow
和 LiveData
具有相似之处。两者都是可观察的数据容器类,并且在应用架构中使用时,两者都遵循相似模式。
但请注意,StateFlow
和 LiveData
的行为确实有所不同:
-
StateFlow
需要将初始状态传递给构造函数,而LiveData
不需要。 - 当 View 进入
STOPPED
状态时,LiveData.observe()
会自动取消注册使用方,而从StateFlow
或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,您需要从Lifecycle.repeatOnLifecycle
块收集数据流。
利用 shareIn
使冷数据流变为热数据流
StateFlow
是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn
运算符将冷数据流变为热数据流。
以在 Kotlin 数据流中创建的 callbackFlow
为例,您无需为每个收集器都创建一个新数据流,而是可以使用 shareIn
在收集器间共享从 Firestore 检索到的数据。您需要传入以下内容:
- 用于共享数据流的
CoroutineScope
。此作用域函数的生命周期应长于任何使用方,以使共享数据流在足够长的时间内保持活跃状态。 - 要重放 (replay) 至每个新收集器的数据项数量。
- “启动”行为政策。
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}
在此示例中,latestNews
数据流将上次发出的数据项重放至新收集器,只要 externalScope
处于活跃状态并且存在活跃收集器,它就会一直处于活跃状态。当存在活跃订阅者时,SharingStarted.WhileSubscribed()
“启动”政策将使上游提供方保持活跃状态。可使用其他启动政策,例如使用 SharingStarted.Eagerly
可立即启动提供方,使用 SharingStarted.Lazily
可在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态。
注意:如需详细了解 externalScope
的模式,请查看这篇文章。
SharedFlow
shareIn
函数会返回一个热数据流 SharedFlow
,此数据流会向从其中收集值的所有使用方发出数据。SharedFlow
是 StateFlow
的可配置性极高的泛化数据流。
您无需使用 shareIn
即可创建 SharedFlow
。例如,您可以使用 SharedFlow
将 tick 信息发送到应用的其余部分,以便让所有内容定期同时刷新。除了获取最新资讯之外,您可能还想要使用用户最喜欢的主题集刷新用户信息部分。在以下代码段中,TickHandler
公开了 SharedFlow
,以便其他类知道要在何时刷新其内容。与 StateFlow
一样,请在类中使用类型 MutableSharedFlow
的后备属性将数据项发送给数据流:
// 当应用程序的内容需要刷新时集中的类
class TickHandler(
private val externalScope: CoroutineScope,
private val tickIntervalMs: Long = 5000
) {
// Backing property to avoid flow emissions from other classes
private val _tickFlow = MutableSharedFlow<Unit>(replay = 0)
val tickFlow: SharedFlow<Event<String>> = _tickFlow
init {
externalScope.launch {
while(true) {
_tickFlow.emit(Unit)
delay(tickIntervalMs)
}
}
}
}
class NewsRepository(
...,
private val tickHandler: TickHandler,
private val externalScope: CoroutineScope
) {
init {
externalScope.launch {
// Listen for tick updates
tickHandler.tickFlow.collect {
refreshLatestNews()
}
}
}
suspend fun refreshLatestNews() { ... }
...
}
您可通过以下方式自定义 SharedFlow
行为:
通过 replay
,您可以针对新订阅者重新发送多个之前已发出的值。
通过 onBufferOverflow
,您可以指定相关政策来处理缓冲区中已存满要发送的数据项的情况。默认值为 BufferOverflow.SUSPEND
,这会使调用方挂起。其他选项包括 DROP_LATEST
或 DROP_OLDEST
。
MutableSharedFlow
还具有 subscriptionCount
属性,其中包含处于活跃状态的收集器的数量,以便您相应地优化业务逻辑。MutableSharedFlow
还包含一个 resetReplayCache
函数,供您在不想重放已向数据流发送的最新信息的情况下使用。
SharedFlow
在无订阅者时会丢弃数据。SharedFlow
类似BroadcastChannel, 支持被多个订阅者订阅,可以使同一个事件会被多次消费。
相关官方文档:
https://developer.android.com/kotlin/flow
https://kotlinlang.org/docs/flow.html