Rxjava使用入门

什么是 ReactiveX?

   ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

   实时数据处理是一件普通的现象,有一个高效、干净和可扩展的方式来处理这些情景是重要的。使用 Observables 和 Operators 来熟练操作它们。ReactiveX 提供一个可组合又灵活的 API 来创建和处理数据流,同时简化了异步编程带来的一些担忧,如:线程创建和并发问题。

RxJava 简介

   RxJava 是 ReactiveX 在 Java 上的开源的实现。Observable(被观察者) 和 Subscriber(订阅者)是两个主要的类。在 RxJava 上,一个 Observable 是一个发出数据流或者事件的类,Subscriber 是一个对这些发出的 items (数据流或者事件)进行处理(采取行动)的类。一个 Observable 的标准流发出一个或多个 item,然后成功完成或者出错。一个 Observable 可以有多个 Subscribers,并且通过 Observable 发出的每一个 item,该 item 将会被发送到 Subscriber.onNext() 方法来进行处理。一旦 Observable 不再发出 items,它将会调用 Subscriber.onCompleted() 方法,或如果有一个出错的话 Observable 会调用 Subscriber.onError() 方法。

   现在,我们知道了很多关于 Observable 和 Subscriber 类,我们可以继续去介绍有关 Observables 的创建和订阅。

Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});

   这个 Observable 发出了整数 1,2,3 然后结束了。现在我们需要创建一个 Subscriber,那样我们就能让这些发出的流起作用。

Subscriber integerSubscriber = new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {
   }
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
};

   我们的 Subscriber 只是简单的把任何发出的 items 打印出来,完成之后通知我们。一旦你有一个 Observable 和一个 Subscriber,可以通过 Observable.subscribe() 方法将他们联系起来。

integerObservable.subscribe(integerSubscriber);
// Outputs:
// onNext: 1
// onNext: 2
// onNext: 3
// Complete!

   上面所有这些代码可以简单的通过使用 Observable.just() 方法来创建一个 Observable 去发出这些定义的值,并且我们的 Subscriber 可以改变成匿名的内部类,如下:

Observable.just(1, 2 ,3).subscribe(new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }
   @Override
   public void onError(Throwable e) {}
   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
});

   创建和订阅一个 Observable 是足够简单的,可能这并不是非常有用的,但这只是用 RxJava 的一个开始。通过调用操作符,任何的 Observable 都能进行输出转变,多个 Operators 能链接到 Observable 上。例如,在我们刚才的 Observable 中,我们只想要收到奇数的数字。要做到这一点,我可以使用 filter() 操作符。

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }
           @Override
           public void onError(Throwable e) {
           }
           @Override
           public void onNext(Integer value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1
// onNext: 3
// onNext: 5
// Complete!

   我们的 filter() 操作符定义了一个方法,将取出我们发出的整数,并对所有的奇数返回为 true,所有的偶数返回为 false。从我们的 filter() 返回为 false 的值是不会发出到 Subscriber 的,我们也不会在输出中看到他们。注意:filter() 操作符返回的是一个 Observable,这样我们的订阅方式就可以像之前的做法那样了。

   现在,我想找到发出的这些奇数的平方根,一种方法是在调用我们的 Subscriber 的每一个 onNext() 去计算平方根。然而,如果我们在我们的 Subscriber 中做计算平方根的操作的话,这样得到期望可能就不能进一步实现的数据的流式转换了。要做到这一点,我们可以在 filter() 操作符上链上 map() 操作符。

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .map(new Func1() {
           @Override
           public Double call(Integer value) {
               return Math.sqrt(value);
           }
       })
       .subscribe(new Subscriber() { // notice Subscriber type changed to 
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onNext(Double value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1.0
// onNext: 1.7320508075688772
// onNext: 2.23606797749979
// Complete!

   操作符的链式使用是构成 RxJava 必不可少的一部分,让你可以灵活的实现任何你想要的需求。随着对于 Observables 和 Operators 相互作用的理解,我们可以进入下一个话题:RxJava 和 Android 的整合。

让 Android 中的线程操作变得简单

   在 Android 开发中有一个常见的场景是需要在后台线程去分担一定量的工作,一旦该任务完成,会将结果回调到主线程去显示结果。

   在 Android 中,我们有多种方法来做这样的事:用 AsyncTasks,Loaders,Services 等。然而,这些解决方式通常不是最好的。Asynctasks 很容易导致内存泄露,CursorLoaders 与 ContentProvider 需要大量的配置和设置样板代码,还有 Services 的目的是为了长时间在后台运营的,而不是处理快速完成的操作,如:做一个网络请求或者从数据库加载内容。

   让我们看看 RxJava 是怎么帮我们解决这些问题的。下面这样的布局有一个按钮去开始一个长时间运行的操作,并且始终显示进度条,这样我们可以确保我们的操作是运行在后台线程的而不是在主线程。

<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
   xmlns:app="http://schemas.android.com/apk/res-auto"
   android:id="@+id/root_view"
   android:layout_width="match_parent"
   android:layout_height="match_parent"
   android:fitsSystemWindows="true"
   android:orientation="vertical">

   <android.support.v7.widget.Toolbar
       android:id="@+id/toolbar"
       android:layout_width="match_parent"
       android:layout_height="?attr/actionBarSize"
       android:background="?attr/colorPrimary"
       app:popupTheme="@style/AppTheme.PopupOverlay"
       app:theme="@style/ThemeOverlay.AppCompat.Dark.ActionBar" />

   <Button
       android:id="@+id/start_btn"
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:text="@string/start_operation_text" />

   <ProgressBar
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:indeterminate="true" />

</LinearLayout>

   一旦按钮被点击,它会禁用按钮并开启长时间运行的操作,并且一旦这个操作完成便会显示一个 Snackbar,然后按钮会重新变得可点击。这里是一个用 AsyncTask 实现我们这个“长期运行的操作”的例子。这个按钮只是 new 了一个 SampleAsyncTask 并 executes 了它。

public String longRunningOperation() {
   try {
       Thread.sleep(2000);
   } catch (InterruptedException e) {
       // error
   }
   return "Complete!";
}

private class SampleAsyncTask extends AsyncTask {

   @Override
   protected String doInBackground(Void... params) {
       return longRunningOperation();
   }

   @Override
   protected void onPostExecute(String result) {
       Snackbar.make(rootView, result, Snackbar.LENGTH_LONG).show();
       startAsyncTaskButton.setEnabled(true);
   }
}

   现在,我们如何将这个 AsyncTask 用 RxJava 来实现呢?首先,我们需要添加以下内容到我们 app 的 gradle build 文件下: compile 'io.reactivex:rxjava:1.0.14' 。然后我们需要创建一个 Observable 来调用我们这个长时间运行的操作。这可以使用 Observable.create() 方法来做到。

final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
});

   我们创建了 Observable 将会调用 longRunningOperation() 方法,将返回的结果作为参数给 onNext() 方法,然后调用 onCompleted() 来完成 Observable (注:在我们的 Observable 去订阅之前,我们的操作是不会被调用的)。接下来,当 button 被点击时,我们需要给我们的 Observable 做订阅。我添加了一个新的 button 用 RxJava 版本来处理我们的任务。

startRxOperationButton.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(final View v) {
       v.setEnabled(false);
       operationObservable.subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               v.setEnabled(true);
           }

           @Override
           public void onError(Throwable e) {}

           @Override
           public void onNext(String value) {
               Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show();
           }
       });
   }
});

   现在当我们建立应用程序时,然后点击新 button 时,会发生什么?我们的进度显示会冻结,然后我们 UI 变得反应迟钝。这是因为我们还没有定义我们的 Observable 应该在什么线程上,以及我们应该在什么线程去订阅它。这是 RxJava 的 Schedulers(调度器) 功能。

   对于任何 Observable 你可以定义在两个不同的线程,Observable 会操作在它上面。使用 Observable.observeOn() 可以定义在一个线程上,可以用来监听和检查从 Observable 最新发出的 items (Subscriber 的 onNext,onCompleted 和 onError 方法会执行在 observeOn 所指定的线程上),并使用 Observable.subscribeOn() 来定义一个线程,将其运行我们 Observable 的代码(长时间运行的操作)。

   RxJava 默认情况下是单线程的,你会需要利用 observeOn() 和 subscribeOn() 方法为你的应用带来多线程操作。RxJava 附带了几个现成的 Schedulers 给 Observables 使用,如:Schedulers.io() (用于 I/O 操作),Schedulers.computation()(计算工作),和 Schedulers.newThread()(为任务创建的新线程)。然而,从 Android 的角度来看,你可能想知道如何把订阅代码执行到主线程。我们可以用 RxAndroid 库来实现这一目标。

   RxAndroid 是一个对 RxJava 的轻量级扩展为了 Android 的主线程提供 Scheduler,也能去创建一个 Scheduler 用于运行在任何给定的 Android Handler 类上。用新的 Schedulers,Observable 创建之前能让我们将其修改为在后台线程执行我们的任务,并将我们的结果推到主线程上。

   要在 APP 中用 RxAndroid,只要在 gradle build 文件中添加这行代码就行了:

compile 'io.reactivex:rxandroid:1.0.1'
final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
})
       .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread
       .observeOn(AndroidSchedulers.mainThread()); // observeOn the UI Thread

   我们修改Observable将用 Schedulers.io() 去订阅,并用 AndroidSchedulers.mainThread() 方法将观察的结果返回到 UI 线程上 。现在,当我们建立我们的 APP 并点击我们的 Rx 操作的按钮,我们可以看到当操作运行时它将不再阻塞 UI 线程。

   所有上述的例子利用了 Observable 类来发出我们的结果,当一个操作仅仅只需要发出一个结果然后就完成的情况我们可以有另外一个选择。RxJava 发布的 1.0.13 版本介绍了 Single 类。Single 类可以用于创建像下面这样的方法:

Subscription subscription = Single.create(new Single.OnSubscribe() {
           @Override
           public void call(SingleSubscriber singleSubscriber) {
               String value = longRunningOperation();
               singleSubscriber.onSuccess(value);
           }
       })
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Action1() {
           @Override
           public void call(String value) {
               // onSuccess
               Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show();
           }
       }, new Action1() {
           @Override
           public void call(Throwable throwable) {
               // handle onError
           }
       });

   当给一个 Single 类做订阅时,只有一个 onSuccess 的 Action 和 onError 的 action。Single 类有不同于 Observable 的操作符,有几个操作符具有将 Single 转换到 Observable 的机制。例如:用 Single.mergeWith() 操作符,两个或更多同类型的 Singles 可以合并到一起去创建一个 Observable,发出每个 Single 的结果给一个 Observable。

防止内存泄露

   对于 AsyncTasks 所提到的缺点是,如果对于涉及了 Activity 或 Fragment 的处理不仔细的话,AsyncTasks 可能会造成内存泄露。不幸的是,使用 RxJava 不会魔术般的缓解内存泄露危机,但是防止内存泄露是很简单的。

   如果你一直在关注代码,你可能会注意到你调用的 Observable.subscribe() 的返回值是一个 Subscription 对象。Subscription 类只有两个方法,unsubscribe() 和 isUnsubscribed()。为了防止可能的内存泄露,在你的 Activity 或 Fragment 的 onDestroy 里,用 Subscription.isUnsubscribed() 检查你的 Subscription 是否是 unsubscribed。如果调用了 Subscription.unsubscribe() ,Unsubscribing将会对 items 停止通知给你的 Subscriber,并允许垃圾回收机制释放对象,防止任何 RxJava 造成内存泄露。如果你正在处理多个 Observables 和 Subscribers,所有的 Subscription 对象可以添加到 CompositeSubscription,然后可以使用 CompositeSubscription.unsubscribe() 方法在同一时间进行退订(unsubscribed)。

写在最后

   RxJava 在 Android 生态系统中提供非常棒的多线程选项。让我们能轻松的去后台线程做操作,然后将结果推到 UI 线程上。这对于任何 Android 应用来说都是非常需要的功能,能够运用 RxJava 的众多操作符来处理任何操作的结果仅仅是为了创造更多的附加值。然而 RxJava 要求我们对这个库有更好的了解,充分利用其功能,所花费在这个库上的时间就能让你带来更大的回报。

   这篇博客并未涉及 RxJava 的更进一步的主题:热观察 vs 冷观察、处理 backpressure、 Rx 的 Subject 类。用 RxJava 替代 AsyncTask 所涉及的示例代码可以在 Github 上找到。

英文原文地址

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,625评论 13 85
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 6,876评论 2 50
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 913评论 0 2
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,468评论 7 62
  • 今天回老家祭祖,看到了好多小孩子。男孩,女孩,都出动了,带着一份孝心,有模有样地给老祖宗们磕头,说新年祝福话...
    chunma阅读 244评论 0 0