Android 面试知识点记录——Rxjava与协程

作者:浪人笔记

Rxjava

  • Rxjava常用操作符
  • map和flatMap有什么区别
  • Rxjava1.0和Rxjava2.0有什么区别?
  • subscribeOn与observeOn多次执行会怎么样?
  • Rxjava是怎么切回到主线程的

协程

  • 进程、线程、协程的区别
  • 什么回调地狱以及协程在这方面的处理
  • 开发中怎么选择合适的调度器

Rxjava

Rxjava常用操作符

  • map() 操作符:用于将流中的每个元素通过一个函数转换为另一个元素。
  • flatMap() 操作符:用于将流中的每个元素通过一个函数转换为多个元素,并将这些元素组合成一个新的流。
  • filter() 操作符:用于过滤流中的元素,只保留符合条件的元素。
  • take() 操作符:用于从流中取前 n 个元素。
  • reduce() 操作符:用于将流中的元素通过一个函数进行累加,得到一个最终结果。
  • scan() 操作符:用于将流中的元素通过一个函数进行累加,得到每一步的中间结果。
  • concat() 操作符:用于将多个流组合成一个新的流。
  • merge() 操作符:用于将多个流合并成一个新的流。
  • zip() 操作符:用于将多个流中的元素按顺序一一组合成一个新的元素,并形成一个新的流。
    -debounce() 操作符:用于过滤流中发射过快的元素,只保留一个元素。

map和flatMap有什么区别

  • map 和 flatMap 都可以用来对数据流中的数据进行变换,但它们的实现方式有所不同。map 只进行一次变换,并将变换后的结果发射出去,而 flatMap 则进行多次变换,并将得到的 Observable 合并成一个新的 Observable 发射出去

在源码层面,map 操作符的实现非常简单,它实际上就是在原有的 Observable 上添加了一个新的 MapObservable 观察者,并将变换函数作为参数传递给 MapObservable。在 MapObservable 的 onNext 方法中,会将接收到的元素传递给变换函数进行变换,并将变换后的结果作为新的元素发射出去。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

在源码层面,flatMap 操作符的实现相对比较复杂。它实际上是在原有的 Observable 上添加了一个新的 FlatMapObservable 观察者,并将变换函数作为参数传递给 FlatMapObservable。在 FlatMapObservable 的 onNext 方法中,会将接收到的元素传递给变换函数进行变换,并得到一个新的 Observable。然后,它会将这个新的 Observable 注册到一个 FlatMapSubscriber 中,等待下一次数据的到来。当所有数据都处理完成后,FlatMapObservable 会调用 FlatMapSubscriber 的 onComplete 方法,将所有得到的 Observable 合并成一个新的 Observable,并将它发送给下游的观察者。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false, bufferSize(), bufferSize());
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        ScalarCallable<T> scalarCallable = (ScalarCallable<T>)this;
        R r = scalarCallable.call();
        if (r == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(r, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

Rxjava1.0和Rxjava2.0有什么区别?

  1. 改进的异常处理:RxJava 2.0 改进了异常处理机制,使得开发者可以更好地处理异常,避免应用程序崩溃。
  2. 新的操作符:RxJava 2.0 引入了一些新的操作符,如 Flowable,Single 和 Completable,来取代旧版本的 Observable。这些新的操作符可以更好地处理背压(backpressure)和错误处理。
  3. 改进的背压支持:RxJava 2.0 引入了更好的背压支持,可以更好地处理在数据源发送大量数据时的情况。
  4. 改进的线程调度:RxJava 2.0 改进了线程调度机制,使得开发者可以更好地控制并发性。
    5.更好的性能:RxJava 2.0 在性能上也有所提升,可以更好地处理大量数据流。

总的来说,RxJava 2.0 在异常处理、背压支持、线程调度和性能等方面都有所改进和提升

什么是背压?怎么改进的?

背压(Backpressure)是指当数据产生速度大于消费速度,程序处理不过来是消息就会出现堆积。从而导致内存溢出、程序崩溃等问题。这种情况被称为背压问题

逻辑上的改进办法

  1. 生产者数量=消费者数量
  2. 节流,丢弃一部分请求
  3. 打包,把所有事件封装在一个集合中发送

Rxjava1.x的时候没有对背压的支持,只提供了onBackpressureBuffer(time)、onBackpressureDrop() 等)来缓解背压问题,但这些解决方案都只是对数据流进行了缓存或者丢弃处理

RxJava 2.0后 引入了新的数据类型 Flowable,它支持背压,并提供了更多的背压控制策略。

Flowable 类型是一个支持背压的数据源,可以通过 onBackpressureBuffer,onBackpressureDrop,onBackpressureLatest 等方式来处理背压问题。其中

  • onBackpressureBuffer 策略会在内存中缓存数据,直到消费者可以消费这些数据;
  • onBackpressureDrop 策略会在数据流中丢弃一部分数据,直到消费者可以消费;
  • onBackpressureLatest 策略会只保留最新的数据,丢弃旧数据。

另外Flowable 的方式和 Observable 类似,只是Flowable 在使用的时候需要注意要制定背压策略。

subscribeOn与observeOn多次执行会怎么样?

结论:subscribeOn只跟第一次指定的线程有关,执行多次跟最后一次有关。

  • subscribeOn只有第一次会生效,所以只跟第一次指定的线程有关。

当我们在一个 Observable中使用多个 subscribeOn 操作符时,它们的执行顺序只会影响到代码中的顺序,但实际上只有第一个 subscribeOn 会生效。原因是在 ObservableSubscribeOn 类的实现中,只会在第一个 subscribeOn 操作符中调用 scheduler.scheduleDirect 方法,后面的 subscribeOn 操作符调用该方法也会被拦截,也就不会改变 Observable 的执行线程。这就是为什么在同一个 Observable 中使用多个 subscribeOn 操作符时,只有第一个 subscribeOn 会生效的原因。

//Observable.java
 @Override
    public final void subscribe(Observer<? super T> observer) {
    ...
         subscribeActual(observer);
    ...
    }
//-----------------
// ObservableSubscribeOn.java
 @Override
public void subscribeActual(Observer<? super T> observer) {
    if (once) {
        source.subscribe(observer);
        return;
    }
    once = true;
    Scheduler scheduler = this.scheduler;
    SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    observer.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent, source)));
}
  • observeOn 执行多次跟最后一次有关。
@Override
protected void subscribeActual(Observer<? super T> observer) {
    Scheduler.Worker worker = scheduler.createWorker();

    source.subscribe(new ObserveOnObserver<T>(observer, worker, delayError, bufferSize));
}

其实比较好理解,subscribeOn理解为一个管道的入口,observeOn 理解为一个管道的出口。数据进去之后就没办法指定了,但是数据出来之前都可以在切换出口

Rxjava是怎么切回到主线程的

使用observeOn(AndroidSchedulers.mainThread()),内部的实现其实是new Handler(Looper.getMainLooper())

public class MainThreadScheduler extends Scheduler {

    private static MainThreadScheduler INSTANCE;

    private MainThreadScheduler() {}

    public static MainThreadScheduler instance() {
        if (INSTANCE == null) {
            INSTANCE = new MainThreadScheduler();
        }
        return INSTANCE;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new MainThreadWorker(new Handler(Looper.getMainLooper()));
    }

    private static class MainThreadWorker extends Worker {

        private final Handler mHandler;

        MainThreadWorker(Handler handler) {
            mHandler = handler;
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable runnable) {
            mHandler.post(runnable);
            return Disposables.empty();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable runnable, long delay, @NonNull TimeUnit unit) {
            mHandler.postDelayed(runnable, unit.toMillis(delay));
            return Disposables.empty();
        }

        @Override
        public void dispose() {}

        @Override
        public boolean isDisposed() {
            return false;
        }
    }
}

协程

进程、线程、协程的区别

  • 进程(Process)是指操作系统中的一个执行单位,它有自己独立的内存空间和资源,可以执行独立的程序,是程序运行的基本单位。一个进程可以包含多个线程。
  • 线程(Thread)是进程中的一个执行单元,它共享进程的内存空间和资源,但具有独立的执行序列和运行堆栈。一个进程可以包含多个线程,线程之间可以并发执行,实现多任务处理。
  • 协程(Coroutine)是一种用户态的轻量级线程,由程序员自己控制调度,而不是由操作系统控制。协程可以在同一线程中实现并发执行,利用时间片轮转算法切换任务,避免了线程上下文切换带来的开销,可以提高程序的执行效率。

Kotlin 的协程是基于 Kotlin 标准库中的协程框架实现的。该框架基于一种称为“挂起函数”的特殊函数类型实现,这些函数可以暂停执行并在稍后的某个时候恢复执行,从而实现了协程的效果。不依赖于操作系统和编译器。

什么回调地狱以及协程在这方面的处理

回调地狱指的是在异步编程中,如果多次嵌套使用回调函数来处理异步操作,会造成代码的可读性和可维护性变差,代码逻辑难以理解和调试的情况。举个例子

getUserInfo(userId) { user ->
    getUserOrders(user.id) { orders ->
        for (order in orders) {
            getItems(order.id) { items ->
                for (item in items) {
                    processItem(item) { result ->
                        saveResult(result) {
                            // ...
                        }
                    }
                }
            }
        }
    }
}

协程中的挂起函数写法是

suspend fun processOrders(userId: String) = withContext(Dispatchers.IO) {
    val user = getUserInfo(userId)
    val orders = getUserOrders(user.id)
    for (order in orders) {
        val items = getItems(order.id)
        for (item in items) {
            val result = processItem(item)
            saveResult(result)
        }
    }
}

使用 withContext 可以指定协程执行的上下文,这里使用了 IO 线程池,避免了主线程的阻塞。

开发中怎么选择合适的调度器

其实无非就是三个,一个主线程,一个io密集型,一个cpu密集型 rxjava中的调度器

  1. Schedulers.io():用于 I/O 密集型任务,比如网络请求等。
  2. Schedulers.computation():用于 CPU 密集型任务,比如类似视频编解码这种大量的计算和数据处理等。
  3. Schedulers.newThread():每次都创建一个新线程,不推荐使用。
  4. AndroidSchedulers.mainThread():用于 Android 平台的 UI 线程。

对应kotlin协程里面的是

  1. Dispatchers.Default:适合执行 CPU 密集型任务的调度器,它会自动根据可用的 CPU 数量进行调度。
  2. Dispatchers.IO:适合执行 I/O 密集型任务的调度器,比如网络请求和磁盘 I/O 等。
  3. Dispatchers.Main:适合在 Android 应用程序中执行 UI 操作的调度器。在 Android 应用程序中,Main 调度器会将协程的执行切换到主线程上。
  4. Dispatchers.Unconfined:一个不受限制的调度器,它允许协程在调用挂起函数的线程中继续执行。使用这个调度器时需要特别小心,因为它可能会导致一些奇怪的行为。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,884评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,212评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,351评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,412评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,438评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,127评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,714评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,636评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,173评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,264评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,402评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,073评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,763评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,253评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,382评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,749评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,403评论 2 358

推荐阅读更多精彩内容