使用场景
最近在做项目的时候遇到了一个需求。从DataStore获取最新的ID,并根据ID到Room数据库内获取相应的内容。由于DataStore和Room都可以通过Flow获取到最新的数据更新,因此我设想是否能通过数据驱动界面的方式,抛开以往的通知界面更新方法,直接对数据源的变动进行监听并设置界面。
需要实现以上的需求就需要对DataStore的ID进行监听的同时,还要对Room数据库内相应ID的数据进行同步监听,保证两个数据源的数据的变化都可以监听到。
Tips: 以下部分大都是问题发现与解决的步骤,符合从认识到实践的二次飞跃过程,因此想要直接看最终可用解决方案的可以直接翻到最后。
初步实现
最简单的设想是在transform方法中进行嵌套实现,代码如下:
// 使用时转换为LiveData后,在Activity中进行数据变化监听
val data = flow1.transform { id ->
val flow2 = getFlow2(id)
emitAll(flow2)
}.asLiveData()
实际使用以上代码时却出了大问题,A界面获取数据并初始化视图成功,但是在B界面修改数据后返回A界面后,数据更新并没有被监听到。以上的数据不更新情况还是不确定的,因为如果在B界面修改完数据后等待一段时间再返回A界面,数据更新就能够被监听到了。
初步解决方案
无法监听数据变化的情况让我抓狂了一阵子,最终“修改完数据后等待一段时间能够被监听到”还是在asLiveData方法的源码中找到了原因。源码如下:
// CoroutineLiveData.kt
internal const val DEFAULT_TIMEOUT = 5000L
// FlowLiveData.kt
@JvmOverloads
fun <T> Flow<T>.asLiveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT
): LiveData<T> = liveData(context, timeoutInMs) {
collect {
emit(it)
}
}
好家伙,这里有个DEFAULT_TIMEOUT的超时设定。掐指一算确实只需要在B界面等待5秒后再返回A界面,仅能够成功更新界面了。因此在使用asLiveData(timeoutInMs=0)后问题貌似就解决了。那么此处的超时时间是用来做什么的呢?继续看源码:
// CoroutineLiveData.kt
internal class BlockRunner<T>(
private val liveData: CoroutineLiveData<T>,
private val block: Block<T>,
private val timeoutInMs: Long,
private val scope: CoroutineScope,
private val onDone: () -> Unit
) {
··· ···
@MainThread
fun cancel() {
··· ···
cancellationJob = scope.launch(Dispatchers.Main.immediate) {
delay(timeoutInMs)
if (!liveData.hasActiveObservers()) {
// one last check on active observers to avoid any race condition between starting
// a running coroutine and cancelation
runningJob?.cancel()
runningJob = null
}
}
}
}
首先asLiveData中的调用的Flow的collect方法是在一个lambda函数中的,CoroutineLiveData使用BlockRunner作为collect方法所在的lambda函数的运行器,监听LifeCycle的变化,以启动与取消collect方法。
而超时时间设置的原因,简单理解一下就是,由于担心界面变化太快(例如屏幕旋转)导致BlockRunner重复的启动与取消带来的任何竞争问题,所以此处等待指定时间后再检查是否还有活动的观察器,并最后取消collect的运行。
显然,超时时间并不是造成无法收到更新数据的主要原因,因为设置这个超时时间的出发点以及功能显然是好的。
但是以上源码却给我带来了启示,因为等待时间超过5秒后取消了对于collect方法的执行,后面再重新执行collect方法就可以获取到更新的数据了。
查找根本原因
此时,我突然想起collect方法是一个suspend方法,会挂起当前运行的协程,而对数据变化进行监听的话,collect方法显然会挂起当前运行的协程。会不会就是collect方法带来的协程挂起导致了以上的问题?开始试验:
val flow1 = flow {
emit("F1-A")
delay(2000L)
emit("F1-B")
delay(2000L)
emit("F1-C")
delay(10000000L)
}.transform {
val flow2 = flow {
emit("$it-F2")
delay(10000000L)
}
emitAll(flow2)
}
runBlocking {
launch {
flow1.collect {
println("${System.currentTimeMillis()} 1 $it")
}
}
delay(3000L)
launch {
flow1.collect {
println("${System.currentTimeMillis()} 2 $it")
}
}
}
delay(2000L)表示数据的更新间隔,delay(10000000L)模拟长时间不关闭的监听,两个独立的collect协程表示监听的先后。最后得到的结果如下:
1608255907350 1 F1-A-F2
1608255910333 2 F1-A-F2
// 后面就卡住了
实锤了collect方法的嵌套导致协程挂起,导致了无法收到更新数据的问题。
寻找解决方案
此时我突然想到了combine方法,同样都是将两个Flow合并在一起,为什么combine方法就没有问题?查找后源码如下:
@PublishedApi
internal suspend fun <R, T> FlowCollector<R>.combineInternal(
flows: Array<out Flow<T>>,
arrayFactory: () -> Array<T?>?, // Array factory is required to workaround array typing on JVM
transform: suspend FlowCollector<R>.(Array<T>) -> Unit
): Unit = flowScope { // flow scope so any cancellation within the source flow will cancel the whole scope
··· ···
// Coroutine per flow that keeps track of its value and sends result to downstream
launch {
try {
flows[i].collect { value ->
resultChannel.send(Update(i, value))
yield() // Emulate fairness, giving each flow chance to emit
}
}
··· ···
恍然大悟,原来combine方法是将每个对collect的监听放到单独的Scope中才能够实现对多个flow的监听与同步更新。
那么这个flowScope以及combineInternal能不能改造后为我所用呢?不行,因为全都是内部方法,所以无法调用。那么只能模仿其写一个了。
fun <T1, T2> Flow<T1>.dependCombine(transform: (T1) -> Flow<T2>) = flow {
var oldJob: Job? = null
collect {
oldJob?.cancel()
oldJob = GlobalScope.launch {
emitAll(transform(it))
}
}
}
然而,最终却出错了?!!
Exception in thread "DefaultDispatcher-worker-1 @coroutine#3" java.lang.IllegalStateException: Flow invariant is violated:
Emission from another coroutine is detected.
Child of "coroutine#3":StandaloneCoroutine{Active}@5bbc3ad9, expected child of "coroutine#2":StandaloneCoroutine{Active}@41f88e7d.
FlowCollector is not thread-safe and concurrent emissions are prohibited.
To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
好吧,flow中不能够运行其他的协程,所以只能使用官方建议的channelFlow了。
最终解决方案
直接放代码:
fun <T1, T2> Flow<T1>.combine(transform: suspend (T1) -> Flow<T2>): Flow<T2> =
combineTransform(transform) { _, t2 ->
t2
}
fun <T1, T2, R> Flow<T1>.combineTransform(combineTransform: suspend (T1) -> Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> =
channelFlow {
var oldJob: Job? = null
collect { t1 ->
oldJob?.cancel()
oldJob = launch {
combineTransform(t1).collect { t2 ->
send(transform(t1, t2))
}
}
}
awaitClose {
oldJob?.cancel()
}
}
通过channelFlow即可正确的解决问题了
以上只是为本人的解决方案,并不代表只能这么解决,如果有更好的方法望不吝赐教。
One more thing
对于一个需要在多处被collect的Flow,与shareIn方法一起使用效果更佳。
Made By XFY9326