【Kotlin】Flow简介

1 前言

Flow 是 Kotlin 标准库中的一个新的异步流处理框架,旨在简化异步数据流的操作和处理,它提供了一种声明式的方式来处理数据流。

Flow 中一些接口调用有些类似 Sequence(详见 → Sequence简介),协程的使用详见 → 协程

Flow 有以下特性和概念。

  1. 异步流(Asynchronous Streams):Flow 允许以一种非阻塞的方式处理一系列的值或事件,这使得在处理大量数据或涉及 IO 操作时能够更加高效。
  2. 冷流(Cold Flow):只有在收集器(collector)订阅(或启动)了之后才会开始发射(emit)数据。
  3. 热流(Hot Flow):在创建后就立即开始发射(emit)数据,不管是否有收集器(collector),这会导致收集器可能只接收到部分数据。
  4. 声明式 API:Flow 提供了一套简洁清晰的操作符,允许以声明式的方式对流进行处理,如 map、filter、reduce 等。
  5. 协程集成:Flow 构建在协程之上,因此可以与协程一起使用,并且可以利用协程的优势,比如轻量级、顺序性等。
  6. 取消支持:Flow 支持与协程一样的取消操作,从而释放资源和避免内存泄漏。
  7. 背压支持:Flow 提供了背压支持,可以通过各种操作符来控制数据的生产和消费速率,防止生产者速度过快导致消费者无法跟上。

Flow 有中间操作和终端操作,如下。

  • 中间操作:每次操作返回一个新的 Flow 对象(主要操作有:flowOn、catch、buffer、conflate、filter、distinctUntilChanged、drop、take、map 等)。
  • 终端操作:每次操作返回一个值或集合,每个 Flow 只能进行一次终端操作(主要操作有:first、last、count、fold、reduce、collect、toCollection、toSet、toList 等)。

2 创建 Flow

2.1 emptyFlow

public fun <T> emptyFlow(): Flow<T> = EmptyFlow

2.2 flow

1)源码

public fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

2)应用

var coldFlow = flow<String> {  
    emit("A")
    emit("B")
}

2.3 MutableSharedFlow

1)源码

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

2)应用

var hotFlow = MutableSharedFlow<String>()
CoroutineScope(Dispatchers.Default).launch {
    hotFlow.emit("A")
    hotFlow.emit("B")
}

2.4 flowOf

1)源码

public fun <T> flowOf(value: T): Flow<T> = flow {
    emit(value)
}

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

2)应用

var flow1 = flowOf("A")
var flow2 = flowOf("A", "B", "C")

2.5 asFlow

2.5.1 () -> T

1)源码

public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

2)应用

fun main() {
    var fun2 = { fun1() }.asFlow()
}

fun fun1(): String {
    return "xxx"
}

2.5.2 Iterator

1)源码

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

2)应用

var array = intArrayOf(1, 2, 3)
var iterator = array.iterator()
var flow = iterator.asFlow()

2.5.3 Sequence

1)源码

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

2)应用

var sequence = sequenceOf(1, 2, 3)
var flow = sequence.asFlow()

2.5.4 Array

1)源码

public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

2)应用

var array = arrayOf(1, 2, 3)
var flow = array.asFlow()

2.5.5 Range

1)源码

public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

2)应用

var range = 1..3
var flow = range.asFlow()

2.6 zip

1)源码

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

2)应用

var flow1 = flowOf(1, 3, 5)
var flow2 = flowOf("A", "B", "C")
// A-1, B-3, C-5
var flow = flow1.zip(flow2) { num, str ->
    "$str-$num"
}

3 Flow 冷流和热流

3.1 冷流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun main() {
    val coldFlow = emitFlow()
    CoroutineScope(Dispatchers.Default).launch {
        coldFlow.collect {
            println("CoroutineScope, $it")
        }
    }
    MainScope().launch(Dispatchers.IO) {
        coldFlow.collect {
            println("MainScope, $it")
        }
    }
    Thread.sleep(1000)
}

fun emitFlow(): Flow<String> = flow {
    for (i in 1..2) {
        emit("emit-$i")
        delay(100)
    }
}

打印如下。

CoroutineScope, emit-1
MainScope, emit-1
CoroutineScope, emit-2
MainScope, emit-2

说明:可以看到每一个订阅者都可以收到所有消息。

3.2 热流

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

fun main() {
    var hotFlow = emitFlow()
    CoroutineScope(Dispatchers.Default).launch {
        hotFlow.collect {
            println("CoroutineScope, $it")
        }
    }
    MainScope().launch(Dispatchers.IO) {
        hotFlow.collect {
            println("MainScope, $it")
        }
    }
    Thread.sleep(1000)
}

fun emitFlow(): MutableSharedFlow<String> {
    var hotFlow = MutableSharedFlow<String>()
    CoroutineScope(Dispatchers.Default).launch {
        for (i in 1..2) {
            hotFlow.emit("emit-$i")
            delay(100)
        }
    }
    return hotFlow
}

打印如下。

MainScope, emit-2
CoroutineScope, emit-2

说明:可以看到每一个订阅者都只收到部分消息。

4 Flow 的中间操作

4.1 源码

// 切换线程
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
// 捕获异常
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T>
// 在数据流中使用一个缓冲区来存储数据, 当数据产生速率超过消费速率时, 数据会暂时存储在缓冲区中, 直到有足够的空间将其传递给订阅者。这可以确保数据不会丢失,但可能会占用更多的内存。
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T>
// 当数据产生速率超过消费速率时, 跳过一些数据, 只保留最新的数据。这样可以减少内存占用,但会丢失一部分数据。
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
// 过滤
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
// 去除相邻的重复元素
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
// 丢弃前 n 个元素
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 截取前 n 个元素
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 映射(T -> R)
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

4.2 应用

fun main() {
    var flow = flowOf(4, 9, 1, 8, 5, 7, 7, 5, 3, 6, 2)
    CoroutineScope(Dispatchers.Default).launch {
        flow.flowOn(Dispatchers.Default)
            .catch {
                println(it.message)
            }.buffer()
            .filter { it in 3..7 } // 4, 5, 7, 7, 5, 3, 6
            .distinctUntilChanged() // 4, 5, 7, 5, 3, 6
            .drop(1) // 5, 7, 5, 3, 6
            .take(4) // 5, 7, 5, 3
            .map { it * it } // 25, 49, 25, 9
            .collect(::println)
    }
    Thread.sleep(1000)
}

5 Flow 的终端操作

5.1 first、last、count

1)源码

// 首元素
public suspend fun <T> Flow<T>.first(): T
// 尾元素
public suspend fun <T> Flow<T>.last(): T

2)应用

fun main() {
    var flow = flowOf(3, 5, 7, 6)
    CoroutineScope(Dispatchers.Default).launch {
        println(flow.first()) // 3
        println(flow.last()) // 6
        println(flow.count()) // 4
    }
    Thread.sleep(1000)
}

5.2 collect

1)源码

public suspend fun collect(collector: FlowCollector<T>)

2)应用

fun main() {
    var flow = flowOf(1, 3, 5, 7)
    CoroutineScope(Dispatchers.Default).launch {
        flow.collect(::println) // 1, 3, 5, 7
    }
    Thread.sleep(1000)
}

5.3 fold

1)源码

// 规约运算,定义运算 o, result = (((((i o e1) o e2)) o e3) o e4) o ...
public suspend inline fun <T, R> Flow<T>.fold(initial: R,
    crossinline operation: suspend (acc: R, value: T) -> R
): R

说明:fold 与 reduce 的区别是,fold 有初值,reduce 无初值。

2)应用

fun main() {
    var flow = flowOf(2, 3, 5)
    CoroutineScope(Dispatchers.Default).launch {
        // 10+2+3+5=20
        var res1 = flow.fold(10, Integer::sum)
        println(res1)
        // 1*1-2*2=-3, (-3)*(-3)-3*3=0, 0*0-5*5=-25
        var res2 = flow.fold(1) { e1, e2 ->
            e1 * e1 - e2 * e2
        }
        println(res2)
    }
    Thread.sleep(1000)
}

5.4 reduce

1)源码

// 规约运算,定义运算 o, result = ((((e1 o e2)) o e3) o e4) o ...
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

说明:reduce 与 fold 的区别是,reduce 无初值,fold 有初值。

2)应用

fun main() {
    var flow = flowOf(1, 3, 5)
    CoroutineScope(Dispatchers.Default).launch {
        var sum = flow.reduce(Integer::sum)
        println(sum) // 9
        // 1*1-3*3=-8, (-8)*(-8)-5*5=39
        var res = flow.reduce { e1, e2 ->
            e1 * e1 - e2 * e2
        }
        println(res) // 39
    }
    Thread.sleep(1000)
}

5.5 集合转换

1)源码

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

2)应用

fun main() {
    var flow = flowOf(1, 3, 5)
    CoroutineScope(Dispatchers.Default).launch {
        var set1 = flow.toCollection(mutableSetOf()) // [1, 3, 5]
        var list1 = flow.toCollection(mutableListOf()) // [1, 3, 5]
        var set2 = flow.toSet() // [1, 3, 5]
        var list2 = flow.toList() // [1, 3, 5]
    }
    Thread.sleep(1000)
}

声明:本文转自【Kotlin】Flow简介

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容