史上最浅显易懂的RxJava入门教程

因为工作需要刚好在学习 RxJava + Retrofit2 + OkHttp3 网络请求框架,网上搜了一些 RxJava 的教程,并不是很好理解,所幸最后我找到了几篇有助于初学者了解 RxJava 的文章,于是结合自己的理解,重新整理成一篇发给大家,希望通过我的咀嚼,能够帮助大家更快的了解和上手 RxJava,尤其是文章的最后,你将理解是怎样一种优势,乃至于我们开始考虑用 RxJava 来替代传统方式。话不多说,以下正文。

观察者模式

RxJava 涉及到所谓的观察者模式。
观察者模式,按我的理解,就是所谓的被观察者和订阅者之间的关系。
其形式是,在作为被观察者的一个方法中,包含着一段订阅,该订阅的本质是抽象类或接口的引用对抽象方法的调用,而该订阅实际指向的是作为实现类的订阅者具体实现的方法。
这实际上就是用到了面向对象编程中“向上转型”的概念。向上转型的套路是:实现类实现了抽象类定义的抽象方法;将实现类的对象赋值给抽象类的引用;然后通过抽象类的引用来调用抽象方法,而该调用实际指向实现类具体实现的方法……好吧,我这里相当于把面向对象的基础又过了一遍,不懂向上转型概念的先滚去看 Mars 的 Java4Android 教程 - - )

//抽象类或接口
public interface Printer{
    void print(String s);//定义抽象函数
}

//实现类实现抽象函数
class CanonPrinter implement Printer{
    @override
    public void print(String s){
        System.out.println(s);
    }
}

//在被观察者中,通过抽象类或接口的引用来调用抽象函数
private static void printSth(Printer printer){
    printer.print("ha ha ha");
}

//执行被观察者时,传入实现类的对象作为实参赋值给抽象类或接口的引用,
//从而在被观察者中通过抽象类或接口的引用来调用print抽象方法,
//而该抽象方法实际指向实现类实现的print方法
public static void main(String[] args){
    printSth(new CanonPrinter());
}

所以若是真要理解成观察和被观察,按照这个逻辑也的确说的过去,也就是说,被观察者刚好触发了这里,然后直接跳转到订阅者实现的方法内部去具体执行被触发后该做出的反应。

被观察者

Observable<String> myObservable = Observable.create( 
  new Observable.OnSubscribe<String>() { 
    @Override 
    public void call(Subscriber<? super String> sub) { 
      sub.onNext("aaa"); 
            sub.onNext("bbb");
      sub.onCompleted(); 
    } 
  } 
); 

订阅者

Subscriber<String> mySubscriber = new Subscriber<String>() { 
  @Override 
  public void onNext(String s) { System.out.println(s); } 

  @Override 
  public void onCompleted() { } 

  @Override 
  public void onError(Throwable e) { } 

}; 

关联起来

myObservable.subscribe(mySubscriber);

有没有觉得和
btn.setOnClickListener(onClickListener)很相似,是的,就是这样。
这一步就是将实现类的对象赋值给抽象类或接口的引用。

不过,观察者和订阅者的创建也有简化版。
例如当观察者只触发一个事件(onNext)时,可以直接使用 just。

观察者

Observable<String> myObservable = Observable.just("Hello, world!"); 

订阅者
当不关心 OnComplete 和 OnError,只需要在 onNext 时做一些处理,可以用 Action1 类

Action1<String> onNextAction = new Action1<String>() { 
  @Override 
  public void call(String s) { 
    System.out.println(s); 
  } 
}; 

subscribe 方法有一个重载版本,接受1~3个 Action1 类型的参数,分别对应 OnNext,OnError,OnComplete
然后我们现在只需要 onNext,就只需要传入一个参数

myObservable.subscribe(onNextAction);

这样,上述的代码最终可以写成

Observable.just("aaa") 
  .subscribe(new Action1<String>() { 
    @Override 
    public void call(String s) { 
       System.out.println(s); 
    } 
  }); 

注意,泛型类不能直接写在类中作为对象的实例变量,当它没有被指定类型时,因为泛型是编译时,不是运行时。上述代码因为 Observable 不指定具体<T>,因此,你应写在方法中,使 Observable 作为局部变量。

而使用 lambda,可以使代码变得更简洁

Observable.just("aaa") 
  .subscribe(s -> System.out.println(s)); 

那如果我想做一些手脚怎么办,特别是,观察者不能被改动,订阅者也不能被改动的时候
这时我们可以通过操作符 map 来完成改动(操作符还有很多,负责中间过程改动的,是map)

例如我给输入的文字加个后缀

Observable.just("aaa") 
 .map(new Func1<String, String>() { 
   @Override 
   public String call(String s) { 
     return s + ".jpg"; 
   } 
 }) 
 .subscribe(s -> System.out.println(s));

lambda:

Observable.just("aaa") 
  .map(s -> s + ".jpg") 
  .subscribe(s -> System.out.println(s));

或者在中间过程获得类型不同的数据

Observable.just("aaa") 
  .map(s -> s.hashCode()) 
  .map(i -> Integer.toString(i)) 
  .subscribe(s -> System.out.println(s));

那么到这里为止,我们知道了,RxJava 本质上和我们之前用的“回调”性质是一样的,就是作为观察者的一方调用抽象函数,作为订阅者的另一方实现抽象函数,并且二者通过 subscribe 方法关联,也就是,在观察者的 subscribe 方法中实现订阅者的匿名内部类,来具体实现被触发的方法。

但是故事说到这里,好像还是没有异步什么事啊,它分明就是“同线程中的身首异处”嘛。

下面来看看操作符

操作符

上述我们已经提到了操作符 map。
然后此处我们有个这样的需求:输入一个关键字,返回相关结果的 url 列表。相当于搜索引擎。

一般我们会这么做

Observable<List<String>> query(String text);

query("Hello, world!") 
  .subscribe(urls -> { 
    for (String url : urls) { 
      System.out.println(url); 
    } 
  }); 

有for循环,这样看起来有点繁杂
我们可以通过操作符 from,对数组中的数据进行逐个处理

Observable.from(urls) 
  .subscribe(url -> System.out.println(url));

也就是

query("Hello, world!") 
  .subscribe(urls -> { 
    Observable.from(urls) 
      .subscribe(url -> System.out.println(url)); 
  }); 

但这样破坏了 RxJava 的结构,因为订阅者最好是保持本身单一的功能,而数据的改变最好在中间过程中通过操作符来完成。
此时我们可以通过操作符 flatMap 来处理这个问题。

Observable.flatMap() 接收一个 Observable 的输出作为输入,同时输出另外一个Observable

query("Hello, world!") 
  .flatMap(new Func1<List<String>, Observable<String>>() { 
    @Override 
    public Observable<String> call(List<String> urls) { 
      return Observable.from(urls); 
    } 
  }) 
  .subscribe(url -> System.out.println(url));

lambda:

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .subscribe(url -> System.out.println(url)); 

flatMap 输出的新的 Observable 正是我们在 Subscriber 想要接收的。现在 Subscriber 不再收到 List<String>,而是收到一些列单个的字符串,就像 Observable.from() 的输出一样

接着前面的例子,现在我不想打印 url 了,而是要打印收到的每个网站的标题。我当然不能直接将 getTitle 方法放进订阅者, 因为一再强调了,订阅者实现的功能要单一,不要轻易改动。那我将考虑在中间过程中来完成这个改动,这样的话,我的 getTitle 方法每次只能传入一个 url,并且返回值不是一个 String,而是一个输出 String 的 Observable 对象。flatMap 不正是这么运作的吗(通过输入的 Observable 对象得到里面的数据,再将数据输出,以 Observable 对象的方式传递),所以还是考虑使用 flatMap。

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .flatMap(new Func1<String, Observable<String>>() { 
    @Override 
    public Observable<String> call(String url) { 
      return getTitle(url); 
    } 
  }) 
  .subscribe(title -> System.out.println(title));

lambda:

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .flatMap(url -> getTitle(url)) 
  .subscribe(title -> System.out.println(title)); 

那么如果我想过滤掉一些情况,例如返回 url 为 null 的,我不要它们的标题了,过滤掉,可以用操作符 filter。

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .flatMap(url -> getTitle(url)) 
  .filter(title -> title != null) 
  .subscribe(title -> System.out.println(title));

如果想指定输出元素的数量,可以用 take。
以下限制输出 5 个。

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .flatMap(url -> getTitle(url)) 
  .filter(title -> title != null) 
  .take(5) 
  .subscribe(title -> System.out.println(title)); 

如果想在每次触及订阅之前,完成一些事情,可以用 doOnNext。
例如以下在输出之前,将标题保存到某处。

query("Hello, world!") 
  .flatMap(urls -> Observable.from(urls)) 
  .flatMap(url -> getTitle(url)) 
  .filter(title -> title != null) 
  .take(5) 
  .doOnNext(title -> saveTitle(title)) 
  .subscribe(title -> System.out.println(title)); 

其他的操作符还可以来这篇文章了解
https://mrfu.me/2016/01/10/RxWeekend/#tips7

介绍完操作符,现在开始讲重点。
为什么用 RxJava,它和传统的回调以及异步处理相比存在什么优势?

错误处理

Observable.just("Hello, world!")
  .map(s -> potentialException(s))
  .map(s -> anotherPotentialException(s))
  .subscribe(new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { System.out.println("Completed!"); }

    @Override
    public void onError(Throwable e) { System.out.println("Ouch!"); }

  });

代码中的 potentialException() 和 anotherPotentialException() 有可能会抛出异常。每一个 Observerable 对象在终结的时候都会调用 onCompleted() 或者 onError() 方法,所以 Demo 中会打印“Completed!”或者“Ouch!”

这种模式有以下几个优点:

1.只要有异常发生 onError() 一定会被调用
这极大的简化了错误处理。只需要在一个地方处理错误即可以。

2.操作符不需要处理异常
将异常处理交给订阅者来做,Observerable 的操作符调用链中一旦有一个抛出了异常,就会直接执行 onError() 方法。

3.你能够知道什么时候订阅者已经接收了全部的数据
知道什么时候任务结束能够帮助简化代码的流程。

这种错误处理方式比传统的错误处理更简单。传统的错误处理中,通常是在每个回调中处理错误。这不仅导致了重复的代码,并且意味着每个回调都必须知道如何处理错误,你的回调代码将和调用者紧耦合在一起。

使用 RxJava,Observable 对象根本不需要知道如何处理错误!操作符也不需要处理错误状态:一旦发生错误,就会跳过当前和后续的操作符。所有的错误处理都交给订阅者来做
(这里的意思是说,传统的方式,就算也存在中间过程,那错误也需要在中间过程的每一步中分别处理。而 RxJava 不需要在中间过程处理错误,有错误直接跳到最后,统一由订阅者处理)

调度器

使用 RxJava,你可以使用 subscribeOn() 指定观察者代码运行的线程,使用 observerOn() 指定订阅者运行的线程
(这个过程简单的说就是,订阅者在主线程观察被观察者,所以这个动作被成为 observeOn。而被观察者在异线程添加了订阅者的订阅,所以这个动作叫 subscribeOn,其实说 be subscribe on 更合适)

myObservableServices.retrieveImage(url)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

subscribeOn() 和 observerOn() 可以被添加到任何 Observable 对象上,这两个也是操作符。我不需要关心 Observable 对象以及它上面有哪些操作符。仅仅运用这两个操作符就可以实现在不同的线程中调度

如果使用 AsyncTask 或者其他类似的,我将不得不仔细设计我的代码,找出需要并发执行的部分。使用 RxJava,我可以保持代码不变,仅仅在需要并发的时候调用这两个操作符就可以

订阅

当调用 Observable.subscribe(),会返回一个 Subscription 对象。这个对象代表了被观察者和订阅者之间的联系

Subscription subscription = Observable.just("Hello, World!")
  .subscribe(s -> System.out.println(s));
subscription.unsubscribe();

RxJava 的另外一个好处就是它处理 unsubscribing 的时候,会停止整个调用链。如果你使用了一串很复杂的操作符,调用 unsubscribe 将会在他当前执行的地方终止。不需要做任何额外的工作!

最后,再次声明一遍,这篇文章的学习,其主线是参考自以下这篇博客,是一篇对国外大神的译文。
http://blog.csdn.net/lzyzsd/article/details/41833541/

后面的文章,链接在这里,目前先没有这个打算继续看下去……
http://blog.csdn.net/lzyzsd/article/details/45033611/

以及有人推荐这篇,不知道讲的什么鬼~
http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容