kotlin flow
- flow (阻塞)
fun flow01(){
runBlocking {
val flow = flowOf(5, 4, 3, 2, 1).onEach{
delay(2000)
}
flow.collect {
println(it)
}
}
}
suspend fun flow02(){
val flow = flowOf(5, 4, 3, 2, 1).onEach{
delay(2000)
}
flow.collect {
println(it)
}
}
- measureTimeMillis
suspend fun flow03(){
val time = measureTimeMillis {
flow<Any> {
for (num in 1 until 5) {
delay(2000)
emit(num)
}
}.collect {
delay(500)
println(it)
}
}
}
- 切换线程
suspend fun flow04(){
val time = measureTimeMillis {
flow<Any> {
for (num in 1 until 5) {
delay(2000)
emit(num)
}
}.flowOn(Dispatchers.IO).collect {
println(it)
}
}
}
- 取消flow
如果flow是在一个挂起函数内被挂起了,那么flow可以被取消,否则不能取消
suspend fun flow05(){
val time = measureTimeMillis {
withTimeoutOrNull(2000){
flow<Any> {
for (num in 1 until 5) {
delay(2000)
emit(num)
}
}.collect {
println(it)
}
}
}
}
- 终端操作符 小结
* collect
* single/first
* toList/toSet/toCollection
* count
* fold/reduce
* launchIn/produceIn/broadcastIn
- flow 生命周期
suspend fun flow06(){
(1 until 5).asFlow().onEach {
if (it == 4) throw RuntimeException("Error on $it")
}.onStart { println("start flow") }
.onEach { println(it) }
.catch { println(it.localizedMessage.toString()) }
.onCompletion { println(" completion") }
.collect { println(it.toString()) }
}
- imperative 通过try...finally实现
suspend fun flow07(){
runBlocking {
try {
flow<Any> {
for (num in 1 until 6) {
delay(2000)
emit(num)
}
}.collect {
println(it)
}
}
finally {
println("done")
}
}
}
- declarative 通过onCompletion实现
suspend fun flow08(){
runBlocking {
flow {
for (num in 1 until 6) {
delay(2000)
emit(num)
}
}.onCompletion { println("done") }
.collect {
println(it)
}
}
}
- flow 的缓存策略
* MISSING 没有指定策略,,不会通过OnNext发射的数据做缓存和丢弃
*
* ERROR:如果放入flowable的异步缓存池中的数据超限,就会抛出异常
*
* BUFFER:对应即将指定的.buffer(),没有固定大小,可以无限制添加数据,不会抛出异常,但是会oom。
* 另外:buffer可以并发的执行任务,它是floeOn之外的另一种方式,只是不能显示的指定Dispatcher
*
* DROP:异步缓存池满了,就会丢弃即将放入缓存池中的数据。
*
* LATEST:对应即将指定的.conflate(),异步缓存池满了,
* 就会丢弃即将放入缓存池中的数据,不同于DROP的是:LATEST会将最后一条数据强行放入缓存池。
- flow实现并行 通过flatMapMerge
suspend fun flow09(){
val elements = arrayListOf<Int>()
for (num in 0 until 100) {
elements.add(num)
}
elements.asFlow().flatMapMerge {
flow {
emit(it)
}.flowOn(Dispatchers.IO)
}.collect {
println(it)
}
}
- 转换操作符 transform
transform ,可以无限制的调用emit,这是它跟map最大的区别,它还可以使用emit发射所有你想要发射的内容
suspend fun flow10(){
(1 until 5).asFlow().transform {
emit(it *2)
delay(2000)
emit(it * it)
}
.collect {
println(it)
}
}
- 限制发射符 take,只取指定的前几个emit发射的值,其它不管
suspend fun flow11(){
(1 until 5).asFlow()
.take(3)
.collect {
println(it)
}
}
- reduce 能够对集合进行计算操作
suspend fun flow12(){
// 计算阶乘
val sum = (1 until 5).asFlow()
.map { it * it }
.reduce { a, b -> a * b }
println(sum)
}
- fold 它有一个initial
suspend fun flow13(){
// 计算阶乘
val sum = (1 until 5).asFlow()
.map { it * it }
.fold(0){a,b ->
a * b
}
}
- 合并操作符 zip
suspend fun flow14(){
val flowA = (0 until 5).asFlow()
val flowB = flowOf("zero","one","two","three","four")
flowA.zip(flowB) {a,b ->
"$a and $b"
}.collect {
println(it)
}
}
- combine 这家伙也是合并,flowA每次发出新的元素,会将其与flowB最新的元素合并
suspend fun flow15(){
val flowA = (0 until 5).asFlow().onEach { delay(500) }
val flowB = flowOf("zero","one","two","three","four").onEach { delay(1000) }
flowA.combine(flowB) { a,b ->
"$a and $b"
}.collect {
println(it)
}
}
- flattenConcat 多个流合并的串联执行
suspend fun flow16(){
val flowA = (0 until 5).asFlow()
val flowB = flowOf("zero","one","two","three","four")
flowOf(flowA,flowB)
.flattenConcat()
.collect {
println(it)
}
}
- flatMapConcat
suspend fun flow17(){
var starting:Long = 0
(0 until 10).asFlow()
.onStart { starting = System.currentTimeMillis() }
.onEach { delay(1000) }
.flatMapConcat {
flow {
emit("$it first")
delay(500)
emit("$it second")
}
}.collect {
println("$it at ${System.currentTimeMillis() - starting} -> from start")
}
}
- flatMapLatest 当发射新值之后,上一个flow就会被取消
suspend fun flow18(){
var starting:Long = 0
(0 until 10).asFlow()
.onStart { starting = System.currentTimeMillis() }
.onEach { delay(1000) }
.flatMapLatest {
flow {
emit("$it first")
delay(500)
emit("$it second")
}
}.collect {
println("$it at ${System.currentTimeMillis() - starting} -> from start")
}
}
- channelFlow (异步非阻塞)
suspend fun channelFlow01(){
val time = measureTimeMillis {
channelFlow<Any> {
for (num in 1 until 5) {
delay(2000)
send(num)
}
}.collect {
delay(500)
println(it)
}
}
}