Android RxJava之葵花宝典(上)(看我就够了)----入门基础

最近再系统的整理了一下RxJava,感觉挺好用的,因为它,GitHub 上衍生了一堆比如 RxAndroid、RxBus、RxPermission 等之类的开源库。下面写写整理出来的文档。

另外使用场景文章Android RxJava之葵花宝典(下)(看我就够了)---使用场景

一、RxJava的介绍
1、RxJava是什么

在讲RxJava之前,先了解一下相关术语

响应式编程:一种面向数据流和变化传播的编程范式

不懂?那举个简单的例子,界面上的按钮,点击的时候会触发按钮的写好的点击事件。 我们不知道按钮什么时候会被点到,但是点到了就会通知app去触发你写好的事件,这个 “通知” 的过程就是RxJava的工作。

观察者模式:

观察者模式的基本需求:观察者和被观察者之间是完全分离的,当被观察者的状态发生变化之后,
通过Register(注册) 或者 Subscribe(订阅)的方式,通知观察者。

RxJava 是一种函数式、响应式的异步操作库,它让你的代码更加简洁。
二、RxJava的重点基础
1、Observable

Observable [əb'zɜːvəbl]可观察量,可观察量,可观测的。在观察者模式中称为“被观察者”“可观察对象”

//OnSubcriber是实现了一个Acton1接口的接口。
Observable.create(new Observable.OnSubscribe<String>() {

   @Override
   public void call(Subscriber<? super String> subscriber) {
      //每个Observable有一个final OnSubscribe<T> onSubscribe 成员,
      //在该成员方法中调用call()方法,这个call方法的参数
      //就是 Observable.subscribe() 方法传入的 Subsriber实例。
   }
});
注意:在Rxjava中ActionX系列,其实就是无返回值的的接口
2、Observer

接收源,是观察者模式中的“观察者”,可接收Observable、Subject发射的数据。

Observer<String> observer = new Observer<String>() {
        @Override
        public void onCompleted() {
           //告知Observable没有更多的数据了,即没有新的onNext()发出时,就执行onCompleted()。
        }

        @Override
        public void onError(Throwable e) {
           //在事件处理过程中,出现了异常或者错误,就会被触发,同时整个队列将被终止,不再有事件发出。
        }

        @Override
        public void onNext(String s) {
            //实现方法跟Subscriber一模一样
        }
    };

在一个队列中,onCompleted()和onError() 都是最后触发的,而且两者中只有一个会被触发。

3、Subscriber

“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源。

Subscriber<String> subscriber = new Subscriber<String>() {
      @Override
      public void onCompleted() {

       }

       @Override
       public void onError(Throwable e) {

       }

       @Override
       public void onNext(String s) {
          //实现方法跟Observer一模一样
       }

       @Override
        public void onStart() {
            // 它会在 Subscribe 刚开始,而事件还未发送之前被调用,
            // 可以用于做一些准备工作,例如数据的清零或重置
            super.onStart();
        }
};

--------------------------------
//这是 Subscriber 所实现的另一个接口 Subscription 的方法.
//用于取消订阅。
subscriber.unsubscribe();

//这是 Subscriber 所实现的另一个接口 Subscription 的方法.
//用于判断当前是否订阅。
subscriber.isUnsubscribed();


4、Subscription

订阅。Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

      //被观察者
      Observable<Integer> observableInteger = Observable
                .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i = 0; i < 5; i++) {
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });

        //观察者
        Subscriber subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Observable completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Observable error");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };
        //进行订阅
        Subscription subscription = observableInteger.subscribe(subscriber);

        Log.d(TAG, "subscription: " + subscription.isUnsubscribed() + ",Observable:" + subscriber
                .isUnsubscribed());
Observable.from()
        List<Integer> integers = new ArrayList<>();
        integers.add(1);
        integers.add(2);
        integers.add(3);
        integers.add(4);
        integers.add(5);

        Subscriber subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Observable completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Observable error");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable.from(integers).subscribe(subscriber);
Observable.just()

将传入的参数依次发射出去。

        Subscriber subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Observable completed");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Observable error");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable.just(1,2,3,4,5).subscribe(subscriber);

just()中可以传入1-10个参数,并且将传入参数的顺序来发射出去。

5、Subject

是一个比较特殊的对象,既可充当发射源,也可充当接收源。
源码如下

 public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
      //这个Subject即实现了Observer,又继承了Observable
        ...
}

所以 Subject = Observable + Observer

RxJava针对不同的场景提供四种不同的Subject
  • PublishSubject
    订阅之后的数据全部被发射。
  • BehaviorSubject
    订阅之前的一个和订阅之后的全部数据被发射
  • ReplaySubject
    不论订阅所处任何位置,都将发射全部数据
  • AsyncSubject
    不论订阅所处任何位置,只会发射最后一个数据
PublishSubject

PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。

        PublishSubject<String> publishSubject = PublishSubject.create();

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: PublishSubject Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: PublishSubject Error!");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        publishSubject.onNext("one");
        publishSubject.onNext("two");
        publishSubject.subscribe(subscriber);
        publishSubject.onNext("three");

---------------------------------------
//打印结果
onNext: three
BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。

BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");

Subscriber<String> subscriber = new Subscriber<String>() {
      @Override
      public void onCompleted() {
          Log.d(TAG, "onCompleted: BehaviorSubject Completed!");
      }

      @Override
      public void onError(Throwable e) {
           Log.d(TAG, "onError: BehaviorSubject Error!");
       }

       @Override
       public void onNext(String s) {
            Log.d(TAG, "onNext: " + s);
       }
};

behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.onNext("three");

--------------------------------------------
onNext: default
onNext: one
onNext: two
onNext: three

如果把 behaviorSubject.subscribe(subscriber);放在倒数第二行

behaviorSubject.onNext("one");
behaviorSubject.onNext("two");
behaviorSubject.subscribe(subscriber);
behaviorSubject.onNext("three");

---------------------------------
//打印结果
 onNext: two
 onNext: three

看到结果,我们不难看出,其实上面所说的发射最近所发射的数据,其实就是以

behaviorSubject.subscribe(subscriber);

为界,这句代码之前的一个和之后的所以发射。

当然,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。

ReplaySubject

ReplaySubject会发射所有来自原始Observable的数据给观察者,无论它们是何时订阅的。

ReplaySubject<String> replaySubject = ReplaySubject.create();

Subscriber<String> subscriber = new Subscriber<String>() {
       @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: ReplaySubject Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ReplaySubject Error!");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
};


replaySubject.subscribe(subscriber);

replaySubject.onNext("one");
replaySubject.onNext("two");
replaySubject.onNext("three");

--------------------------
//打印
onNext: one
onNext: two
onNext: three
AsyncSubject

当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。

AsyncSubject<String> asyncSubject = AsyncSubject.create();

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {

                Log.d(TAG, "onCompleted: AsyncSubject Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: AsyncSubject Error!");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: "+s);
            }
        };

        Subscription subscription = asyncSubject.subscribe(subscriber);

        asyncSubject.onNext("one!");
        asyncSubject.onNext("two!");
        asyncSubject.onNext("three!");
        asyncSubject.onCompleted();

-----------------------------------
//打印结果如下
onNext: three!
onCompleted: AsyncSubject Completed!

当然如果原始Observable没有发射任何值,AsyncObject也不发射任何值

AsyncSubject会把最后一个值发射给后续的观察者。

请注意:如果在AsyncSubject异常时,那么不会向观察者发射任何值,只会传递一个错误的通知。
6、Action0

RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推。

7、Func0

与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;

三、RxJava操作符

Scheduler(调度器)

Scheduler(调度器)

subscribeOn()指定:Observable将全部的处理过程(包括发射数据和通知)放在特定的调度器上执行。
ObserveOn()指定:一个Observable在一个特定的调度器上调用观察者的onNext, onError和onCompleted方法,

      Subscriber subcriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Error!");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        Observable.just("1", "2", "3", "4")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subcriber);

上面这段代码中,由于指定了1,2,3,4发射代码为Schedulers.io(),那么发射数据就将在io线程中执行。而onNext, onError和onCompleted则将在主线中执行。

Operators(操作符)

RxJava提供了几个mapping函数:map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。

map

map 是用于变换的一个操作符,这在RxJava中占据了一定的地位,就是因为它的变换操作。

      Subscriber subcriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Error!");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable.just("1", "2", "3", "4")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                        return Integer.parseInt(s);
                    }
                })
                .subscribe(subcriber);

在上面的代码中,我通过map将字符串转化成了整形的1,2,3,4,返回一个Observable的对象。
请注意:这个操作符默认不在任何特定的调度器上执行。

flatmap
       Subscriber subcriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: Error!");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable.just("1", "2", "3", "4")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Func1<String, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(String s) {
                        return Observable.just(Integer.parseInt(s)+1);
                    }
                })
                .subscribe(subcriber);
-------------------------------------------
//打印
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted: Completed!

从上面我们可以看出,map与flatmap很相似,都是用的Func1,而且模式都是<I,O>模式,即是:I转换成O并返回。但是最大的不同点在于:我们flatmap的输出类型是Observable的类型。
在这里请注意一个问题:在执行flatmap中返回之后(O输出返回的Observable),并不是立马把返回的Observable通过Subscribe进行订阅,而是将返回的若干Observables都交给同一个Observable,然后再进行subscribe。

所以,在上面我们先将字符串"1","2", "3", "4" 分别转换成一个整形的Observable类型,即是:Observable(2),Observable(3),Observable(4),Observable(5)。然后将这些个Observables统一转换成一个Observable,再进行subscribe。

那么,这个flatmap到底有何用呢?可以用在什么地方呢?

假设这样一种情景:一个学校的老师我们定义为一个集合A,每个老师包括了个人信息和所教课程,一个老师不可能只教授一门课程,所以我们将老师所教授课程定义为集合B。如果让你打印每个老师所教课程,该怎么做?

    Teacher[] teachers = ...;
    Subscriber<Course> subscriber = new Subscriber<Course>() {
        @Override
        public void onNext(Course course) {
            Log.d(tag, course.getName());
        }
        ...
    };
    Observable.from(teachers)
        .flatMap(new Func1<Teacher, Observable<Course>>() {
            @Override
            public Observable<Course> call(Teacher teacher) {
                return Observable.from(teacher.getCourses());
            }
        })
        .subscribe(subscriber);
最后再补充一点:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。这意味着flatMap()函数在最后的Observable中不能够保证源Observables确切的发射顺序。
ConcatMap

RxJava的concatMap()函数解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。

repeat

让你发射的数据重复发射

        Subscriber subcriber = new Subscriber<Integer>() {
            ...
            }
        };

        Observable.just("1", "2","3")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Func1<String, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(String s) {
                        return Observable.just(Integer.parseInt(s)+1);
                    }
                })
                .repeat(3)
                .subscribe(subcriber);

--------------------------------------------
//打印输出
 onNext: 2
 onNext: 3
 onNext: 4
 onNext: 2
 onNext: 3
 onNext: 4
 onNext: 2
 onNext: 3
 onNext: 4
 onCompleted: Completed!
range

从起始点开始发射数据

Subscriber subcriber = new Subscriber<Integer>() {
            ...
        };

        Observable.range(10,3)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(subcriber);

结果为:10,11,12。range(10,3),其中10 是起始,3是数量。

interval

在需要轮询的时候是最好的选择

Observable.interval(3,TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onCompleted() {
            ...
        });

interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位。

take
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
          .take(4)
          .subscribe(new Subscriber<Integer>() {
        ...
    });

------------------------------
//打印输出
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
TakeLast

如果我们想要最后N个元素,我们只需使用takeLast()函数:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
              .takelast(2)
              .subscribe(new Subscriber<Integer>() {
            ...
        });

--------------------------------------
//打印输出
Next: 7
Next: 8
Sequence complete.
五、RxJava的使用配置
1.build.gradle 添加依赖包
dependencies {
    compile fileTree(include: ['*.jar'], dir: 'libs')
    androidTestCompile('com.android.support.test.espresso:espresso-core:2.2.2', {
        exclude group: 'com.android.support', module: 'support-annotations'
    })
    compile 'com.android.support:appcompat-v7:25.1.1'
    testCompile 'junit:junit:4.12'
    /* 响应式*/
    compile 'io.reactivex:rxjava:1.1.6'
    // RxJava
    compile 'io.reactivex:rxandroid:1.2.1'
 
    /*retrofit*/
    compile 'com.squareup.retrofit2:retrofit:2.2.0'//这个
    // 如果要是用Gson转换的话,需要添加这个依赖
    compile 'com.squareup.retrofit2:converter-gson:2.2.0'//这个
    compile 'com.squareup.retrofit2:adapter-rxjava:2.2.0'//这个
    compile 'com.squareup.okhttp3:okhttp:3.7.0'
}
2.build.gradle 支持JDK1.8(是为了用到Lambda表达式语言,这样可以更便捷的编程)
android {
   ...
   compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

如果出现以下提示,则在在build.gradle文件的defaultConfig中添加以下代码

//如果报错
Error:Jack is required to support java 8 language features. Either enable Jack or remove sourceCompatibility JavaVersion.VERSION_1_8.

android {

  defaultConfig {
     //添加以下代码
     jackOptions {
        enabled true
     }
  }
}
3.build.gradle 添加表达式语言的插件。

apply plugin: 'me.tatarka.retrolambda'

如果这里报错Error:(2, 0) Plugin with id 'me.tatarka.retrolambda' not found.还需要在项目的build.gradle中的dependencies节点中添加
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.android.tools.build:gradle:2.2.2'
        classpath 'me.tatarka:gradle-retrolambda:2.5.0'//添加这里
        // NOTE: Do not place your application dependencies here; they belong
        // in the individual module build.gradle files
       
    }
}

另外使用场景文章Android RxJava之葵花宝典(下)(看我就够了)---使用场景

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容