Kotlin Flow

在协程中,与仅返回单个值的挂起函数不同,flow可按顺序发出多个值。例如,可以使用flow接收来自数据库的实时更新。

flow在协程的构建基础上,可以提供多值返回。从概念上来说,flow可以通过异步方式处理一组数据序列。前提是所发出的值的类型必须相同。例如,Flow<Int>是返回整数值数据流。

flow与生成一组序列值的Iterator非常相似,但它使用挂起函数通过异步方式生成和消费这个值。

flow包括三个实体:

  • Producer: 会生成添加到数据流中的数据。得益于协程,flow还可以异步产生数据。
  • (Optional)Intermediary: 可以修改发送到flow中的值,或修正flow本身
  • Consumer: 使用flow中的值。
图 1. 数据流中包含的实体:使用方、可选中介和提供方。

在Android中,仓库(repository)通常是UI数据的提供方,UI是其数据的最终使用方。而其他时候,UI层是用户输入事件的提供方,其他层则是这些事件的使用方。提供方和使用方之间的层通常被称作中介,负责修改数据流,以满足其后层的要求。

创建Flow

如需创建flow,可以使用flow 构造器API。flow构造函数会创建一个新的flow,可以使用emit函数手动将新值发送到flow中。

如以下示例,数据源以固定时间间隔自动获取最新资讯。由于挂起函数不能返回多个连续值,数据源将创建返回flow来满足要求。

class NewsRemoteDataSource(
    private val newsApi: NewsApi,
    private val refreshIntervalMs: Long = 5000
) {
    val latestNews: Flow<List<ArticleHeadline>> = flow {
        while(true) {
            val latestNews = newsApi.fetchLatestNews()
            emit(latestNews) // Emits the result of the request to the flow
            delay(refreshIntervalMs) // Suspends the coroutine for some time
        }
    }
}

// Interface that provides a way to make network requests with suspend functions
interface NewsApi {
    suspend fun fetchLatestNews(): List<ArticleHeadline>
}

flow 构造器在协程内执行。因此,它将受益于相同异步API,但也存在一些限制:

  • flow是有序的。当协程内的Producer调用挂起函数时,Producer会挂起,直到挂起函数返回。在此示例中,Producer会挂起,直到fetchLatestNews网络请求完成为止。只有这样请求结果才会发送到flow中。
  • 使用flow构造器时,Producer不能提供来自不同CoroutineContextemit值。因此,请勿通过创建新协程或使用withContext代码块,在不同CoroutineContext中调用emit。在这些情况下,可以使用其他flow构造器,例如callbackFlow

修改flow

Intermediary可以利用中间运算符在不消费值的情况下修改数据流。这些运算符都是函数。可在应用于数据库时,设置一系列暂不执行的链式运算,留待将来使用值时执行。如需详细了解中间运算符,请参阅Flow参考文档

在以下示例中,存储层使用中间运算符map来转换将在View上显示的数据:

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData
) {
    /**
     * Returns the favorite latest news applying transformations on the flow.
     * These operations are lazy and don't trigger the flow. They just transform
     * the current value emitted by the flow at that point in time.
     */
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            // Intermediate operation to filter the list of favorite topics
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            // Intermediate operation to save the latest news in the cache
            .onEach { news -> saveInCache(news) }
}

中间运算符可以连接多个,形成链式运算,在数据项被发送到数据流时延迟执行。请注意,仅将一个中间运行符应用于数据流不会启动flow。

从Flow中进行收集

使用终端运算符可触发flow开始监听流的值。如需获取流中所有发出来的值,可以使用collect。如需详细了解终端运算符,请参阅官方Flow文档

由于collect是挂起函数,因此需要在协程中执行。它接受lambda作为在每个新值上调用的参数。由于它是挂起函数,调用collect的协程可能会挂起,直到该flow关闭。

继续之前的示例,下面将展示一个简单的ViewModel实现,展示其如何使用存储库层中的数据:

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            // Trigger the flow and consume its elements using collect
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                // Update View with the latest favorite news
            }
        }
    }
}

收集数据流会触发提供方刷新最新资讯,并以固定时间间隔发出网络请求。由于提供主始终通过while(true)循环保持活跃状态,因此,在清除ViewModel并取消viewModelScope数据流后,数据流将关闭。

Flow收集可能会由于以下原因而停止:

  • 如上例所示,协程收集取消。些操作也会底层Producer停止活动。
  • Producer完成了发送数据操作。在这种情况下,数据流将关闭,调用collect的协程继续执行。

除非使用其他中间运算符指定流,否则Flow始终为冷式和延迟执行。这意味着,每次在flow上调用终端运算符时,都会执行Producer方的代码。在前面示例中,拥有多个flow收集器会导致数据源以不同的固定时间间隔多次获取最新资讯。如需在多个使用方同时收集优化并共享数据流,请使用shareIn运算符。

捕获异常

Producer的数据实现可来自第三方库。这意味着可能会引发异常。如需处理这些异常,请使用catch中间运算符。

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Intermediate catch operator. If an exception is thrown,
                // catch and update the UI
                .catch { exception -> notifyError(exception) }
                .collect { favoriteNews ->
                    // Update View with the latest favorite news
                }
        }
    }
}

在之前的示例中,发生异常时,系统不会调用collect的lambda参数,因为未收到新数据项。catch还可执行emit操作,向flow发出数据。示例如下:

class NewsRepository(...) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> news.filter { userData.isFavoriteTopic(it) } }
            .onEach { news -> saveInCache(news) }
            // If an error happens, emit the last cached values
            .catch { exception -> emit(lastCachedNews()) }
}

在不同的CoroutineContext中执行

默认情况下,flow构造器的producer会通过从协程的CoroutineContext上执行,并且无法从不同的CoroutineContext对值执行emit操作。在某些情况下,可以跳出这个限制。如上示例代码中,存储层不应在viewModelScope所使用的Dispatchers.Main上执行。如需更改flowCoroutineContext,请使用中间运算符flowOnflowOn会更改上流数据流的CoroutineContext,这表示会在flowOn之前(或之上)producer以及任何中间运行符都会在传入的这个CoroutineContext上执行。下游数据不会受到影响。如果有多个flowOn运算符,每个运算符都会更改当前位置的上流数据流。

class NewsRepository(
    private val newsRemoteDataSource: NewsRemoteDataSource,
    private val userData: UserData,
    private val defaultDispatcher: CoroutineDispatcher
) {
    val favoriteLatestNews: Flow<List<ArticleHeadline>> =
        newsRemoteDataSource.latestNews
            .map { news -> // Executes on the default dispatcher
                news.filter { userData.isFavoriteTopic(it) }
            }
            .onEach { news -> // Executes on the default dispatcher
                saveInCache(news)
            }
            // flowOn affects the upstream flow ↑
            .flowOn(defaultDispatcher)
            // the downstream flow ↓ is not affected
            .catch { exception -> // Executes in the consumer's context
                emit(lastCachedNews())
            }
}

Jetpack库中的Flow

许多的Jetpack库已集成了flow,并且在Android第三方库中也非常受欢迎。flow非常适合实时数据更新和无限数据流。

比如使用Flow with Room接收有关数据库更改的通知。在使用Room DAO时,返回flow类型以获取实时更新。

@Dao
abstract class ExampleDao {
    @Query("SELECT * FROM Example")
    abstract fun getExamples(): Flow<List<Example>>
}

每当Example数据表发生更改时,系统都会发出数据库更新的列表。

将基于回调的API转换为数据流

callbackFlow是一个flow构造器,允许将基于回调的API转换为数据流。如:Firebase Firestore Android API 会使用回调。

注意:从 24.3.0 版开始,firestore-ktx 包含返回 Flowsnapshots() 扩展,因此您无需自行针对此特定 API 执行这一转换。

如需将这些 API 转换为数据流并监听 Firestore 数据库的更新,可使用以下代码:

class FirestoreUserEventsDataSource(
    private val firestore: FirebaseFirestore
) {
    // Method to get user events from the Firestore database
    fun getUserEvents(): Flow<UserEvents> = callbackFlow {

        // Reference to use in Firestore
        var eventsCollection: CollectionReference? = null
        try {
            eventsCollection = FirebaseFirestore.getInstance()
                .collection("collection")
                .document("app")
        } catch (e: Throwable) {
            // If Firebase cannot be initialized, close the stream of data
            // flow consumers will stop collecting and the coroutine will resume
            close(e)
        }

        // Registers callback to firestore, which will be called on new events
        val subscription = eventsCollection?.addSnapshotListener { snapshot, _ ->
            if (snapshot == null) { return@addSnapshotListener }
            // Sends events to the flow! Consumers will get the new events
            try {
                offer(snapshot.getEvents())
            } catch (e: Throwable) {
                // Event couldn't be sent to the flow
            }
        }

        // The callback inside awaitClose will be executed when the flow is
        // either closed or cancelled.
        // In this case, remove the callback from Firestore
        awaitClose { subscription?.remove() }
    }
}

flow构建器,callbackFlow允许从不同的CoroutineContext替换为send函数或协程之外的trySend函数。

在协程内部,callbackFlow 会使用通道,它在概念上与阻塞队列非常相似。通道都有容量配置,限定了可缓冲元素数的上限。在 callbackFlow 中所创建通道的默认容量为 64 个元素。当您尝试向完整通道添加新元素时,send 会将数据提供方挂起,直到新元素有空间为止,而 offer 不会将相关元素添加到通道中,并会立即返回 false

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容