RxJava入门解析(一)

前言

随着RxJava越来越火爆,很多人想入门却不知道从何做起,笔者整理了一天,总结出一些比较实用的东西,望各位看官斧正。

一、RxJava到底是什么,有什么好处?

关键词:异步、简洁。
RxJava可以进行异步调用,使每一步都可以自行处理其所在的线程,典型的流式代码结构,像读一首诗一样顺畅。即使以后逻辑越来越复杂,关于Rx的代码依然很顺畅。为什么这么说,请往下看。

二、RxJava例子

先比较一下下方的两个代码:

  new Thread(){
            @Override
            public void run() {
                super.run();
                //网络请求~~~~~之后
                {
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            ts.setText("这是通过newThread");
                        }
                    });
                }

            }
        }.start();
  Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        //异步操作网络请求后
                        {
                            subscriber.onNext("这是通过RxJava");
                            subscriber.onCompleted();
                        }

                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        ts2.setText(o.toString());
                    }
                });

同样是开启新线程,从异步线程获取数据,然后再UI线程中写入,代码量相比较,下方的要比上面的多,但是对于易读性来讲,流式的代码结构,从上向下阅读,条理清晰,操作简单,不论以后逻辑修改多么复杂,几乎不会有太大的变化,不像第一部分的代码,如果说复杂度增加的话,就会很麻烦。

三、RxJava使用方法

本文是基于Intelli JDEA 进行的开发,使用了gradle,只需要加一行配置引入即可:

 compile 'io.reactivex:rxjava:1.2.1'

概念基础

RxJava使用的设计模式为观察者模式,主要的步骤就是创建观察者,被观察者,订阅。比较熟悉的同学可以略过这一段。

借一张图
读书人能叫盗么,这叫借.png

如上图所示,观察者模式可有四个模块:
被观察者接口类:主要提供接口声明包括订阅 解除订阅 通知功能,和保存观察者对象。
观察者接口类:主要提供一个方法供被观察者使用,当被观察者触发了一些条件后,通知观察者。
被观察者实现类及观察者实现类:实现以上接口。

举个简单的例子,在家懒得做饭订了外卖,跟送外卖的说(被观察者),等你到楼下了给我打电话(订阅),等外卖到的时候,就会给你打电话,通知你去取外卖(通知),这就是一个观察者模式应用。

使用流程

RxJava的开发主要分为三个步骤:apple、pen、Ah~


皮.jpg

1、Observer (观察者)

Observer属于观察者,我们可以看一下源码:

public interface Observer<T> {
 
    onCompleted();
    onError(Throwable e); 
    void onNext(T t);

}

很简单,就三个方法,代表着完成(onCompleted),错误(onError),接收数据(onNext)。
另外还有一个实现了Observer的抽象类Subscriber,主要方法和Observer差不多,比较大的区别就是可以使用onStart()方法和unsubscribe()方法。
onStart():主要在事件未发出的时候调用,仅能在subscribe 所发生的线程被调用,可以做一些数据处理。
unsubscribe():用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。主要用于当某些对象不再使用的时候,但是并没有收到onCompleted消息的时候调用,防止出现内存泄露。
主要实现代码:

 Subscriber observer = new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) { 
            }

            @Override
            public void onStart() {
                super.onStart();
                System.out.println("----onStart----");
            }
        };

2、Observable(被观察者)

Observable为被观察者,由被观察者来决定是否发送消息,何时发送消息,异步操作一般都在这里进行,实现方式:

 Observable observable = Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) { 
                subscriber.onNext("111");
                subscriber.onCompleted();
            }
        });

调用Observable的create方法来创建被观察者,其中call方法的参数即可视为第一步的Observer ,在进行订阅的时候,会直接执行call方法内的代码。
除了create方法外,还有简单的方法来直接实现,just(T...)方法和from(T[]) 方法

 String[] txt = {"easd","dasd","dasd"};
 Observable observable = Observable.from(txt);
 Observable observable2 = Observable.just("easd","dasd","dasd");

查看源码后,其实实现的方法也是create方法,只是经过了一些简单的封装,这里就简单介绍一下from方法。

   public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return create(new OnSubscribeFromArray<T>(array));
    }

又上面代码可知,from方法首先对数组进行了判断,然后return了一个Observable的对象,使用的是create()方法,那么OnSubscribeFromArray又做了什么呢?

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> 

这是OnSubscribeFromArray实际上也是实现了OnSubscribe接口,那么我们去看一下call()方法:

 @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }

        void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

        void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;

            long e = 0L;
            int i = index;

            for (;;) {

                while (r != 0L && i != n) {
                    if (child.isUnsubscribed()) {
                        return;
                    }

                    child.onNext(array[i]);

                    i++;

                    if (i == n) {
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                        return;
                    }

                    r--;
                    e--;
                }

                r = get() + e;

                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }

主要调用了这三个方法,当数组长度等于long的最大值时,调用fastPath(),反之则 slowPath(n),两个方法的主要目的其实也就是将数组进行遍历,然后分发出去。由此可知无论是just()还是from()都可以视为与create()方法等价。

3、Subscribe (订阅)

最激动人心的时候到了,之前的准备已经完成了之后,就可以将观察者和被观察者进行订阅。

observable.subscribe(observer); 
observable.subscribe(subscriber);

Ah~~~~这样关系就定下来了。subscribe方法代码比较啰嗦,就简化一下发到下面:

 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null."); 
        } 
        //1 !!
        subscriber.onStart();  
        try { 
         //2 !!
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                
                    throw r; 
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

主要就三步:
一调用subscriber的onStart()方法,
二调用被观察者observable的OnSubscribe的call方法,
三将subscriber变成Subscription 返回,可以直接使用unsubscribed()方法。
另外,subscribe方法支持不完整的回调,可以使用Action1和Action0作为回调:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};
observable.subscribe(onNextAction,onErrorAction,onCompletedAction);
//分别对应onNext,onError,onComplete。

//也可以
observable.subscribe(onNextAction,onErrorAction);
observable.subscribe(onNextAction);
//在源码里都有不同的方法去对应

以上所说都是关于RxJava的使用方法,但是!跟特么所说的异步没毛关系,并没有什么卵用,下面开始正片

三、RxJava的异步调度

这个东西专门拿出来当一个大分类来讲,在RxJava里如果没有这个东西,要他有何用。这个神奇的东西叫Scheduler,线程调度器。主要用在两个方法里,observeOn()和subscribeOn(),分别对应观察者操作线程和被观察操作线程。
总共来讲有这么几个线程:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
还有个Schedulers.from(Executor executor),你可以自定义线程池来处理。

是不是很吊!
再次回到一开始放出来的代码:

 Observable.create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        //异步操作网络请求后
                        {
                            subscriber.onNext("这是通过RxJava");
                            subscriber.onCompleted();
                        }

                    }
                })
                .subscribeOn(Schedulers.io())//Observable操作放到io线程里
                .observeOn(AndroidSchedulers.mainThread())//Observable操作放在UI线程里
                .subscribe(new Action1<Object>() {//缺省的onNext调用
                    @Override
                    public void call(Object o) {
                        ts2.setText(o.toString());
                    }
                });

这样是不是就很容易理解了?

今天就先到此为止,下一节主要介绍变换和Subject这个神奇的东西

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容