解决Android Flow无法收到更新的数据的问题

使用场景

最近在做项目的时候遇到了一个需求。从DataStore获取最新的ID,并根据ID到Room数据库内获取相应的内容。由于DataStore和Room都可以通过Flow获取到最新的数据更新,因此我设想是否能通过数据驱动界面的方式,抛开以往的通知界面更新方法,直接对数据源的变动进行监听并设置界面。
需要实现以上的需求就需要对DataStore的ID进行监听的同时,还要对Room数据库内相应ID的数据进行同步监听,保证两个数据源的数据的变化都可以监听到。

Tips: 以下部分大都是问题发现与解决的步骤,符合从认识到实践的二次飞跃过程,因此想要直接看最终可用解决方案的可以直接翻到最后。

初步实现

最简单的设想是在transform方法中进行嵌套实现,代码如下:

// 使用时转换为LiveData后,在Activity中进行数据变化监听
val data = flow1.transform { id ->
    val flow2 = getFlow2(id)
    emitAll(flow2)
}.asLiveData()

实际使用以上代码时却出了大问题,A界面获取数据并初始化视图成功,但是在B界面修改数据后返回A界面后,数据更新并没有被监听到。以上的数据不更新情况还是不确定的,因为如果在B界面修改完数据后等待一段时间再返回A界面,数据更新就能够被监听到了。

初步解决方案

无法监听数据变化的情况让我抓狂了一阵子,最终“修改完数据后等待一段时间能够被监听到”还是在asLiveData方法的源码中找到了原因。源码如下:

// CoroutineLiveData.kt
internal const val DEFAULT_TIMEOUT = 5000L

// FlowLiveData.kt
@JvmOverloads
fun <T> Flow<T>.asLiveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
    collect {
        emit(it)
    }
}

好家伙,这里有个DEFAULT_TIMEOUT的超时设定。掐指一算确实只需要在B界面等待5秒后再返回A界面,仅能够成功更新界面了。因此在使用asLiveData(timeoutInMs=0)后问题貌似就解决了。那么此处的超时时间是用来做什么的呢?继续看源码:

// CoroutineLiveData.kt
internal class BlockRunner<T>(
    private val liveData: CoroutineLiveData<T>,
    private val block: Block<T>,
    private val timeoutInMs: Long,
    private val scope: CoroutineScope,
    private val onDone: () -> Unit
) {
    ··· ···
    @MainThread
    fun cancel() {
        ··· ···
        cancellationJob = scope.launch(Dispatchers.Main.immediate) {
            delay(timeoutInMs)
            if (!liveData.hasActiveObservers()) {
                // one last check on active observers to avoid any race condition between starting
                // a running coroutine and cancelation
                runningJob?.cancel()
                runningJob = null
            }
        }
    }
}

首先asLiveData中的调用的Flow的collect方法是在一个lambda函数中的,CoroutineLiveData使用BlockRunner作为collect方法所在的lambda函数的运行器,监听LifeCycle的变化,以启动与取消collect方法。
而超时时间设置的原因,简单理解一下就是,由于担心界面变化太快(例如屏幕旋转)导致BlockRunner重复的启动与取消带来的任何竞争问题,所以此处等待指定时间后再检查是否还有活动的观察器,并最后取消collect的运行。
显然,超时时间并不是造成无法收到更新数据的主要原因,因为设置这个超时时间的出发点以及功能显然是好的。
但是以上源码却给我带来了启示,因为等待时间超过5秒后取消了对于collect方法的执行,后面再重新执行collect方法就可以获取到更新的数据了。

查找根本原因

此时,我突然想起collect方法是一个suspend方法,会挂起当前运行的协程,而对数据变化进行监听的话,collect方法显然会挂起当前运行的协程。会不会就是collect方法带来的协程挂起导致了以上的问题?开始试验:

val flow1 = flow {
    emit("F1-A")
    delay(2000L)
    emit("F1-B")
    delay(2000L)
    emit("F1-C")
    delay(10000000L)
}.transform {
    val flow2 = flow {
        emit("$it-F2")
        delay(10000000L)
    }
    emitAll(flow2)
}
runBlocking {
    launch {
        flow1.collect {
            println("${System.currentTimeMillis()} 1 $it")
        }
    }
    delay(3000L)
    launch {
        flow1.collect {
            println("${System.currentTimeMillis()} 2 $it")
        }
    }
}

delay(2000L)表示数据的更新间隔,delay(10000000L)模拟长时间不关闭的监听,两个独立的collect协程表示监听的先后。最后得到的结果如下:

1608255907350 1 F1-A-F2
1608255910333 2 F1-A-F2
// 后面就卡住了

实锤了collect方法的嵌套导致协程挂起,导致了无法收到更新数据的问题。

寻找解决方案

此时我突然想到了combine方法,同样都是将两个Flow合并在一起,为什么combine方法就没有问题?查找后源码如下:

@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
    flows: Array<out Flow<T>>,
    arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
    transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
    ··· ···
        // Coroutine per flow that keeps track of its value and sends result to downstream
        launch {
            try {
                flows[i].collect { value ->
                    resultChannel.send(Update(i, value))
                    yield() // Emulate fairness, giving each flow chance to emit
                }
            }
··· ···

恍然大悟,原来combine方法是将每个对collect的监听放到单独的Scope中才能够实现对多个flow的监听与同步更新。
那么这个flowScope以及combineInternal能不能改造后为我所用呢?不行,因为全都是内部方法,所以无法调用。那么只能模仿其写一个了。

fun <T1, T2> Flow<T1>.dependCombine(transform: (T1) -> Flow<T2>) = flow {
    var oldJob: Job? = null
    collect {
        oldJob?.cancel()
        oldJob = GlobalScope.launch {
            emitAll(transform(it))
        }
    }
}

然而,最终却出错了?!!

Exception in thread "DefaultDispatcher-worker-1 @coroutine#3" java.lang.IllegalStateException: Flow invariant is violated:
        Emission from another coroutine is detected.
        Child of "coroutine#3":StandaloneCoroutine{Active}@5bbc3ad9, expected child of "coroutine#2":StandaloneCoroutine{Active}@41f88e7d.
        FlowCollector is not thread-safe and concurrent emissions are prohibited.
        To mitigate this restriction please use 'channelFlow' builder instead of 'flow'

好吧,flow中不能够运行其他的协程,所以只能使用官方建议的channelFlow了。

最终解决方案

直接放代码:

fun <T1, T2> Flow<T1>.combine(transform: suspend (T1) -> Flow<T2>): Flow<T2> =
    combineTransform(transform) { _, t2 ->
        t2
    }

fun <T1, T2, R> Flow<T1>.combineTransform(combineTransform: suspend (T1) -> Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
    channelFlow {
        var oldJob: Job? = null
        collect { t1 ->
            oldJob?.cancel()
            oldJob = launch {
                combineTransform(t1).collect { t2 ->
                    send(transform(t1, t2))
                }
            }
        }
        awaitClose {
            oldJob?.cancel()
        }
    }

通过channelFlow即可正确的解决问题了
以上只是为本人的解决方案,并不代表只能这么解决,如果有更好的方法望不吝赐教。

One more thing

对于一个需要在多处被collect的Flow,与shareIn方法一起使用效果更佳。


Made By XFY9326

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容