Rxjava

操作符

Rxjava操作符

设计模式

观察者模式、发布订阅模式:发布者发布信息,订阅者获取信息,订阅了就能收到信息,没订阅就收不到信息。


观察者模式

应用场景

RxJava配合Retrofit ,防抖,网络嵌套

核心思想

有一个起点和一个终点,起点开始流向我们的“事件”
把事件流向到终点,只不过在流向的过程中,可以增加拦截,
拦截时可以对“事件进行改变” 终点只关心它上一个拦截

Observable创建过程时序图

Observable创建过程时序图

订阅流程分析

1.从终点开始也就是Rxjava中的自定义被观察者new Observer<T>()。
2.通过订阅事件subscribe(自定义观察者)方法中调用抽象函数subscribeActual(自定义观察者)。
3.ObervableCreat类中实现了subscribeActual(自定义观察者)方法,在这个方法中有三个重要步骤:
1)new CreateEmitter<T>(自定义观察者) =parent。
2)自定义观察者通过onSubscribe(parent)拿到发射器。
3)自定义被观察者的封装类ObservableOnSubscribe<T>,自定义被观察订阅发射器source.subscribe(parent)。
4.到达起点Rxjava中自定义被观察者的创建以及封装到new ObservableOnSubscribe<T>(),并实现subscribe订阅方法,通过里面订阅到的发射器e.onNext(T)发射事件,由于自定义观察者拿到过发射器所以实际调用的是“自定义观察者.onNext”最后回到终点。

Observable与Observer 订阅的过程以及时序图

Observable与Observer订阅
Observable与Observer时序图

Map操作符

Map操作符
Map源码分析

Rxjava的装饰模型

Rxjava的装饰模型
Rxjava的拆包封包操作

Rxjava的线程

Rxjava的Scheduler分类
线程时序图
IoScheduer时序图

线程流程分析

1.从终点开始也就是Rxjava中的自定义被观察者new Observer<T>()。
2.通过订阅事件subscribe(自定义观察者)方法中调用抽象函数subscribeActual(自定义观察者)。
3.通过线程上游定义函数subscribeOn(scheduler),new ObservableSubscribeOn<T>(this:自定义观察者, scheduler)这个是将自定义观察者和它所需要的线程实现类封装起来。
ObservableSubscribeOn这个类中有静态内部类SubscribeOnObserver继承AtomicReference<Disposable>实现了自定义观察者的所有接口Observer以及打断接口Disposable。
4.在ObservableSubscribeOn这个类中的subscribeActual()还new SubscribeTask(parent:SubscribeOnObserver);在SubscribeTask这个类的run方法中去将自定义被观察者和parent订阅在一起。
5.scheduler.scheduleDirect(new SubscribeTask(parent));这个函数是将任务交给对应的线程池,scheduleDirect()方法中创建对应的线程Worker w = createWorker(),然后将任务交给w线程去执行 w.schedule(task, delay, unit);同时将任务和w线程封装成DisposeTask,返回DisposeTask(这个特别之处是可以中断的Runnable)。如果是主线程的话其实走的是HandlerScheduler。
6.由于在对应线程中执行了subscribe方法,所以下面的操作多在对应的线程上执行,之后到达ObervableCreat类,此类中也实现了subscribeActual(parent:SubscribeOnObserver)方法,在这个方法中有三个重要步骤:
1)new CreateEmitter<T>(自定义观察者) =parent。
2)自定义观察者通过onSubscribe(parent)拿到发射器。
3)自定义被观察者的封装类ObservableOnSubscribe<T>,自定义被观察订阅发射器source.subscribe(parent)。
7.到达起点Rxjava中自定义被观察者的创建以及封装到new ObservableOnSubscribe<T>(),并实现subscribe订阅方法,通过里面订阅到的发射器e.onNext(T)发射事件,由于自定义观察者拿到过发射器所以实际调用的是“自定义观察者.onNext”最后回到终点。

异步线程

注意:哪个线程调用subscribe,在自定义观察者的实现方法中onSubscribe(Disposable d)就是哪个线程。可以在这里做赋全局变量,在不需要再继续执行的地方做中断Rxjava的流式操作

线程流程

自定义Rxjava操作符

1.首先继承Observable<T>这个类,重写里面的subscribeActual()方法。
2.在这个类中写个事件的静态内部类实现对应的事件以及Disposable接口。

自定义被观察者类
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.view.View;

import androidx.annotation.NonNull;

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;

public class ViewClickObservable extends Observable<Object> {

    private final View view;

    // 事件  第一节课 防抖 事件 没用
    private static final Object EVENT = new Object();
    private static Object EVENT2;

    public ViewClickObservable(View view) {
        this.view = view;

        EVENT2 = view;
    }

    @Override
    protected void subscribeActual(Observer<? super Object> observer) {
        // 可以干自己的

        MyListener myListener = new MyListener(view, observer);
        observer.onSubscribe(myListener);

        this.view.setOnClickListener(myListener);
    }

    // 我们的包裹
    static final class MyListener implements View.OnClickListener, Disposable {

        private final View view;
        private Observer<Object> observer;  // 存一份 下一层

        // 原子性
        // https://www.jianshu.com/p/8a44d4a819bc
        // boolean  == AtomicBoolean
        private final AtomicBoolean isDisposable = new AtomicBoolean();

        public MyListener(View view, Observer<Object> observer) {
            this.view = view;
            this.observer = observer;


        }

        @Override
        public void onClick(View v) {
            if (isDisposed() == false) {
                observer.onNext(EVENT);
            }

        }

        // 如果用调用了 中断
        @Override
        public void dispose() {
            // 如果没有中断过,才有资格,   取消view.setOnClickListener(null);
            if (isDisposable.compareAndSet(false, true)) {
                // 主线程 很好的中断
                if (Looper.myLooper() == Looper.getMainLooper()) {
                    view.setOnClickListener(null);

                } else { // 主线程,通过Handler的切换
                    /*new Handler(Looper.getMainLooper()) {
                        @Override
                        public void handleMessage(@NonNull Message msg) {
                            super.handleMessage(msg);
                            view.setOnClickListener(null);
                        }
                    };*/

                    // HandlerScheduler.scheduleDirect

                    AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
                        @Override
                        public void run() {
                            view.setOnClickListener(null);
                        }
                    });
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return isDisposable.get();
        }
    }
}
操作符类
import android.view.View;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;

public class RxView {

    private final static String TAG = RxView.class.getSimpleName();

    // 我们自己的操作符 == 函数
    public static Observable<Object> clicks(View view) {
        return new ViewClickObservable(view);
    }


}

应用
Button button = findViewById(R.id.button);
RxView.clicks(button)
                .throttleFirst(2000, TimeUnit.MILLISECONDS)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                e.onNext("Derry");
                            }
                        })
                        .subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                Log.d(L.TAG, "accept: 终点:" + s);
                            }
                        });
                    }
                });
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容