RX常用操作符PartA

转载玉刚说公众号,备份一下~
被观察者(Observable)
观察者(Observer)
订阅(subscribe)

首先在 gradle 文件中添加依赖:

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

基本格式
Observable.create(new ObservableOnSubscribe < Integer > () {
    @Override
    public void subscribe(ObservableEmitter < Integer > e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
})
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
});

事件种类 作用
onNext() 发送该事件时,观察者会回调 onNext() 方法
onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送

  1. 创建操作符

1.1 create()

方法预览:public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
有什么用:创建一个被观察者
怎么用:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello Observer");
        e.onComplete();
    }
});

上面的代码非常简单,创建 ObservableOnSubscribe 并重写其 subscribe 方法,就可以通过 ObservableEmitter 发射器向观察者发送事件。
以下创建一个观察者,来验证这个被观察者是否成功创建。
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
        Log.d("chan","=============onNext " + s);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
        Log.d("chan","=============onComplete ");
    }
};
observable.subscribe(observer);
打印结果:05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
=============onComplete

1.2 just()

方法预览:public static <T> Observable<T> just(T item) ......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
有什么用?创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。
怎么用?
Observable.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});
上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果:
05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 

1.3 From 操作符

1.3.1 fromArray()

方法预览:public static <T> Observable<T> fromArray(T... items)
有什么用?这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

怎么用?
Integer array[] = {1, 2, 3, 4};
Observable.fromArray(array)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});
代码和 just() 基本上一样,直接看打印结果:
05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
=================onNext 1
=================onNext 2
=================onNext 3
=================onNext 4
=================onComplete 

1.3.2 fromCallable()

方法预览:public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
有什么用?这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

怎么用?

Observable.fromCallable(new Callable < Integer > () {
    @Override
    public Integer call() throws Exception {
        return 1;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "================accept " + integer);
    }
});
打印结果:

05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1

1.3.3 fromFuture()

方法预览:

public static <T> Observable<T> fromFuture(Future<? extends T> future)

有什么用?

参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

怎么用?

FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
    @Override
    public String call() throws Exception {
        Log.d(TAG, "CallableDemo is Running");
        return "返回结果";
    }
});
Observable.fromFuture(futureTask)
    .doOnSubscribe(new Consumer < Disposable > () {
    @Override
    public void accept(Disposable disposable) throws Exception {
        futureTask.run();
    }
})
.subscribe(new Consumer < String > () {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "================accept " + s);
    }
});
doOnSubscribe() 的作用就是只有订阅时才会发送事件,具体会在下面讲解。

打印结果:

05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
================accept 返回结果

1.3.4 fromIterable()

方法预览:

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
有什么用?

直接发送一个 List 集合数据给观察者

怎么用?

List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
Observable.fromIterable(list)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=================onSubscribe");
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "=================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=================onError ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "=================onComplete ");
    }
});
打印结果如下:

05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
=================onNext 0
=================onNext 1
=================onNext 2
=================onNext 3
=================onComplete 

1.4 defer()

方法预览:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
有什么用?

这个方法的作用就是直到被观察者被订阅后才会创建被观察者。

怎么用?

// i 要定义为成员变量
Integer i = 100;
Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
    @Override
    public ObservableSource<? extends Integer> call() throws Exception {
        return Observable.just(i);
    }
});
i = 200;
Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "================onNext " + integer);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
};
observable.subscribe(observer);
i = 300;
observable.subscribe(observer);
打印结果如下:

05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
================onNext 300
因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。

1.5 timer()

方法预览:

public static Observable<Long> timer(long delay, TimeUnit unit) 
......
有什么用?

当到指定时间后就会发送一个 0L 的值给观察者。

怎么用?

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "===============onNext " + aLong);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0

1.6 interval()

方法预览:

public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
......
有什么用?

每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

怎么用?

Observable.interval(4, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }
    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5
从时间就可以看出每隔4秒就会发出一次数字递增1的事件。这里说下 interval() 第三个方法的 initialDelay 参数,这个参数的意思就是 onSubscribe 回调之后,再次回调 onNext 的间隔时间。

1.7 intervalRange()

方法预览:

public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
有什么用?

可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

怎么用?

Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
.subscribe(new Observer < Long > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }
    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6
可以看出收到5次 onNext 事件,并且是从 2 开始的。

1.8 range()

方法预览:

public static Observable<Integer> range(final int start, final int count)
有什么用?

同时发送一定范围的事件序列。

怎么用?

Observable.range(2, 5)
.subscribe(new Observer < Integer > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==============onSubscribe ");
    }
    @Override
    public void onNext(Integer aLong) {
        Log.d(TAG, "==============onNext " + aLong);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe 
==============onNext 2
==============onNext 3
==============onNext 4
==============onNext 5
==============onNext 6

1.9 rangeLong()

方法预览:

public static Observable<Long> rangeLong(long start, long count)
有什么用?

作用与 range() 一样,只是数据类型为 Long

怎么用?

用法与 range() 一样,这里就不再赘述了。

1.10 empty() & never() & error()

方法预览:

public static <T> Observable<T> empty()
public static <T> Observable<T> never()
public static <T> Observable<T> error(final Throwable exception)
有什么用?

empty() : 直接发送 onComplete() 事件
never():不发送任何事件
error():发送 onError() 事件
怎么用?

Observable.empty()
.subscribe(new Observer < Object > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "==================onSubscribe");
    }
    @Override
    public void onNext(Object o) {
        Log.d(TAG, "==================onNext");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "==================onError " + e);
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "==================onComplete");
    }
});
打印结果:

05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
==================onComplete
换成 never() 的打印结果:

05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
换成 error() 的打印结果:

05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
==================onError java.lang.NullPointerException
2. 转换操作符

2.1 map()

方法预览:public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
有什么用?map 可以将被观察者发送的数据类型转变成其他的类型

怎么用?

以下代码将 Integer 类型的数据转换成 String。

Observable.just(1, 2, 3)
.map(new Function < Integer, String > () {
    @Override
    public String apply(Integer integer) throws Exception {
        return "I'm " + integer;
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.e(TAG, "===================onSubscribe");
    }
    @Override
    public void onNext(String s) {
        Log.e(TAG, "===================onNext " + s);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
===================onNext I'm 1
===================onNext I'm 2
===================onNext I'm 3

2.2 flatMap()

方法预览:public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
......
有什么用?这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。
怎么用?
flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个例子来说明 flatMap() 的用法。

假设一个有一个 Person 类,这个类的定义如下:

public class Person {
    private String name;
    private List<Plan> planList = new ArrayList<>();
    public Person(String name, List<Plan> planList) {
        this.name = name;
        this.planList = planList;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public List<Plan> getPlanList() {
        return planList;
    }
    public void setPlanList(List<Plan> planList) {
        this.planList = planList;
    }
}
Person 类有一个 name 和 planList 两个变量,分别代表的是人名和计划清单。

Plan 类的定义如下:

public class Plan {
    private String time;
    private String content;
    private List<String> actionList = new ArrayList<>();
    public Plan(String time, String content) {
        this.time = time;
        this.content = content;
    }
    public String getTime() {
        return time;
    }
    public void setTime(String time) {
        this.time = time;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }
    public List<String> getActionList() {
        return actionList;
    }
    public void setActionList(List<String> actionList) {
        this.actionList = actionList;
    }
}
现在有一个需求就是要将 Person 集合中的每个元素中的 Plan 的 action 打印出来。
首先用 map() 来实现这个需求看看:

Observable.fromIterable(personList)
.map(new Function < Person, List < Plan >> () {
    @Override
    public List < Plan > apply(Person person) throws Exception {
        return person.getPlanList();
    }
})
.subscribe(new Observer < List < Plan >> () {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(List < Plan > plans) {
        for (Plan plan: plans) {
            List < String > planActionList = plan.getActionList();
            for (String action: planActionList) {
                Log.d(TAG, "==================action " + action);
            }
        }
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
可以看到 onNext() 用了嵌套 for 循环来实现,如果代码逻辑复杂起来的话,可能需要多重循环才可以实现。

现在看下使用 flatMap() 实现:

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        return Observable.fromIterable(person.getPlanList());
    }
})
.flatMap(new Function < Plan, ObservableSource < String >> () {
    @Override
    public ObservableSource < String > apply(Plan plan) throws Exception {
        return Observable.fromIterable(plan.getActionList());
    }
})
.subscribe(new Observer < String > () {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
        Log.d(TAG, "==================action: " + s);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
从代码可以看出,只需要两个 flatMap() 就可以完成需求,并且代码逻辑非常清晰。

2.3 concatMap()

方法预览:

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
有什么用?

concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

怎么用?

还是使用上面 flatMap() 的例子来讲解,首先来试下 flatMap() 来验证发送的事件是否是无序的,代码如下:

Observable.fromIterable(personList)
.flatMap(new Function < Person, ObservableSource < Plan >> () {
    @Override
    public ObservableSource < Plan > apply(Person person) {
        if ("chan".equals(person.getName())) {
            return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);
        }
        return Observable.fromIterable(person.getPlanList());
    }
})
.subscribe(new Observer < Plan > () {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Plan plan) {
        Log.d(TAG, "==================plan " + plan.getContent());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
为了更好的验证 flatMap 是无序的,使用了一个 delay() 方法来延迟,直接看打印结果:

05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上课
==================plan chan 写作业
==================plan chan 打篮球
05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 开会
==================plan Zede 写代码
==================plan Zede 写文章
可以看到本来 Zede 的事件发送顺序是排在 chan 事件之前,但是经过延迟后, 这两个事件序列发送顺序互换了。

现在来验证下 concatMap() 是否是有序的,使用上面同样的代码,只是把 flatMap() 换成 concatMap(),打印结果如下:

05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 开会
==================plan Zede 写代码
==================plan Zede 写文章
==================plan chan 上课
==================plan chan 写作业
==================plan chan 打篮球
这就代表 concatMap() 转换后发送的事件序列是有序的了。

2.4 buffer()

方法预览:

public final Observable<List<T>> buffer(int count, int skip)
......
有什么用?

从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

怎么用?

buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。这样说可能还是有点抽象,直接看代码:

Observable.just(1, 2, 3, 4, 5)
.buffer(2, 1)
.subscribe(new Observer < List < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(List < Integer > integers) {
        Log.d(TAG, "================缓冲区大小: " + integers.size());
        for (Integer i: integers) {
            Log.d(TAG, "================元素: " + i);
        }
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
打印结果:

05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================缓冲区大小: 2
================元素: 1
================元素: 2
================缓冲区大小: 2
================元素: 2
================元素: 3
================缓冲区大小: 2
================元素: 3
================元素: 4
================缓冲区大小: 2
================元素: 4
================元素: 5
================缓冲区大小: 1
================元素: 5
从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。

2.5 groupBy()

方法预览:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
有什么用?

将发送的数据进行分组,每个分组都会返回一个被观察者。

怎么用?

Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
.groupBy(new Function < Integer, Integer > () {
    @Override
    public Integer apply(Integer integer) throws Exception {
        return integer % 3;
    }
})
.subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "====================onSubscribe ");
    }
    @Override
    public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
        Log.d(TAG, "====================onNext ");
        integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "====================GroupedObservable onSubscribe ");
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "====================GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "====================GroupedObservable onError ");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "====================GroupedObservable onComplete ");
            }
        });
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "====================onError ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "====================onComplete ");
    }
});
在 groupBy() 方法返回的参数是分组的名字,每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组,来看看打印结果:

05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 
05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe     ====================GroupedObservable onNext  groupName: 2 value: 5
====================GroupedObservable onNext  groupName: 2 value: 2
====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 0 value: 3
05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
====================GroupedObservable onSubscribe 
====================GroupedObservable onNext  groupName: 1 value: 4
====================GroupedObservable onNext  groupName: 1 value: 1
====================GroupedObservable onNext  groupName: 0 value: 6
====================GroupedObservable onNext  groupName: 2 value: 8
====================GroupedObservable onNext  groupName: 0 value: 9
====================GroupedObservable onNext  groupName: 1 value: 7
====================GroupedObservable onNext  groupName: 1 value: 10
05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================GroupedObservable onComplete 
====================onComplete 
可以看到返回的结果中是有3个组的。

2.6 scan()

方法预览:

public final Observable<T> scan(BiFunction<T, T, T> accumulator)
有什么用?

将数据以一定的逻辑聚合起来。

怎么用?

Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction < Integer, Integer, Integer > () {
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception {
        Log.d(TAG, "====================apply ");
        Log.d(TAG, "====================integer " + integer);
        Log.d(TAG, "====================integer2 " + integer2);
        return integer + integer2;
    }
})
.subscribe(new Consumer < Integer > () {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "====================accept " + integer);
    }
});
打印结果:

05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
====================apply 
====================integer 1
====================integer2 2
====================accept 3
====================apply 
05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
====================integer2 3
====================accept 6
====================apply 
====================integer 6
====================integer2 4
====================accept 10
====================apply 
====================integer 10
====================integer2 5
====================accept 15

2.7 window()

方法预览:

public final Observable<Observable<T>> window(long count)
......
有什么用?

发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。

怎么用?

Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Observer < Observable < Integer >> () {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "=====================onSubscribe ");
    }
    @Override
    public void onNext(Observable < Integer > integerObservable) {
        integerObservable.subscribe(new Observer < Integer > () {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "=====================integerObservable onSubscribe ");
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "=====================integerObservable onNext " + integer);
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "=====================integerObservable onError ");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "=====================integerObservable onComplete ");
            }
        });
    }
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "=====================onError ");
    }
    @Override
    public void onComplete() {
        Log.d(TAG, "=====================onComplete ");
    }
});
打印结果:

05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 
05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 
05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
=====================integerObservable onNext 2
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 3
=====================integerObservable onNext 4
=====================integerObservable onComplete 
=====================integerObservable onSubscribe 
=====================integerObservable onNext 5
=====================integerObservable onComplete 
=====================onComplete 
从结果可以发现,window() 将 1~5 的事件分成了3组。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容

  • 简单举例场景如下 参加某英语学习群 每天群里贴出一句名言 大家跟读打卡 实际情况很多小伙伴发音并不好 却依然在坚持...
    seasnake阅读 792评论 2 2
  • 全局配置:git config --global user.name "yourname"git config -...
    孤逐王阅读 252评论 0 5
  • 口清魂不浊, 心净可驱魔。 本是阿罗汉, 何须修正佛。
    海王星1984阅读 710评论 2 1
  • 有个曾经关系颇好的同学结婚了, 他和女友相识相恋十余年, 2018年2月23日终于领证了。 从朋友圈和空间的动态看...
    舒邪阅读 182评论 0 1