ReactiveX 和 RxJava 手记

引言:
学习了一下RxJava,理解其是一个以升级版的观察者模式为核心的异步处理库。旨在以更加简介、可读性更强的代码去实现数据异步处理和线程前通信。
下面,是本人对RxJava基础的学习笔记和总结,算是入门级别。

Rx介绍


ReactiveX 简称 Rx,全称 Reactive Extensions,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Java等几乎所有的编程语言。Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

  • Reactive: 响应式
  • LINQ: Language Integrated Query的简称,它是集成在.NET编程语言中的一种特性。已成为编程语言的一个组成部分,在编写程序时可以得到很好的编译时语法检查,丰富的元数据,智能感知、 静态类型等强类型语言的好处。
  • 迭代器模式:核心思想是:通过定义遍历或查看对象中所有元素的方法的接口,并根据不同的类进行不同的方法实现相,已达到对类数据遍历的抽象以及对类内部如何获取数据的过程进行掩盖的目的。当于Java中的Iterator(迭代器)有它的继承接口如ListIterator和它的实现类等,我们在遍历Set、Map时,用到他们的Iterator,这样,他们具体怎么拿出数据的过程,我们不用知道。
  • 观察者模式:有时被称作发布/订阅模式,观察者模式定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。【下面RxJava的使用过程就是观察者模式的体现】
  • Rx = Observables【用于表示异步数据流】 + LINQ【用它的操作符查询异步数据流】 + Schedules【参数化异步数据流的并发处理】
  • Rx用到的设计模式精华:观察者模式、迭代器模式
  • RxJava中最重要的是:Observable【被观察者,事件源】+ Subscriber【观察者,订阅者】

RxJava图解


可先通过图解总览大概:
RxJava之观察者模式的基本运作过程,如下:


RxJava之观察者模式的基本运作过程
  • <u>说明一点:Subscriber实现了Observer和Subscription</u>
  • <u>通过subscribe()方法,Observable 与 观察者绑定。</u>
  • <u>Subscriber与Observer的周期方法大概一致,Subscriber多了个用于清理数据的onStart()方法。</u>
  • <u>unsubscribe()方法在Observer对象调用完onCompleted()onError()方法后,被调用,进行订阅关系的解绑。</u>
    RxJava观察者模式顺序图,如下:
    RxJava观察者模式顺序图

注意:Subscribe<T> 是实现 Observable<T>Subscription 的一个抽象类,在调用subscribe(params)方法时,如果这个params类型为Observer<T>,则最终它会转成Subscriber<T>,同时,此方法会返回一个Subscription对象,用于调用unsubscribe()方法解绑。

单线程中RxJava基本用法和例子


1. RxJava的几种基本写法(观察者模式)

方式一:

原始的观察者模式写法,如下:

///被观察者
Observable<String> myObservable = Observable.create(
                new Observable.OnSubscribe<String>(){

                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        subscriber.onNext("hello world");
                        subscriber.onCompleted();
                    }
                }
        );

///观察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
          @Override
          public void onCompleted() {}

          @Override
          public void onError(Throwable e) {}

          @Override
          public void onNext(String s) {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
      };

///订阅(让两者产生关联,并启动)
 myObservable.subscribe(mySubscriber);

方式二:

相对方式一,化简定义方法体的部分,使用Action来实现不完整回调,结果如下:

//被观察者
//等价于: call(String) -> onNext(String)过程只调用一次 ->onCompleted()/onError()
Observable<String> myObservable = Observable.just("Hello world");

///观察者
///调用subscribe()时自动生成Subscriber并调用onNext()
Action1<String> onNextAction = new Action1<String>() {
      @Override
      public void call(String s) {
          Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
      }
};

///观察者
///调用subscribe()时自动生成Subscriber并调用onError()
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};

///观察者
///调用subscribe()时自动生成Subscriber并调用onCompleted()
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

//////订阅(让两者产生关联,并启动)
 myObservable.subscribe(onNextAction);
 // myObservable.subscribe(onErrorAction);
 // myObservable.subscribe(onCompletedAction);

方式三:

相对方式二,进行链式调用,如下:

///省略Obervable对象的创建
Observable.just("this is your sign:")
                ///省略Action1对象的创建,直接匿名内部类方式添加订阅
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                    }
                });

注意:

  1. just:如果只是调用: onNext() 【一到多次】 --> onCompleted()这个过程,那么,可以使用just()快速创建Observable

2. 基本应用

1. 打印字符串数组

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });

Observable.from(params) : params是数组类型的参数,在执行时,会调用Subscriber的onNext方法多次,每次处理一个item,之后,调用onCompleted()或者onError().

2. 通过id获取图片并显示

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

多线程中RxJava的使用


在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。

1. 基本写法

Observable.just(1,2,3,4)
                ///指定 subscribe() 发生在 IO 线程
                .subscribeOn(Schedulers.io())
                // 指定 Subscriber 的回调发生在主线程
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.e("TestActivity", "当前线程:"+ Thread.currentThread());
                        String res = "字符串:"+integer;
                        return res;
                    }
                })
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        Toast.makeText(TestActivity.this,"完成",Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("TestActivity", "当前线程:"+ Thread.currentThread());
                        Toast.makeText(TestActivity.this,s,Toast.LENGTH_SHORT).show();
                    }
                });

知识点:

  1. 加了map这个RxJava的映射方法,用于将事件处理的复杂过程【如:输入参数是Integer类型,输出结果是String类型】给被观察者来做,尽可能地减少观察者的工作。
    知识点:
  2. just((1,2,3,4):
    前者等价于如下代码:
```
Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            Log.e("TestActivity", "call当前线程:"+ Thread.currentThread());
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onCompleted();
        }
    })
```
  1. Scheduler:
* 背景:在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 `subscribe()`,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
* 概念:调度器(线程控制器)
* 作用:切换线程传递事件,达到异步的目的
* RxJava内置的Scheduler:(文章下面会详细总结)
  * `Schedulers.immediate()`:默认模式。直接使用当前线程运行。
  * `Schedulers.newThread()`:总是启动新线程,并在新线程中运行。
  * `Sched.io()`:I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。
  * `Schedulers.computation()`: 计算所使用的 Scheduler。
  * `AndroidSchedulers.mainThread()`:它指定的操作将在 Android 主线程运行。
  1. Obervable.subscribeOn(Scheduler):让call方法以及之前的操作,发生在指定的线程中运行
  2. Obervable.observeOn(Scheduler):让call之后的回调操作例如map、onNext等操作,发生在指定的线程中运行。

RxJava常用操作--数据转换处理


在事件传递过程中,如果观察者有需要,还可以通过数据转换处理,将传入的数据进行加工或调用,得到更多不同类型的信息。
RxJava提供给我们:map,flatMap来支持数据的‘一对一’和’一对多‘的转换。

  1. map
    作用:实现数据的一对一转化过程
    以下例子可以说明:
///省略Obervable对象的创建
Observable.just("this is your sign:")
              ///将传入的参数由String变成String[]
              .map(new Func1<String, String[]>() {
                  @Override
                  public String[] call(String s) {
                      String[] strings = s.split(" ");
                      return strings;
                  }
              })
              ///将传入的参数由String[]变成Integer
              .map(new Func1<String[], Integer>() {
                  @Override
                  public Integer call(String[] strings) {
                      int len = strings.length;
                      return len;
                  }
              })
              ///将传入的参数由Integer变成String
              .map(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return integer+"";
                  }
              })
              ///省略Action1对象的创建,直接匿名内部类方式添加订阅
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
                  }
              });

  1. flatMap
    作用:实现数据的一对多转换过程
    先看如下具体例子:
private void testFlatMap() throws CloneNotSupportedException {
      List<Student> studentList = new ArrayList<>();
      ///测试:构建两个Student对象
      Student xiaoming = new Student();
      Student honghong = new Student();
      ///测试:构建Course对象集
      Course chinese = new Course("语文");
      Course english = new Course("英语");
      Course math  = new  Course("数学");

      ///进行赋值操作,这样一来:
      /// xiaoming:id为“2222”,并有两门课程:语文和英语
      /// honghong:id为“007” ,并有两门课程:英语和数学
      xiaoming.id= "2222";
      honghong.id= "007";
      xiaoming.courseList = new ArrayList<>();
      xiaoming.courseList.add(chinese.clone());
      xiaoming.courseList.add(english.clone());
      honghong.courseList = new ArrayList<>();
      honghong.courseList.add(english.clone());
      honghong.courseList.add(math.clone());

      studentList.add(xiaoming);
      studentList.add(honghong);

      ///下面的过程,就是提取:列表中的列表
      Observable.from(studentList)
              .flatMap(new Func1<Student, Observable<Course>>() {
                  @Override
                  public Observable<Course> call(Student student) {
                      Log.e("学生信息", student.id);
                      return Observable.from(student.courseList);
                  }
              })
              .map(new Func1<Course, String>() {
                  @Override
                  public String call(Course course) {
                      return course.name;
                  }
              })
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.e("course信息",s);
                  }
              });
  }

最终得到结果为:


知识点:

  1. flatMap:
  • 作用:实现传递数据的一对多变换(比如:我想要对一个列表中每一个item都进行一个数据类型转换并输出的操作)
  • 原理:
    • 1)使用传入的事件对象创建一个 Observable 对象
    • 2)并不发送这个 Observable, 而是将它激活,于是它开始发送事件
    • 3)每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法
    • 结果:把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。
  1. Funx 和 Actionx:
  • 'x'的意义:从0开始,表示有x个参数的Fun()和Action()方法。

关于Single


  • Single类似于Observable,可绑定若干Observer并向他们发送响应信息,区别在于:
    1. Single只会发射一个值,或者一个错误通知,而不是发射一系列的值。
    2. 订阅Observable需要onNext()、onComplete()、onError()三个回调方法【在Observer中的】,而订阅Single只需要两个方法onSuccess()、onError()
  • Single会将任务处理最终给到以下两个方式中的一个,之后,终止订阅关系。
    • onSuccess - 允许情况下,Single发射单个的值到这个方法。
    • onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法。
  • 实例:
    Single.create(new Single.OnSubscribe<String>() {
              @Override
              public void call(SingleSubscriber<? super String> singleSubscriber) {
                  singleSubscriber.onSuccess("一次性初始化");
              }
          }).map(new Func1<String, Integer>() {
              @Override
              public Integer call(String s) {
                  return s.length();
              }
          }).subscribe(new Action1<Integer>() {
              @Override
              public void call(Integer integer) {
                  ///onSuccess()要做的操作
              }
          }, new Action1<Throwable>() {
              @Override
              public void call(Throwable throwable) {
                  // onError() 要做的操作
              }
          });
    

RxJava各方法汇总


1. 用于创建Observable的操作符:

  • filter() —输出和输入相同的元素,并且会过滤掉那些不满足检查条件的。
  • take() —输出最多指定数量的结果。
  • Delay() —让发射数据的时机延后一段时间
  • Create — 通过调用观察者的方法从头创建一个Observable
  • Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
  • Empty/Never/Throw — 创建行为受限的特殊Observable
  • From — 将其它的对象或数据结构转换为Observable
  • Interval — 创建一个定时发射整数序列的Observable
  • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
  • Range — 创建发射指定范围的整数序列的Observable
  • Repeat — 创建重复发射特定的数据或数据序列的Observable
  • Start — 创建发射一个函数的返回值的Observable
  • Timer — 创建在一个指定的延迟之后发射单个数据的Observable

2. 用于对Observable发射的数据进行变换的操作符:

  • Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
  • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
  • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
  • Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
  • Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

3. 线程切换和控制相关操作符:

  • subscribeOn(Scheduler) — 指定事件的call方法以及以前的操作到一个线程中
  • observeOn(Scheduler) — 指定事件的call方法之后的操作(如:map(),onNext(),onCompleted(),onError())到一个线程中【注意:不包括Subscriber.onStart()方法,该方法在默认它所在的线程中执行】
  • 参数Scheduler有:
    • Schedulers.immediate():默认模式。直接使用当前线程运行。
    • Schedulers.newThread():总是启动新线程,并在新线程中运行。
    • Schedulers.io():I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread():它指定的操作将在 Android 主线程运行。

4. Single相关方法汇总:

  • compose() -- 创建一个自定义的操作符
  • concat()/concatWith() -- 连接多个Single和Observable发射的数据
  • create() -- 调用观察者的create方法创建一个Single
  • error() -- 返回一个立即给订阅者发射错误通知的Single
  • map()
  • flatMap()
  • flatMapObservable():Observable -- 返回一个Observable,它发射对原Single的数据执行flatMap操作后的结果
  • from():Single -- 将Future转换成Single
  • just(V) -- 返回一个发射一个指定值V的Single
  • merge() -- 将一个Single(它发射的数据是另一个Single,假设为B)转换成另一个Single(它发射来自另一个Single(B)的数据)
  • merge()/mergeWith():Observable -- 合并发射来自多个Single的数据
  • observeOn() -- 指示Single在指定的调度程序上调用订阅者的方法
  • subscribeOn() -- 指示Single在指定的调度程序上执行操作
  • onErrorReturn() -- 将一个发射错误通知的Single转换成一个发射指定数据项的Single
  • timeout() -- 它给原有的Single添加超时控制,如果超时了就发射一个错误通知
  • toSingle() -- 将一个发射单个值的Observable转换为一个Single
  • zip()/zipWith():Single -- 将多个Single转换为一个,后者发射的数据是对前者应用一个函数后的结果

参考文章


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容