RxJava
subscribeOn()和observeOn()的区别
subscribeOn()和observeOn()都是用来切换线程用的
- subscribeOn()改变调用它之前(及之后,如果前面没有指定observeOn的话)代码的线程
- observeOn()改变调用它之后代码的线程
- doOnSubscribe()之后如果指定了subscribeOn,doOnSubscribe就在指定的线程上运行,如果没指定就运行在subscribe方法被调用时的线程上。
- doOnSubscribe()之前的subscribeOn()只有第一次使用的会生效。
Coroutine
协程是采用就是并发的设计模式,这句话的大多数环境下是没有问题。但是,如果某个协程满足以下几点,那它里面的子协程将会是同步执行的:
- 父协程的协程调度器是处于Dispatchers.Main情况下启动。
- 同时子协程在不修改协程调度器下的情况下启动。
private fun start() {
GlobalScope.launch(Dispatchers.Main) {
for (index in 1 until 10) {
//同步执行
launch {
Log.d("launch$index", "启动一个协程")
}
}
}
}
Android平台上如果协程处于Dispatchers.Main调度器,它会将协程调度到UI事件循环中执行。
如果其中的某一个子协程将他的协程调度器修改为非Dispatchers.Main,那么这个子协程将会与其他子协程并发执行。
协程状态
父协程需要等待所有的子协程执行完毕之后才会进入Completed状态,不管父协程自身的协程体是否已经执行完成。
协程调度器
CoroutineDispatcher
确定了相关的协程在哪个线程或哪些线程上执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。
-
withContext
顶级函数,使用withContext函数来改变协程的上下文,而仍然驻留在相同的协程中,同时withContext还携带有一个泛型T返回值。
协程上下文
- CoroutineContext即协程上下文。它是一个包含了用户定义的一些各种不同元素的Element对象集合。其中主要元素是Job、协程调度器CoroutineDispatcher、还有包含协程异常CoroutineExceptionHandler、拦截器ContinuationInterceptor、协程名CoroutineName等。这些数据都是和协程密切相关的,每一个Element都一个唯一key。
- 子协程会继承父协程的协程上下文中的Element,如果自身有相同key的成员,则覆盖对应的key,覆盖的效果仅限自身范围内有效。
协程启动模式
CoroutineStart协程启动模式,是启动协程时需要传入的第二个参数。协程启动有4种:
- DEFAULT 默认启动模式,我们可以称之为饿汉启动模式,因为协程创建后立即开始调度,虽然是立即调度,单不是立即执行,有可能在执行前被取消。
- LAZY 懒汉启动模式,启动后并不会有任何调度行为,直到我们需要它执行的时候才会产生调度。也就是说只有我们主动的调用Job的start、join或者await等函数时才会开始调度。
- ATOMIC 一样也是在协程创建后立即开始调度,但是它和DEFAULT模式有一点不一样,通过ATOMIC模式启动的协程执行到第一个挂起点之前是不响应cancel 取消操作的,ATOMIC一定要涉及到协程挂起后cancel 取消操作的时候才有意义。
- UNDISPATCHED 协程在这种模式下会直接开始在当前线程下执行,直到运行到第一个挂起点。这听起来有点像 ATOMIC,不同之处在于UNDISPATCHED是不经过任何调度器就开始执行的。当然遇到挂起点之后的执行,将取决于挂起点本身的逻辑和协程上下文中的调度器。
可以总结一下,当以UNDISPATCHED启动时:
- 无论我们是否指定协程调度器,挂起前的执行都是在当前线程下执行。
- 如果所在的协程没有指定调度器,那么就会在join处恢复执行的线程里执行,即我们上述案例中的挂起后的执行是在main线程中执行。
- 当我们指定了协程调度器时,遇到挂起点之后的执行将取决于挂起点本身的逻辑和协程上下文中的调度器。即join处恢复执行时,因为所在的协程有调度器,所以后面的执行将会在调度器对应的线程上执行。
协程作用域
协程作用域分为三种:
- 顶级作用域 --> 没有父协程的协程所在的作用域称之为顶级作用域。
- 协同作用域 --> 在协程中启动一个协程,新协程为所在协程的子协程。子协程所在的作用域默认为协同作用域。此时子协程抛出未捕获的异常时,会将异常传递给父协程处理,如果父协程被取消,则所有子协程同时也会被取消。
- 主从作用域 官方称之为监督作用域。与协同作用域一致,区别在于该作用域下的协程取消操作的单向传播性,子协程的异常不会导致其它子协程取消。但是如果父协程被取消,则所有子协程同时也会被取消。
Flow
-
flowOn
只影响前面没有自己上下文的操作符。 - 不管
flowOn
如何切换线程,collect
始终运行在调用它的协程调度器上。
Flow的常用操作符
- 流程操作符:
onStart
/onEach
/onCompletion
- 异常操作符:
catch
- 转换操作符:
transform
/map
/filter
/flatMapConcat
/flatmapMerge
/flatMapLatest
- 限制操作符:
drop
/take
- 末端操作符:
collect
/toList
/single
等
onCompletion
:在流程完成或取消后调用,并将取消异常或失败作为操作的原因参数传递。
transform
操作符任意值任意次,其他转换操作符都是基于transform进行扩展的。
Flow的缓冲
buffer
操作符,场景:上游生产速度比下游消费速度快。使用buffer缓存上游发射的数据,下游可以直接取用。压缩了整个流的处理时长。
Flow的内存泄漏
Flow无法像LiveData那样感知生命周期。
感知生命周期为LiveData至少带来两个好处:
避免泄漏:当 lifecycleOwner 进入 DESTROYED 时,会自动删除 Observer
节省资源:当 lifecycleOwner 进入 STARTED 时才开始接受数据,避免 UI 处于后台时的无效计算。
lifecycleScope
lifecycleOwner.lifecycleScope 扩展,可以在当前 Activity 或 Fragment 销毁时结束此协程,防止泄露。
Flow 也是运行在协程中的,lifecycleScope 可以帮助 Flow 解决内存泄露的问题:
lifecycleScope.launch {
viewMode.stateFlow.collect {
updateUI(it)
}
}
虽然解决了内存泄漏问题, 但是 lifecycleScope.launch 会立即启动协程,之后一直运行直到协程销毁,无法像 LiveData 仅当 UI 处于前台才执行,对资源的浪费比较大。
因此,lifecycle-runtime-ktx 又为我们提供了 LaunchWhenStarted 和 LaunchWhenResumed ( 下文统称为 LaunchWhenX )
launchWhenX 的利与弊
LaunchWhenX 会在 lifecycleOwner 进入 X 状态之前一直等待,又在离开 X 状态时挂起协程。lifecycleScope + launchWhenX 的组合终于使 Flow 有了与 LiveData 相媲美的生命周期可感知能力:
避免泄露:当 lifecycleOwner 进入 DESTROYED 时, lifecycleScope 结束协程
节省资源:当 lifecycleOwner 进入 STARTED/RESUMED 时 launchWhenX 恢复执行,否则挂起。
但对于 launchWhenX 来说, 当 lifecycleOwner 离开 X 状态时,协程只是挂起协程而非销毁,如果用这个协程来订阅 Flow,就意味着虽然 Flow 的收集暂停了,但是上游的处理仍在继续,资源浪费的问题解决地不够彻底。
即使在 launchWhenX 中订阅 Flow 仍然是不够的,无法完全避免资源的浪费。
解决办法:repeatOnLifecycle
lifecycle-runtime-ktx 自 2.4.0-alpha01 起,提供了一个新的协程构造器 lifecyle.repeatOnLifecycle, 它在离开 X 状态时销毁协程,再进入 X 状态时再启动协程。从其命名上也可以直观地认识这一点,即围绕某生命周期的进出反复启动新协程。
使用 repeatOnLifecycle 可以弥补上述 launchWhenX 对协程仅挂起而不销毁的弊端。因此,正确订阅 Flow 的写法应该如下(以在 Fragment 中为例):
onCreateView(...) {
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.lifecycle.repeatOnLifecycle(STARTED) {
viewMode.stateFlow.collect { ... }
}
}
}
当 Fragment 处于 STARTED 状态时会开始收集数据,并且在 RESUMED 状态时保持收集,最终在 Fragment 进入 STOPPED 状态时结束收集过程。
需要注意 repeatOnLifecycle 本身是个挂起函数,一旦被调用,将走不到后续代码,除非 lifecycle 进入 DESTROYED。
最后:Flow.flowWithLifecycle
当我们只有一个 Flow 需要收集时,可以使用 flowWithLifecycle 这样一个 Flow 操作符的形式来简化代码
lifecycleScope.launch {
viewMode.stateFlow
.flowWithLifecycle(this, Lifecycle.State.STARTED)
.collect { ... }
}
当然,其本质还是对 repeatOnLifecycle 的封装:
public fun <T> Flow<T>.flowWithLifecycle(
lifecycle: Lifecycle,
minActiveState: Lifecycle.State = Lifecycle.State.STARTED
): Flow<T> = callbackFlow {
lifecycle.repeatOnLifecycle(minActiveState) {
this@flowWithLifecycle.collect {
send(it)
}
}
close()
}
MMM
flatMapConcat
下游阻塞上游的数据发射,所有数据串行发射。flatMapMerge
下游可以并发地收集上游数据。flatMapLatest
上游发射的数据被buffer收集发送给下游,下游接收到上游的数据时如果下游处于block状态,下游的block将被取消。