在协程中,与仅返回单个值的挂起函数不同,flow
可按顺序发出多个值。例如,可以使用flow
接收来自数据库的实时更新。
flow
在协程的构建基础上,可以提供多值返回。从概念上来说,flow
可以通过异步方式处理一组数据序列。前提是所发出的值的类型必须相同。例如,Flow<Int>
是返回整数值数据流。
flow
与生成一组序列值的Iterator
非常相似,但它使用挂起函数通过异步方式生成和消费这个值。
flow
包括三个实体:
-
Producer: 会生成添加到数据流中的数据。得益于协程,
flow
还可以异步产生数据。 -
(Optional)Intermediary: 可以修改发送到
flow
中的值,或修正flow
本身 -
Consumer: 使用
flow
中的值。
在Android中,仓库(repository)通常是UI数据的提供方,UI是其数据的最终使用方。而其他时候,UI层是用户输入事件的提供方,其他层则是这些事件的使用方。提供方和使用方之间的层通常被称作中介,负责修改数据流,以满足其后层的要求。
创建Flow
如需创建flow
,可以使用flow 构造器API。flow
构造函数会创建一个新的flow
,可以使用emit
函数手动将新值发送到flow中。
如以下示例,数据源以固定时间间隔自动获取最新资讯。由于挂起函数不能返回多个连续值,数据源将创建返回flow
来满足要求。
class NewsRemoteDataSource(
private val newsApi: NewsApi,
private val refreshIntervalMs: Long = 5000
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
while(true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews) // Emits the result of the request to the flow
delay(refreshIntervalMs) // Suspends the coroutine for some time
}
}
}
// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
suspend fun fetchLatestNews(): List<ArticleHeadline>
}
flow
构造器在协程内执行。因此,它将受益于相同异步API,但也存在一些限制:
-
flow
是有序的。当协程内的Producer
调用挂起函数时,Producer
会挂起,直到挂起函数返回。在此示例中,Producer
会挂起,直到fetchLatestNews
网络请求完成为止。只有这样请求结果才会发送到flow中。 - 使用
flow
构造器时,Producer
不能提供来自不同CoroutineContext
的emit
值。因此,请勿通过创建新协程或使用withContext
代码块,在不同CoroutineContext
中调用emit
。在这些情况下,可以使用其他flow
构造器,例如callbackFlow。
修改flow
Intermediary
可以利用中间运算符在不消费值的情况下修改数据流。这些运算符都是函数。可在应用于数据库时,设置一系列暂不执行的链式运算,留待将来使用值时执行。如需详细了解中间运算符,请参阅Flow参考文档。
在以下示例中,存储层使用中间运算符map
来转换将在View上显示的数据:
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData
) {
/**
* Returns the favorite latest news applying transformations on the flow.
* These operations are lazy and don't trigger the flow. They just transform
* the current value emitted by the flow at that point in time.
*/
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
// Intermediate operation to filter the list of favorite topics
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
// Intermediate operation to save the latest news in the cache
.onEach { news -> saveInCache(news) }
}
中间运算符可以连接多个,形成链式运算,在数据项被发送到数据流时延迟执行。请注意,仅将一个中间运行符应用于数据流不会启动flow。
从Flow中进行收集
使用终端运算符可触发flow
开始监听流的值。如需获取流中所有发出来的值,可以使用collect。如需详细了解终端运算符,请参阅官方Flow文档。
由于collect
是挂起函数,因此需要在协程中执行。它接受lambda作为在每个新值上调用的参数。由于它是挂起函数,调用collect
的协程可能会挂起,直到该flow
关闭。
继续之前的示例,下面将展示一个简单的ViewModel
实现,展示其如何使用存储库层中的数据:
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
// Trigger the flow and consume its elements using collect
newsRepository.favoriteLatestNews.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
收集数据流会触发提供方刷新最新资讯,并以固定时间间隔发出网络请求。由于提供主始终通过while(true)
循环保持活跃状态,因此,在清除ViewModel
并取消viewModelScope
数据流后,数据流将关闭。
Flow
收集可能会由于以下原因而停止:
- 如上例所示,协程收集取消。些操作也会底层
Producer
停止活动。 -
Producer
完成了发送数据操作。在这种情况下,数据流将关闭,调用collect
的协程继续执行。
除非使用其他中间运算符指定流,否则Flow
始终为冷式和延迟执行。这意味着,每次在flow
上调用终端运算符时,都会执行Producer
方的代码。在前面示例中,拥有多个flow
收集器会导致数据源以不同的固定时间间隔多次获取最新资讯。如需在多个使用方同时收集优化并共享数据流,请使用shareIn
运算符。
捕获异常
Producer
的数据实现可来自第三方库。这意味着可能会引发异常。如需处理这些异常,请使用catch
中间运算符。
class LatestNewsViewModel(
private val newsRepository: NewsRepository
) : ViewModel() {
init {
viewModelScope.launch {
newsRepository.favoriteLatestNews
// Intermediate catch operator. If an exception is thrown,
// catch and update the UI
.catch { exception -> notifyError(exception) }
.collect { favoriteNews ->
// Update View with the latest favorite news
}
}
}
}
在之前的示例中,发生异常时,系统不会调用collect
的lambda参数,因为未收到新数据项。catch
还可执行emit
操作,向flow
发出数据。示例如下:
class NewsRepository(...) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> news.filter { userData.isFavoriteTopic(it) } }
.onEach { news -> saveInCache(news) }
// If an error happens, emit the last cached values
.catch { exception -> emit(lastCachedNews()) }
}
在不同的CoroutineContext
中执行
默认情况下,flow
构造器的producer会通过从协程的CoroutineContext
上执行,并且无法从不同的CoroutineContext
对值执行emit
操作。在某些情况下,可以跳出这个限制。如上示例代码中,存储层不应在viewModelScope
所使用的Dispatchers.Main
上执行。如需更改flow
的CoroutineContext
,请使用中间运算符flowOn
。flowOn
会更改上流数据流的CoroutineContext
,这表示会在flowOn
之前(或之上)producer
以及任何中间运行符都会在传入的这个CoroutineContext
上执行。下游数据不会受到影响。如果有多个flowOn
运算符,每个运算符都会更改当前位置的上流数据流。
class NewsRepository(
private val newsRemoteDataSource: NewsRemoteDataSource,
private val userData: UserData,
private val defaultDispatcher: CoroutineDispatcher
) {
val favoriteLatestNews: Flow<List<ArticleHeadline>> =
newsRemoteDataSource.latestNews
.map { news -> // Executes on the default dispatcher
news.filter { userData.isFavoriteTopic(it) }
}
.onEach { news -> // Executes on the default dispatcher
saveInCache(news)
}
// flowOn affects the upstream flow ↑
.flowOn(defaultDispatcher)
// the downstream flow ↓ is not affected
.catch { exception -> // Executes in the consumer's context
emit(lastCachedNews())
}
}
Jetpack库中的Flow
许多的Jetpack库已集成了flow
,并且在Android第三方库中也非常受欢迎。flow
非常适合实时数据更新和无限数据流。
比如使用Flow with Room接收有关数据库更改的通知。在使用Room DAO时,返回flow
类型以获取实时更新。
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
每当Example数据表发生更改时,系统都会发出数据库更新的列表。
将基于回调的API转换为数据流
callbackFlow
是一个flow
构造器,允许将基于回调的API转换为数据流。如:Firebase Firestore Android API 会使用回调。
⭐注意:从 24.3.0 版开始,firestore-ktx 包含返回
Flow
的snapshots()
扩展,因此您无需自行针对此特定 API 执行这一转换。
如需将这些 API 转换为数据流并监听 Firestore 数据库的更新,可使用以下代码:
class FirestoreUserEventsDataSource(
private val firestore: FirebaseFirestore
) {
// Method to get user events from the Firestore database
fun getUserEvents(): Flow<UserEvents> = callbackFlow {
// Reference to use in Firestore
var eventsCollection: CollectionReference? = null
try {
eventsCollection = FirebaseFirestore.getInstance()
.collection("collection")
.document("app")
} catch (e: Throwable) {
// If Firebase cannot be initialized, close the stream of data
// flow consumers will stop collecting and the coroutine will resume
close(e)
}
// Registers callback to firestore, which will be called on new events
val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
if (snapshot == null) { return@addSnapshotListener }
// Sends events to the flow! Consumers will get the new events
try {
offer(snapshot.getEvents())
} catch (e: Throwable) {
// Event couldn't be sent to the flow
}
}
// The callback inside awaitClose will be executed when the flow is
// either closed or cancelled.
// In this case, remove the callback from Firestore
awaitClose { subscription?.remove() }
}
}
与flow
构建器,callbackFlow
允许从不同的CoroutineContext
替换为send
函数或协程之外的trySend
函数。
在协程内部,callbackFlow
会使用通道,它在概念上与阻塞队列非常相似。通道都有容量配置,限定了可缓冲元素数的上限。在 callbackFlow
中所创建通道的默认容量为 64 个元素。当您尝试向完整通道添加新元素时,send
会将数据提供方挂起,直到新元素有空间为止,而 offer
不会将相关元素添加到通道中,并会立即返回 false
。