框架-RxJava之Observable

创建一个Observable

  1. Observable.create()
    该方法接收一个Obsubscribe对象
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {

            }
        });

来个例子:

        Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i=0;i<5;i++){
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
        //Observable.subscribe(Observer),Observer订阅了Observable
        Subscription subscribe = observable.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "完成");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "异常");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "接收Obsverable中发射的值:" + integer);
            }
        });
输出:

 接收Obsverable中发射的值:0
接收Obsverable中发射的值:1
接收Obsverable中发射的值:2
接收Obsverable中发射的值:3
接收Obsverable中发射的值:4
完成

从上面的例子可以看出,在Observer订阅了Observable后,Observer作为OnSubscribe中call方法的参数传入,从而调用了Observer的相关方法

  1. Observable.from()
    该方法需要一个 数组或集合参数
    假如现在我们有一个集合,我们能否也像上面一样但不使用for循环,一个个发送给Observer,发送完后再调用onCompleted方法呢?
        //创建一个集合
        List<Integer> list=new ArrayList<>();
        for(int i=0;i<5;i++){
            list.add(i);
        }
        //使用Observable.from接收上面的list集合
        Observable<Integer> observable=Observable.from(list);
        //Observer订阅Observable
        Subscription subscribe = observable.subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "完成");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "异常");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "接收Obsverable中发射的值:" + integer);
            }
        });
输出:

 接收Obsverable中发射的值:0
接收Obsverable中发射的值:1
接收Obsverable中发射的值:2
接收Obsverable中发射的值:3
接收Obsverable中发射的值:4
完成

可以看到使用Observable.from()和1中的效果一样

  1. Observable.just()
    该方法可接收 1到9同一任意类型的参数
    假如我们有一个带返回值的方法,我们也想使用Observable对其操作,该怎么办呢?
  private List<Integer> getDatas(){//提供数据的方法
        List<Integer> list=new ArrayList<>();
        for(int i=0;i<5;i++){
            list.add(i);
        }
        return list;
    }

        //使用Observable.just调用上面的getDatas()方法,注意这里得到的Observable的类型为
        //List<Integer>,而非Integer
        Observable<List<Integer>> observable=Observable.just(getDatas());
        Subscription subscribe = observable.subscribe(new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "完成");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "异常");
            }

            @Override
            public void onNext(List<Integer> integers) {
                for(Integer integer: integers)
                    Log.e(TAG, "接收Obsverable中发射的值:" + integer);
            }
        });
输出:

 接收Obsverable中发射的值:0
接收Obsverable中发射的值:1
接收Obsverable中发射的值:2
接收Obsverable中发射的值:3
接收Obsverable中发射的值:4
完成

从上面的代码,我们可以看出just与from的区别,just是将集合直接作为一个参数发送给Observer,而from是将
集合中所有的元素一个一个的发送给Observer。假如上面代码中的getDatas方法返回的不是一个集合,我们也就不能使用from了应该使用just。
在发送数据后,just也会自动调用onCompleted方法。

  1. 其他方式:
    当我们需要一个Observable毫无理由的不再发射数据正常结束时,我们可以使用empty()
    。我们可以使用never()
    创建一个不发射数据并且也永远不会结束的Observable。我们也可以使用throw()
    创建一个不发射数据并且以错误结束的Observable。

特殊类Subject

为啥说它特殊呢?
因为它继承了Observable并且实现Observer接口,所以说它即可已是一个Observer又可以是一个Observable,它包含4个子类:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject
  1. PublishSubject
        //创建一个Subject这里看起来是不是缺点什么,对比上面的Observable.create()方法
        PublishSubject<String> subject = PublishSubject.create();
        
        Subscription subscription = subject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "完成");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "异常");
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG,"接收PublishSubject发送的值:"+s);
            }
        });
        //手动发送数据给订阅它的Observer
        subject.onNext("PublishSubject");
输出:
接收PublishSubject发送的值:PublishSubject

从上面代码可以看出,我们需手动发送数据给订阅它的Observer。所以这里就与Observable不同:
在Observable中,如果有订阅者订阅了它,它就会马上自动发送数据给Observer。
而对于Subject而言,它在被订阅的时候并不会自动发送数据给Observer,发送数据的控制权交给了我们,在我们发送数据之前,订阅的Observer会一直处于等待状态,但是这种等待并不会阻塞线程,也不会消耗太多的资源。
当然我们也可以像Observable一样使用Subject,没有区别。

  1. ReplaySubject
        //PublishSubject<Integer> subject=PublishSubject.create();
        ReplaySubject<Integer> subject= ReplaySubject.create();
        subject.onNext(1);// 1
        subject.subscribe(new Action1<Integer>() {// 2
            @Override
            public void call(Integer integer) {
                Log.e(TAG,""+integer);
            }
        });
        subject.onNext(2);// 3
        subject.onNext(3);// 4
        subject.onNext(4);// 5

这里没有给出输出,那我们先来猜猜如果上面的subject是PublishSubject情况下输出的是什么吧?
执行到1时,由于subject没有订阅者订阅,所以发送出去的数据1也就没有订阅者接收
执行到2时,subject有订阅者订阅了(这里的Action1相当于一个实现了onNext方法的Observer对象)
执行到3 4 5时,subject发送的数据都会被2中的订阅者接收到,从而输出2、3、4。
PublishSubject发送数据时,会将数据发送给订阅了它的所有Observer
那subject是ReplaySubject输出的是怎样呢?
Replay是不是重新播放的意思呢,这里的重新播放是指 当Observer订阅ReplaySubject时,会将ReplaySubject之前发送过的数据,重新发送给该Observer,所以这里会输出1 2 3 4。
再来看看另外2个ReplaySubject.createXXX()方法
- ReplaySubject.createWithSize(int size)
如果在Observer订阅该ReplaySubject前,ReplaySubject发送数据的个数大于size,那么超出size部分的数据
不会发送给Observer。

        ReplaySubject<Integer> subject= ReplaySubject.createWithSize(2);
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        subject.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,""+integer);
            }
        });
        subject.onNext(4);
输出:2 3 4
- ReplaySubject.createWithTime(int time, TimeUnit unit, Scheduler scheduler);

需要3个参数,第12个用来确定时间,第3个传入一个Scheduler
该方法表示如果ReplaySubject发送数据的时间超过了指定的时间,将不会重新发送给新订阅的Observer

  1. BehaviorSubject
        BehaviorSubject subject=BehaviorSubject.create();
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        subject.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,""+integer);
            }
        });
        subject.onNext(4);
        subject.onNext(5);
输出:
3 4 5

BehaviorSubject可以当作ReplaySubject来看,它只接收Observer订阅前BehaviorSubject发送的最后一条数据。

  1. AsyncSubject
        AsyncSubject subject=AsyncSubject.create();
        subject.onNext(1);
        subject.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,"A"+integer);
            }
        });
        subject.onNext(2);
        subject.onNext(3);
        subject.onCompleted();
        subject.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,"B"+integer);
            }
        });
输出:
A3 B3

AsyncSubject会将执行过的一个完整的事件缓存起来(最后一个subject.onNext() +subject.onCompleted()),然后会发送给所有订阅它的Observer。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容