Rxandroid基础

一、Rxjava环境配置

使用android studio,gradle脚本中加入依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'

二、示例

Rxjava基本示例

Observable.just("one", "two", "three", "four", "five")
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(/* an Observer */);
Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
        .observeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe(/* an Observer */)

Rxjava进阶使用示例

rxjava实现缓存机制

@Override
    public Observable<List<Test>> getDatas() {

        if (mCachedTests != null && !mCachedTests.isEmpty() && !mCacheIsDirty) {
            Log.i(TAG,"mCachedTests dataget");
            return Observable.fromIterable(mCachedTests.values()).toList().toObservable();
        } else if (mCachedTests == null) {
            mCachedTests = new LinkedHashMap<>();
        }

        Observable<List<Test>> remoteTests = loadTestRemoteDataSource();
        if (mCacheIsDirty) {
            return remoteTests;
        } else {
            // Query the local storage if available. If not, query the network.
            Observable<List<Test>>localTests = loadTestsLocalDataSource();

            return Observable.concat(localTests, remoteTests)
                    .firstElement().toObservable();
        }
    }

    private Observable<List<Test>> loadTestRemoteDataSource(){

        return mTestRemoteDataSource.getDatas()
                .filter(testList -> !testList.isEmpty())
                .flatMap(new Function<List<Test>, ObservableSource<List<Test>>>() {
                    @Override
                    public ObservableSource<List<Test>> apply(@io.reactivex.annotations.NonNull List<Test> testList) throws Exception {
                        mCachedTests.clear();
                        return Observable.fromIterable(testList).doOnNext(test -> mCachedTests.put(test.getId(),test)).toList().toObservable();
                    }
                })
                .doOnNext(mCachedTests->{
                    mTestLocalDataSource.refreshData();
                    mTestLocalDataSource.saveDatas(mCachedTests);
                })
                .doOnComplete(()-> mCacheIsDirty=false);
    }

    private Observable<List<Test>> loadTestsLocalDataSource(){

        return mTestLocalDataSource.getDatas()
                .firstElement()//problem : https://github.com/square/sqlbrite/issues/123
                .toObservable()
                .filter(testList -> !testList.isEmpty())
                .flatMap(new Function<List<Test>, ObservableSource<List<Test>>>() {
                    @Override
                    public ObservableSource<List<Test>> apply(@io.reactivex.annotations.NonNull List<Test> testList) throws Exception {
                        mCachedTests.clear();
                        return Observable.fromIterable(testList).doOnNext(test -> mCachedTests.put(test.getId(),test)).toList().toObservable();
                    }
                })
                .doOnNext(testList->Log.i(TAG,"mTestLocalDataSource doOnNext"))
                .doOnComplete(()->Log.i(TAG,"mTestLocalDataSource doOnComplete"));
    }

rxjava封装网络请求

public class RxjavaHttp {

    /**
     * 设置context
     * @param context 暂时不能使用ApplicationContext,可以为空
     * @return 当前operator
     */
    public static Operator with(@Nullable Context context){
        Operator operator = new Operator();
        return operator.with(context);
    }


    public static class Operator{


        private Operator operator;
        private boolean willchecknet = true;
        private boolean willcheckresult = true;

        public Operator(){
            this.operator = this;
        }

        private WeakReference<Context> contextWeakReference;

        /**
         * 设置context
         * @param context 暂时不能使用ApplicationContext
         * @return 当前operator
         */
        public Operator with(Context context){
            if (context == null) {
                throw new IllegalArgumentException("You cannot start a load on a null Context");
            } else if (context instanceof Application) {
                throw new IllegalArgumentException("You cannot use a context instanceof Application");
            } else {
                contextWeakReference = new WeakReference<Context>(context);
            }
            return operator;
        }

        /**
         * 请求前是否检查网络,默认值true,没有网络时将抛出NetworkErrorException(message)
         * @param willchecknet 是否检查网络
         * @return 当前operator
         */
        public Operator willchecknetfirst(boolean willchecknet){
            this.willchecknet = willchecknet;
            return operator;
        }

        /**
         * 请求完成是否检查结果,默认值true,接口没有正常返回结果时将抛出NetworkErrorException(message)
         * @param willcheckresult 是否检查结果
         * @return 当前operator
         */
        public Operator willcheckresult(boolean willcheckresult){
            this.willcheckresult = willcheckresult;
            return operator;
        }


        /*****************************************************************************************************
         *
         * 以下方法为请求的调用,返回数据源用作后续操作,没有对线程进行调度,用作http和其他数据源结合使用时或需要获取原始数据源时。
         *
         *****************************************************************************************************/


        /**
         * 获得一个http请求Observable
         * @param bean 请求参数
         * @param httpRequest 请求体
         * @param <T> 请求体泛型
         * @return http请求数据源
         */
        public <T> Observable<ResponesBean>excute(T bean, Function<T,ObservableSource<ResponesBean>> httpRequest){

            return Observable.just(bean).filter(new CheckNetPredicate<T>(contextWeakReference.get(),willchecknet))
                    .flatMap(httpRequest).filter(new HttpResultPredicate(willcheckresult));
        }

        /**
         * 获得一个get请求的Observable
         * @param url 请求地址
         * @return get请求数据源
         */
        public Observable<ResponesBean> get(String url){
            return excute(url, new Function<String, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(String url) throws Exception {
                    return Observable.just(HttpFactory.get(url)).subscribeOn(Schedulers.io());
                }
            });
        }


        /**
         * 获得一个post请求的Observable
         * @param rxjavaHttpBean 请求体
         * @param <T> RxjavaHttpBean的子类
         * @return post请求数据源
         */
        public <T extends RxjavaHttpBean> Observable<ResponesBean> post(T rxjavaHttpBean){
            return excute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.post(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getHeaders())).subscribeOn(Schedulers.io());
                }
            });
        }


        /**
         * 获得一个upload请求的Observable
         * @param rxjavaHttpBean 请求体
         * @param <T> RxjavaHttpBean的子类
         * @return upload请求数据源
         */
        public <T extends RxjavaHttpBean> Observable<ResponesBean> upload(T rxjavaHttpBean){
            return excute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.formUpload(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getFiles(),rxjavaHttpBean.getParams())).subscribeOn(Schedulers.io());
                }
            });
        }


        /*****************************************************************************************************
         *
         * 以下方法为简单的调用,直接传入Consumer,io线程请求,ui线程返回结果,并根据错误进行toast弹出。
         *
         *****************************************************************************************************/

        /**
         * 直接执行http请求,io线程请求,ui线程返回结果,并根据错误进行toast弹出
         * @param bean 请求参数
         * @param httpRequest 请求体
         * @param consumer 订阅者
         * @param <T> 请求体泛型
         * @return 返回Disposable用作生命周期控制
         */
        public <T> Disposable simpleExcute(T bean, Function<T,ObservableSource<ResponesBean>> httpRequest, Consumer<ResponesBean>consumer){

            return Observable.just(bean).filter(new CheckNetPredicate<T>(contextWeakReference.get(),willchecknet))
                    .flatMap(httpRequest).filter(new HttpResultPredicate(willcheckresult))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorReturn(new HttpToastErrorReturn(contextWeakReference.get()))
                    .subscribe(consumer);
        }

        /**
         * 直接执行get请求,io线程请求,ui线程返回结果,并根据错误进行toast弹出
         * @param url 请求地址
         * @param consumer 订阅者
         * @return 返回Disposable用作生命周期控制
         */
        public Disposable simpleGet(String url, Consumer<ResponesBean>consumer){
            return simpleExcute(url, new Function<String, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(String url) throws Exception {
                    return Observable.just(HttpFactory.get(url)).subscribeOn(Schedulers.io());
                }
            },consumer);
        }


        /**
         * 直接执行post请求,io线程请求,ui线程返回结果,并根据错误进行toast弹出
         * @param rxjavaHttpBean 请求内容
         * @param consumer 订阅者
         * @return 返回Disposable用作生命周期控制
         */
        public <T extends RxjavaHttpBean>  Disposable simplePost(T rxjavaHttpBean, Consumer<ResponesBean>consumer){
            return simpleExcute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.post(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getHeaders())).subscribeOn(Schedulers.io());
                }
            },consumer);
        }

        /**
         * 直接执行upload请求,io线程请求,ui线程返回结果,并根据错误进行toast弹出
         * @param rxjavaHttpBean 请求内容
         * @param consumer 订阅者
         * @return 返回Disposable用作生命周期控制
         */
        public <T extends RxjavaHttpBean>  Disposable simpleUpload(T rxjavaHttpBean, Consumer<ResponesBean>consumer){
            return simpleExcute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.formUpload(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getFiles())).subscribeOn(Schedulers.io());
                }
            },consumer);
        }

    }
}

三、Rxjava操作符详解

Creating Observables

Operators that originate new Observables.
Create — create an Observable from scratch by calling observer methods programmatically
Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
Empty Never Throw — create Observables that have very precise and limited behavior
From — convert some other object or data structure into an Observable
Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
Just — convert an object or a set of objects into an Observable that emits that or those objects
Range — create an Observable that emits a range of sequential integers
Repeat — create an Observable that emits a particular item or sequence of items repeatedly
Start — create an Observable that emits the return value of a function
Timer — create an Observable that emits a single item after a given delay

Transforming Observables

Operators that transform items that are emitted by an Observable.
Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
Map — transform the items emitted by an Observable by applying a function to each item
Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Filtering Observables

Operators that selectively emit items from a source Observable.
Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
Distinct — suppress duplicate items emitted by an Observable
ElementAt — emit only item n emitted by an Observable
Filter — emit only those items from an Observable that pass a predicate test
First — emit only the first item, or the first item that meets a condition, from an Observable
IgnoreElements — do not emit any items from an Observable but mirror its termination notification
Last — emit only the last item emitted by an Observable
Sample — emit the most recent item emitted by an Observable within periodic time intervals
Skip — suppress the first n items emitted by an Observable
SkipLast — suppress the last n items emitted by an Observable
Take — emit only the first n items emitted by an Observable
TakeLast — emit only the last n items emitted by an Observable

Combining Observables

Operators that work with multiple source Observables to create a single Observable
And Then When — combine sets of items emitted by two or more Observables by means of Pattern
and Plan
intermediaries
CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
Merge — combine multiple Observables into one by merging their emissions
StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators

Operators that help to recover from error notifications from an Observable
Catch — recover from an onError
notification by continuing the sequence without error
Retry — if a source Observable sends an onError
notification, resubscribe to it in the hopes that it will complete without error

Observable Utility Operators

A toolbox of useful Operators for working with Observables
Delay — shift the emissions from an Observable forward in time by a particular amount
Do — register an action to take upon a variety of Observable lifecycle events
Materialize Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
ObserveOn — specify the scheduler on which an observer will observe this Observable
Serialize — force an Observable to make serialized calls and to be well-behaved
Subscribe — operate upon the emissions and notifications from an Observable
SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
Timestamp — attach a timestamp to each item emitted by an Observable
Using — create a disposable resource that has the same lifespan as the Observable

Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables
All — determine whether all items emitted by an Observable meet some criteria
Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
Contains — determine whether an Observable emits a particular item or not
DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
SequenceEqual — determine whether two Observables emit the same sequence of items
SkipUntil — discard items emitted by an Observable until a second Observable emits an item
SkipWhile — discard items emitted by an Observable until a specified condition becomes false
TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable
Average — calculates the average of numbers emitted by an Observable and emits this average
Concat — emit the emissions from two or more Observables without interleaving them
Count — count the number of items emitted by the source Observable and emit only this value
Max — determine, and emit, the maximum-valued item emitted by an Observable
Min — determine, and emit, the minimum-valued item emitted by an Observable
Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Backpressure Operators

backpressure operators — strategies for coping with Observables that produce items more rapidly than their observers consume them

Connectable Observable Operators

Specialty Observables that have more precisely-controlled subscription dynamics
Connect — instruct a connectable Observable to begin emitting items to its subscribers
Publish — convert an ordinary Observable into a connectable Observable
RefCount — make a Connectable Observable behave like an ordinary Observable
Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items

Operators to Convert Observables

To — convert an Observable into another object or data structure

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351