RxJava2.0----事件流操作符Observable Utility Operators

6.事件流操作符Observable Utility Operators

A toolbox of useful Operators for working with Observables
● Delay
● Do
● Materialize/Dematerialize
● Serialize
● TimeInterval
● Timeout
● Timestamp
● Using
● To
● Retry
● cache
● cast
● compese

● Delay
将一个事件流里的数据源全部都延时发送。

● Do
在观察者订阅前,接收数据前后,完成接收前后,事件流过程中发生错误后,事件流结束前后等回调被观察者通知
doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle doOnNext doOnSubscribe doOnTerminate onTerminateDetach

  private Observer observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            print("onSubscribe ");
        }

        @Override
        public void onNext(Integer integer) {
            print("onNext "+integer);
        }

        @Override
        public void onError(Throwable e) {
            print("onError "+e.getMessage());
        }

        @Override
        public void onComplete() {
            print("onComplete " );
        }
    };
    private Observable getObservable(final boolean isError){
        return  Observable.just(1,2,3,4,5)
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        print("doOnSubscribe ");
                    }
                })
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        print("doOnEach :"+integerNotification);
                    }
                }).doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        print("doAfterNext : "+integer  );
                        if(isError && integer == 3){
                            throw new Exception("There is a Error!!");
                        }
                    }
                }).doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doAfterTerminate : "  );
                    }
                }).doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnComplete : "  );
                    }
                }).doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doFinally : "  );
                    }
                }).doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnDispose : "  );
                    }
                }).doOnTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnTerminate : "  );
                    }
                }).onTerminateDetach()
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("doOnError : "  );
                    }
                }).doOnLifecycle(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        print("doOnLifecycle : accept"  );
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        print("doOnLifecycle Action : "  );
                    }
                });
    }
    private void doAct1() {
        //需要引入RxJava 1.0
        //-------------buffer operator------
        tx_console.setText("Do");
        getObservable(false).subscribe(observer);
Log.d(" ", "  =================");
        getObservable(true).subscribe(observer);
}
输出结果:
  doOnSubscribe
          doOnLifecycle : accept
          onSubscribe
          doOnEach :OnNextNotification[1]
          onNext 1
          doAfterNext : 1
          doOnEach :OnNextNotification[2]
          onNext 2
          doAfterNext : 2
          doOnEach :OnNextNotification[3]
          onNext 3
          doAfterNext : 3
          doOnEach :OnNextNotification[4]
          onNext 4
          doAfterNext : 4
          doOnEach :OnNextNotification[5]
          onNext 5
          doAfterNext : 5
          doOnEach :OnCompleteNotification
          doOnComplete :
          doOnTerminate :
          onComplete
          doFinally :
          doAfterTerminate :
================================

          doOnSubscribe
          doOnLifecycle : accept
          onSubscribe
          doOnEach :OnNextNotification[1]
          onNext 1
          doAfterNext : 1
          doOnEach :OnNextNotification[2]
          onNext 2
          doAfterNext : 2
          doOnEach :OnNextNotification[3]
          onNext 3
          doAfterNext : 3
          doOnTerminate :
          doOnError :
          onError There is a Error!!
          doFinally :
          doAfterTerminate :

● Materialize/Dematerialize
Materialize返回一个被观察者对象,该对象发射源数据的所有数据,以及通知,每一项item通过一个标记类Notification封装源数据以及通知。Dematerialize 则和materialize功能相反。



 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aaaa");
                e.onNext("bbbb");
                e.onNext("cccc");
                e.onComplete();
            }
        }).materialize()
                .map(new Function<Notification<String>, Notification<String>>() {
            @Override
            public Notification<String> apply(Notification<String> stringNotification) throws Exception {
                print("materialize:"+stringNotification +"--->getValue:"+stringNotification.getValue()
                        +"--->isOnComplete:"+stringNotification.isOnComplete()
                        +"--->isOnError:"+stringNotification.isOnError() );
                return stringNotification;
            }
        }).dematerialize().subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                print("dematerialize:"+o.toString());//
            }
        });
输出结果
         materialize:OnNextNotification[aaaa]--->getValue:aaaa--->isOnComplete:false--->isOnError:false
           materialize:OnNextNotification[bbbb]--->getValue:bbbb--->isOnComplete:false--->isOnError:false
           materialize:OnNextNotification[cccc]--->getValue:cccc--->isOnComplete:false--->isOnError:false
           materialize:OnCompleteNotification--->getValue:null--->isOnComplete:true--->isOnError:false
           dematerialize:aaaa
           dematerialize:bbbb
           dematerialize:cccc

● Serialize
当ObservalbeSource数据源是从不同线程回调观察者(发射数据),那么极有可能出现其中一个线程调用观察者的onComplete()或则onError()发生在另一个线程调用onNext()之前,或则两个线程同时第调用观察者的onNext(),而Serialize 操作是给观察者的回调添加同步锁synchronized,来确保Observalbe对其观察者进行序列化的调用.

● TimeInterval
返回上级数据源每个数据从接收到发送的时间间隔的Observable。

● Timeout
当一个事件流中每一个数据在一定时间内没有发射出去,则抛出超时异常

● Timestamp
返回每个数据源发射的时候的时间戳的Observable。

 Observable.intervalRange(0,10,0,500,TimeUnit.MILLISECONDS)
                .timeInterval().subscribe(new Consumer<Timed<Long>>() {
            @Override
            public void accept(Timed<Long> longTimed) throws Exception {
                print("timeInterval---Timed--->"+longTimed.time());//0
            }
        });
        Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                .timeout(5000,TimeUnit.MILLISECONDS )
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        print("timeout---->"+aLong);//0
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("timeout----Throwable>"+throwable.getMessage());
                    }
                });
        Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                .timestamp()
                .subscribe(new Consumer<Timed<Long>>() {
                    @Override
                    public void accept(Timed<Long> longTimed) throws Exception {
                        print("timestamp---Timed--->"+longTimed.time());//1510388694052
                    }
                });
         11-11 16:24:54.034 : timeInterval---Timed--->0
         11-11 16:24:54.044   timeout---->0
         11-11 16:24:54.044   timestamp---Timed--->1510388694052
         11-11 16:24:54.544   timeInterval---Timed--->500
         11-11 16:24:55.034   timeInterval---Timed--->500
         11-11 16:24:55.544   timeInterval---Timed--->500
         11-11 16:24:56.034   timeInterval---Timed--->500
         11-11 16:24:56.544   timeInterval---Timed--->500
         11-11 16:24:57.034   timeInterval---Timed--->500
         11-11 16:24:57.544   timeInterval---Timed--->500
         11-11 16:24:58.034   timeInterval---Timed--->500
         11-11 16:24:58.534   timeInterval---Timed--->500
         11-11 16:24:59.044   timeout----Throwable>null
         11-11 16:24:59.544   timestamp---Timed--->1510388699553
         11-11 16:25:05.044   timestamp---Timed--->1510388705053
         11-11 16:25:10.544   timestamp---Timed--->1510388710553
         11-11 16:25:16.044   timestamp---Timed--->1510388716053
         11-11 16:25:21.544   timestamp---Timed--->1510388721553
         11-11 16:25:27.044   timestamp---Timed--->1510388727053
         11-11 16:25:32.544   timestamp---Timed--->1510388732553
         11-11 16:25:38.044   timestamp---Timed--->1510388738053
         11-11 16:25:43.544   timestamp---Timed--->1510388743553

● Using
通过对源资源对象的生命周期的控制(对源数据订阅),产生一个对源数据经过处理后的ObservableSource

 Observable.using(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "hello";//----源数据
            }
        }, new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                return Observable.just(s+"----》你好!");//--------目标数据
            }
        }, new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                print("using----->"+s);//hello----收到源数据
                throw new Exception("源数据-----Error :"+s);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                print("using Consumer accept----->" + s);//hello----》你好!
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                print("using Consumer throwable----->" + throwable.getMessage());
            }
        });
输出结果:
         using Consumer accept----->hello----》你好!
         using----->hello
         using Consumer throwable----->源数据-----Error :hello

● To
转换操作。
blockingIterable blockingLatest blockingMostRecent blockingNext sorted to toFuture toList toMap toMultimap toSortedList

String first = Observable.just("aaaa",2,3).blockingFirst().toString();
        print(""+first);//aaaa
        Iterable<String> stringIterable = Observable.just("1","2","3").blockingIterable();
        Iterator iterator = stringIterable.iterator();
        while (iterator.hasNext()){
            print(""+iterator.next());
        }
        //1,2,3


        Observable.just("1","2","3").toMap(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s+"+"+s;
            }
        }).subscribe(new Consumer<Map<String, String>>() {
            @Override
            public void accept(Map<String, String> stringStringMap) throws Exception {
                print("toMap   "+stringStringMap );//{2+2=2, 3+3=3, 1+1=1}
            }
        });
        Observable.just(5,3,6,3,9,4)
                .toSortedList().subscribe(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> integers) throws Exception {
                print("toSortedList"+integers);//[3, 3, 4, 5, 6, 9]
            }
        });  

● Retry
当发生错误的时候,重新发射数据。

 Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("2222");
                e.onError(new Throwable("Sorry!! an error occured sending the data"));
            }
        }).retry(3)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        print("retry--->" + s);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("retry--->throwable:" + throwable.getMessage());
                    }
                });
       输出结果:
         retry--->2222
         retry--->2222
         retry--->2222
         retry--->2222
         retry--->throwable:Sorry!! an error occured sending the data

● cache
当第一次订阅时,缓存所有的项目和通知,以使后续订阅者也可以接收到数据

ObservableEmitter<String> emitter = null;
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                emitter = e;
                emitter.onNext("1-----onNext");

            }
        });
        Observable.intervalRange(0, 5, 100, 5, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                emitter.onNext("intervalRange  send " + aLong);
            }
        });

        observable.cache().subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print("no cache---->" + aLong);
                    }
                });
        observable.delay(2000,TimeUnit.MILLISECONDS).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print(" cache---->" + aLong);
                    }
                });
        observable
                .delay(4000,TimeUnit.MILLISECONDS)
                .onTerminateDetach()
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String aLong) throws Exception {
                        print("onTerminateDetach cache---->" + aLong);
                    }
                });
输出结果:
/**
 *================================================no  cache ===========
  no cache---->1-----onNext
  cache---->intervalRange  send 0
  cache---->1-----onNext
  cache---->intervalRange  send 1
  cache---->intervalRange  send 2
  onTerminateDetach cache---->1-----onNext
  onTerminateDetach cache---->intervalRange  send 3
  onTerminateDetach cache---->intervalRange  send 4

 ===================================================cache===========
 : no cache---->1-----onNext
   cache---->1-----onNext
   onTerminateDetach cache---->1-----onNext
   onTerminateDetach cache---->intervalRange  send 0
   onTerminateDetach cache---->intervalRange  send 1
   onTerminateDetach cache---->intervalRange  send 2
   onTerminateDetach cache---->intervalRange  send 3
   onTerminateDetach cache---->intervalRange  send 4
 */

● cast
在将其转换为指定类型后,从源观察源发出每个项目,实际上通过map(Functions.castFunction(clazz))来实现,本质上是一个map操作。

 Observable.just("1", "2", "3") 
                .cast(Integer.class)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer val) throws Exception {
                        print("cast---->" + val);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        print("" + throwable.getMessage());//java.lang.String cannot be cast to java.lang.Integer
                    }
                });

● compose
自定义操作符,参数为ObservableTransformer ,可以继承ObservableTransformer 实现方法apply,来制定自己的运算符。

 Observable.just("1", "2", "3")
                .compose(schedulersTransformer())
                .subscribe();
//自定义线程调度操作符
 public ObservableTransformer schedulersTransformer() {
        return new ObservableTransformer() {
            @Override
            public ObservableSource apply(Observable upstream) {
                return upstream.subscribeOn(Schedulers.computation())
                        .observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

总结
终于将这些Rxjava2.0的操作符讲完了,哈哈!!妈妈再也不用担心我不会用RxJava操作符了!!


噢耶!

这可能是世上最全操作符详解,虽然每个演示的Demo简单,但是应该可以根据输出结果理解,如果还不太明白,或者有疑问,动手自己敲段代码跑一下。哈哈!小伙伴们,不要忘记点个赞哦!

本系列文章的demo演示代码下载地址:
https://github.com/Callanna/RxLoad.git
找到该项目下的demo的module就可以了哦。
同时也可以支持一下我正在写的RxLoad这个类库,一个使用Rxjava实现加载图片,加载文件,加载网页的lib。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容