kotlin 回调转协程挂起函数

github博客

λ:

今天起 android demo 项目新加个sdk:腾讯云IM,最近正在用,而且接口多,涉及到的需求也挺全。正好练手。同时也有flutter的sdk。顺路把flutter也写了。

大多数sdk或者库在提供api时,对于异步处理一般都是提供回调。好处是通用,兼容,不管java, kotlin,不用管其他依赖库。 坏处就不用再提了。

IM也不例外是一堆回调,MVVM模式下,一层层传回调上去就很low,所以把IM用到的接口整理成Service,在里边把回调包成kotlin 协程挂起函数。

suspendCancellableCoroutine

public suspend inline fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T 

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

协程库提供的两个内联函数。通过操作其中的CancellableContinuation提交结果。点进去看源码,查看支持的操作。

public interface Continuation<in T> {
    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

public interface CancellableContinuation<in T> : Continuation<T> {
    public val isActive: Boolean

    public val isCompleted: Boolean

    public val isCancelled: Boolean

    public fun cancel(cause: Throwable? = null): Boolean

    public fun invokeOnCancellation(handler: CompletionHandler)

    ... 试验性接口
}

public inline fun <T> Continuation<T>.resume(value: T): Unit = resumeWith(Result.success(value))

public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit = resumeWith(Result.failure(exception))

public inline fun <T> Continuation(context: CoroutineContext, crossinline resumeWith: (Result<T>) -> Unit): Continuation<T>

忽略掉被打了标签的接口(不确定,试验性,即将废弃,等),看函数名基本就知道干嘛用,还剩这么点。 同时提供一堆拓展函数

所以可以通过 resumeresumeWithException 提交回调返回的结果。

通过invokeOnCancellation注册取消时要执行的任务,比如关闭流之类的。

如IM中获取所有已加入群,当回调返回为失败时,直接提交一个空列表。或者提交个Throwable

suspend fun getJoinedGroupList(): List<V2TIMGroupInfo> =
        suspendCancellableCoroutine { continuation ->
            V2TIMManager.getGroupManager().getJoinedGroupList(object : V2TIMValueCallback<List<V2TIMGroupInfo>> {
                    override fun onSuccess(t: List<V2TIMGroupInfo>) {
                        continuation.resume(t)
                    }

                    override fun onError(code: Int, desc: String?) {
                        continuation.resume(emptyList())
//                     continuation.resumeWithException(Exception("code: $code, desc: $desc"))
                    }
                })
        }

callbackFlow, SharedFlow, StateFlow

有些回调是在实时监听数据。比如位置信息,音量变化,IM中数据变化,新消息送达等等。所以这种回调用 kotlin Flow 和 Channel 处理。

callbackFlow

现在callbackFlow仍标记为@ExperimentalCoroutinesApi。所以等 鸡啄完了米,狗舔完了面,火烧断了锁 。我再用。

允许在不同的CoroutineContext中提交数据。刨一下源码:

public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> = CallbackFlowBuilder(block)

private class CallbackFlowBuilder<T>(...) : ChannelFlowBuilder<T>(...)

private open class ChannelFlowBuilder<T>(...) : ChannelFlow<T>(...)

最底层就是个ChannelFlow,也就是开个带缓冲区Channel来收集数据,在Flow里接收数据。

CallbackFlowBuilder在此基础上加了awaitClose: 当流要关闭时要执行的操作,常见的是注销掉回调函数。如果没有awaitClose,将会抛出IllegalStateException异常。

所以提交数据的方式和SendChannel一样。

callbackFlow<T> {
    send(T) // 发送数据
    offer(T) // 允许在协程外提交
    sendBlocking(T) //尝试用offer提交,如果失败则runBlocking{ send(T) },阻塞式提交
    awaitClose(block: () -> Unit = {}) // 关闭时执行的操作
}
// demo
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            try {
                sendBlocking(value)
            } catch (e: Exception) {

            }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}

SharedFlow

取代BroadcastChannel

SharedFlow, MutableSharedFlow 都是 interface。 同时提供了fun MutableSharedFlow用于快速构造。

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> 

replay:重播n个之前收到的数据给新订阅者。 >= 0

extraBufferCapacity:除了重播之外缓冲区大小。当缓冲区不满时,提交数据不会挂起

onBufferOverflow:缓冲区溢出时的策略(replay != 0 || extraBufferCapacity != 0 才有效)。默认SUSPEND: 暂停发送,DROP_OLDEST:删掉最旧数据,DROP_LATEST:删掉最新数据。

通过emit(T)在协程中提交数据。

tryEmit(T): Boolean 尝试在不挂起的情况下提交数据,成功则返回true。 如果onBufferOverflow = BufferOverflow.SUSPEND ,在缓冲区满时,tryEmit会返回false,直到有新空间。而如果是DROP_OLDESTDROP_LATEST,不会阻塞,tryEmit永为true

所以我用MutableSharedFlow替代callbackFlow的提交过程。

// demo
val resFlow = MutableSharedFlow<Res<T>>(
    extraBufferCapacity = 1,
    onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST
)

fun registerCallback() {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            resFlow.tryEmit(Res.Success(value))
        }
        override fun onApiError(cause: Throwable) {
            resFlow.tryEmit(Res.Failed(cause))
        }
        override fun onCompleted() {
            resFlow.tryEmit(Res.Finish)
        }
    }
    api.register(callback)
}

StateFlow

继承自SharedFlow,同样也提供了快速构造的函数。函数必须提交一个初始的value。底层相当于开了一个 MutableSharedFlow(replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST).

public interface StateFlow<out T> : SharedFlow<T> {
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    public override var value: T
    public fun compareAndSet(expect: T, update: T): Boolean
}

@Suppress("FunctionName")
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)

compareAndSet: 如果当前值为expect, 则更新为update。如果更新则返回true(包括current == expect && current == update 的情况)。

// demo
class LatestNewsViewModel(
private val newsRepository: NewsRepository) : ViewModel() {

    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews.collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}

sealed class LatestNewsUiState {
    data class Success(news: List<ArticleHeadline>): LatestNewsUiState()
    data class Error(exception: Throwable): LatestNewsUiState()
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容