RxJava2.0 操作符(6)—— Utility 辅助操作符

这个页面展示的操作符可用于组合多个 Observables。

Delay — 延时发射 Observable 的结果。
DelaySubscription — 延时处理订阅请求。
DoOnEach — 注册一个动作,对 Observable 发射的每个数据项使用。
DoOnComplete — 注册一个动作,对正常完成的 Observable 使用。
DoOnError — 注册一个动作,对发生错误的 Observable 使用。
DoOnTerminate — 注册一个动作,对完成的 Observable 使用,无论是否发生错误。
DoOnSubscribe — 注册一个动作,在观察者订阅时使用。
DoOnUnsubscribe — 注册一个动作,在观察者取消订阅时使用。
Dematerialize — 将上面的结果逆转回一个 Observable
ObserveOn — 指定观察者观察 Observable 的调度器
Materialize — 将 Observable 转换成一个通知列表
Serialize — 强制一个 Observable 连续调用并保证行为正确
Subscribe — 操作来自 Observable 的发射物和通知。
SubscribeOn — 指定 Observable 执行任务的调度器。
TimeInterval — 定期发射数据。
Timeout - 对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
Timestamp — 给 Observable 发射的每个数据项添加一个时间戳。

6.1 Delay

延迟一段指定的时间再发射来自 Observable 的请求。

Delay
Delay

RxJava 的实现是 delay 和 delaySubscription。不同之处在于 Delay 是延时数据的发射,而 DelaySubscription 是延时注册 Subscriber。

6.1.1 Delay

delay
delay

示例代码:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        if (integer == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + integer);
    }
});

输出结果:

delay Time :2408
accept:1
accept:2

6.1.2 delaySubscription

delaySubscription
delaySubscription

示例代码:

final long currentTimeMillis = System.currentTimeMillis();
Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer aLong) throws Exception {
        if (aLong == 1) {
            Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
        }
        Log.e(TAG, "accept:" + aLong);
    }
});

输出结果:

delay Time :2500
accept:1
accept:2

6.2 Do

注册一个动作作为原始 Observable 生命周期事件的一种占位符。

Do
Do

Do 操作符就是给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段的时候,这些回调就会被触发。
在 Rxjava2.0 中实现了很多的 do 操作符的变体。

6.2.1 doAfterNext

实现方法:doAfterNext(Consumer)
从上流向下流发射后被调用。

示例代码:

public static void demo_doAfterNext(){
    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    ob1.doAfterNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG,"doAfterNext="+integer);
        }
    }).subscribe(getNormalObserver());
}


public static Disposable mDisposable ;
//可重复使用
public static Observer<Integer> getNormalObserver(){
    return new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            mDisposable = d;
        }

        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG,"normal,onNext:"+integer);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            Log.e(TAG,"normal,Error: " + error.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG,"normal,onComplete");
        }
    };
}

输出结果:

normal,onNext:1
doAfterNext : 1
normal,onNext:2
doAfterNext : 2
normal,onNext:3
doAfterNext : 3
normal,onComplete

6.2.2 doAfterTerminate

doAfterTerminate
doAfterTerminate

实现方法: doAfterTerminate(Action)

注册一个 Action,当 Observable 调用 onComplete 或 onError 触发。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doAfterTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doAfterTerminate run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doAfterTerminate run

6.2.3 doFinally

实现方法: doFinally(Action onDispose)

当 Observable 调用 onError 或 onCompleted 之后调用指定的操作,或由下游处理。
doFinally 优先于 doAfterTerminate 的调用。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
//      emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doFinally(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doFinally run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
normal,onComplete
doFinally run

6.2.4 doOnDispose

doOnDispose
doOnDispose

实现方法:doOnDispose(Action onDispose)

当 Observable 取消订阅时,它就会被调用。

示例代码:

Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {

        emitter.onNext(1);
        //mDisposable 参考6.2.1
        if (mDisposable != null) {
            mDisposable.dispose();
        }
        emitter.onNext(2);
        emitter.onComplete();
//                emitter.onError(new Throwable("nothingerro"));
    }
});
ob1.doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doOnDispose run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
doOnDispose run

6.2.5 doOnComplete

doOnComplete
doOnComplete

当它产生的 Observable 正常终止调用 onCompleted 时会被调用。
Javadoc: doOnCompleted(Action)

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothingerror"));
        emitter.onComplete();

    }
});
ob1.doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnComplete run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
doOnComplete run
normal,onComplete

6.2.6 doOnEach

doOnEach
doOnEach

doOnEach 操作符让你可以注册一个回调,它产生的 Observable 每发射一项数据就会调用它一次。不仅包括 onNext 还包括 onError 和 onCompleted。

示例代码:

 Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothingerror"));
        emitter.onComplete();

    }
});
ob1.doOnEach(new Observer<Integer>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Integer integer) {
        Log.e(TAG, "doOnEach,onNext:" + integer);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "doOnEach,onError:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "doOnEach,onComplete");
    }
}).subscribe(getNormalObserver());

输出结果:

doOnEach,onNext:1
normal,onNext:1
doOnEach,onNext:2
normal,onNext:2
doOnEach,onComplete
normal,onComplete

6.2.7 doOnError

doOnError 操作符注册一个动作,当它产生的 Observable 异常终止调用 onError 时会被调用。

doOnError
doOnError

实现方法 doOnError(Consumer<? super Throwable>);

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnError(new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        Log.e(TAG,"doOnError : "+throwable.getMessage());
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
doOnError : nothing error
normal,Error: nothing error

6.2.8 doOnLifecycle

调用相应的 onXXX 方法(在所有 Observer 之间共享),用于序列的生命周期事件(订阅,取消,请求)。


doOnLifecycle
doOnLifecycle

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//                emitter.onError(new Throwable("nothing error"));
        if (mDisposable != null) {
            mDisposable.dispose();
        }
        emitter.onComplete();

    }
});
ob1.doOnLifecycle(new Consumer<Disposable>() {
    @Override
    public void accept(@NonNull Disposable disposable) throws Exception {
        Log.e(TAG, "doOnLifecycle ,disposable:" + disposable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG, "doOnLifecycle ,run");
    }
}).subscribe(getNormalObserver());

输出结果:

doOnLifecycle ,disposable:null
normal,onNext:1
normal,onNext:2
doOnLifecycle ,run

6.2.9 doOnNext

doOnNext操作符类似于 doOnEach(Consumer)。

doOnNext
doOnNext

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();
    }
});
ob1.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
    Log.e(TAG, "doOnNext ,onNext:"+integer);
}
}).subscribe(getNormalObserver());

输出结果:

doOnNext ,onNext:1
normal,onNext:1
doOnNext ,onNext:2
normal,onNext:2
normal,onComplete

6.2.10 doOnSubscribe

doOnSubscribe,当观察者订阅它生成的 Observable 它就会被调用。

doOnSubscribe
doOnSubscribe

实践:在 Observable 发射前做一些初始化操作(比如开始加载数据时显示载入中界面)。

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnSubscribe(new Consumer<Disposable>() {
    @Override
    public void accept(@NonNull Disposable disposable) throws Exception {
        Log.e(TAG, "doOnSubscribe,disposable:" + disposable);
    }
}).subscribe(getNormalObserver());

输出结果:

doOnSubscribe,disposable:null
normal,onNext:1
normal,onNext:2
normal,onComplete

6.2.11 doOnTerminate

doOnTerminate 操作符注册一个动作,当它产生的 Observable 终止之前会被调用,无论是正常还是异常终止。

doOnTerminate
doOnTerminate

实现方法:doOnTerminate(Action)
实践:不管消息流最终以 onError() / onComplete() 结束,都会被调用(类似 Java 的 finally ),对于某些需要 onError() / onComplete() 后都要执行的操作(如网络加载成功/失败都要隐藏载入中界面),可以放在这里。

注意:取消订阅时,不会调用 doOnTerminate 方法。

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("nothing error"));
        emitter.onComplete();

    }
});
ob1.doOnTerminate(new Action() {
    @Override
    public void run() throws Exception {
        Log.e(TAG,"doOnTerminate,run");
    }
}).subscribe(getNormalObserver());

输出结果:

normal,onNext:1
normal,onNext:2
doOnTerminate,run
normal,onComplete

6.2.12 onTerminateDetach

当执行了反注册 unsubscribes 或者发送数据序列中断了,解除上游生产者对下游接受者的引用。
实践:onTerminateDetach 会使 Observable 调用 UnSubscriber 时,对 Subscriber 的引用会被释放,从而避免造成内存泄漏。

6.3 Meterialize / Dematerialize

6.3.1 Materialize

Materialize 将数据项和事件通知都当做数据项发射,Dematerialize 刚好相反。


Meterialize
Meterialize

一个合法的有限的 Obversable 将调用它的观察者的 onNext 方法零次或多次,然后调用观察者的 onCompleted 或 onError 仅一次。Materialize 操作符将这一系列调用,包括原来的 onNext 通知和终止通知 onCompleted 或 onError 都转换为一个 Observable 发射的数据序列。
通俗一点的说法:Meterialize 操作符将 OnNext / OnError / OnComplet e都转化为一个 Notification 对象并按照原来的顺序发射出来。

示例代码:

Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
//      emitter.onError(new Throwable("love world"));
        emitter.onComplete();
    }
});

ob1.materialize().subscribe(new Consumer<Notification<Integer>>() {
    @Override
    public void accept(@NonNull Notification<Integer> in) throws Exception {
        if (in.isOnNext()) {
            Log.e(TAG, "materialize,onNext: " + in.isOnNext());
            return;
        }
        if (in.isOnError()) {
            Log.e(TAG, "materialize,onError: "+in.getError().getMessage());
            return;
        }
        if (in.isOnComplete()) {
            Log.e(TAG, "materialize,OnComplete");
            return;
        }
    }
});

输出结果:

materialize,onNext: true
materialize,onNext: true
materialize,OnComplete

6.3.2 Dematerialize

而 Dematerialize 执行相反的过程。


Dematerialize
Dematerialize

示例代码:

Observable<Notification<Integer>> ob1 = Observable.create(new ObservableOnSubscribe<Notification<Integer>>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Notification<Integer>> e) throws Exception {
        e.onNext(Notification.createOnNext(1));
        e.onNext(Notification.<Integer>createOnError(new Throwable("My error!")));
        e.onNext(Notification.<Integer>createOnComplete());
        
    }
});
ob1.dematerialize().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {

    }

    @Override
    public void onNext(@NonNull Object o) {
        Log.e(TAG, "onNext:" + o.toString());
    }

    @Override
    public void onError(@NonNull Throwable e) {
        Log.e(TAG, "onError:" + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.e(TAG, "onComplete");
    }
});

输出结果:

onNext:1
onComplete

6.4 ObserveOn / SubscribeOn

指定一个观察者在哪个调度器(线程)上观察这个 Observable。


ObserveOn
ObserveOn

SubscribeOn
SubscribeOn

ObserverOn 用来指定观察者所运行的线程,也就是发射出的数据在那个线程上使用。
在 Android 中,如果经常会遇见这样场景,我们需要从网络中读取数据,之后修改 UI 界面,观察者就必须在主线程上运行,就如同 AsyncTask 的 onPostExecute。

.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  

注意:当遇到一个异常时 ObserveOn 会立即向前传递这个 onError 终止通知,它不会等待慢速消费的 Observable 接受任何之前它已经收到但还没有发射的数据项。这可能意味着 onError 通知会跳到(并吞掉)原始 Observable 发射的数据项前面,正如下图所示的。


ObserveOn
ObserveOn

示例代码:

/**
 Schedulers.io() 代表 io 操作的线程, 通常用于网络,读写文件等 io 密集型的操作
 Schedulers.computation() 代表 CPU 计算密集型的操作, 例如需要大量计算的操作
 Schedulers.newThread() 代表一个常规的新线程
 */
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
        Log.e(TAG, "onNext:" +integer);
    }
});

输出结果:

subscribeOn:RxCachedThreadScheduler-1
observerOn:RxNewThreadScheduler-1
onNext:1

6.4.1 unsubscribeOn

修改原 Observable,以便订阅者将其配置在指定的调度器(线程)上。

示例代码:

//将线程从 computation 换到 io 中
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
    
            Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
            emitter.onNext(1);
        }
    }).subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "doOnNext,observerOn:" + Thread.currentThread().getName());
            Log.e(TAG, "doOnNext,onNext:" + integer);
        }
    })
    .unsubscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.io())
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
            Log.e(TAG, "onNext:" + integer);
        }
    });

输出结果:

subscribeOn:RxNewThreadScheduler-1
doOnNext,observerOn:RxComputationThreadPool-1
doOnNext,onNext:1
observerOn:RxComputationThreadPool-1
onNext:1

6.5 Serialize

强制一个 Observable 连续调用并保证行为正确。

Serialize
Serialize

一个 Observable 可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让 Observable 行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onCompleted 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。使用 serialize 操作符,你可以纠正这个 Observable 的行为,保证它的行为是正确的且是同步的。

6.6 TimeInterval

将一个发射数据的 Observable 转换为发射那些数据发射时间间隔的 Observable。

TimeInterval
TimeInterval

TimeInterval 操作符拦截原始 Observable 发射的数据项,替换为两个连续发射物之间流逝的时间长度。 也就是说这个使用这个操作符后发射的不再是原始数据,而是原始数据发射的时间间隔。新的 Observable 的第一个发射物表示的是在观察者订阅原始 Observable 到原始 Observable 发射它的第一项数据之间流逝的时间长度。 不存在与原始 Observable 发射最后一项数据和发射 onCompleted 通知之间时长对应的发射物。timeInterval 默认在 immediate 调度器上执行,你可以通过传参数修改。

示例代码:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .take(3)
    .timeInterval()
    .subscribe(new Consumer<Timed<Long>>() {
        @Override
        public void accept(@NonNull Timed<Long> t) throws Exception {
            Log.e(TAG, "onNext: " + t.value() + " , time = " + t.time());
        }
    });

输出结果:

onNext: 0 , time = 104
onNext: 1 , time = 113
onNext: 2 , time = 100

6.7 Timeout

对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。


Timeout
Timeout

Timeout 操作符给 Observable 加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。
RxJava2.0 中的实现的 Timeout 操作符有好几个变体:

  • timeout(long,TimeUnit): 第一个变体接受一个时长参数,每当原始 Observable 发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始 Observable 没有发射另一项数据,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。 这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。
  • timeout(long,TimeUnit,Observable): 这个版本的 timeout 在超时时会切换到使用一个你指定的备用的 Observable,而不是发错误通知。它也默认在 computation 调度器上执行。
  • timeout(Function):这个版本的 timeout 使用一个函数针对原始 Observable 的每一项返回一个 Observable,如果当这个 Observable 终止时原始 Observable 还没有发射另一项数据,就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。
  • timeout(Function,Observable): 这个版本的 timeout 同时指定超时时长和备用的 Observable。它默认在immediate调度器上执行

示例代码1:

/**
 * 在 150 毫秒间隔内如果没有发射数据。发送一个 TimeoutException 通知终止。
 * */
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(i * 100);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    })
    .timeout(150, TimeUnit.MILLISECONDS)
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            
        }
    
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG, "onNext:" + integer);
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "onError:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
        }
    });

输出结果:

onNext:0
onNext:1
onError:null

示例代码 2:

 /**
 * 只接收 200 毫秒间隔内发送的数据,如果超时则切换到 Observable.just(100, 200)
 * */
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 5; i++) {
                Thread.sleep(i * 100);
                emitter.onNext(i);
            }
            emitter.onComplete();
        }
    })
    .timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "accept:" + integer);
        }
    });

输出结果:

accept:0
accept:1
accept:100
accept:200

6.8 Timestamp

给 Observable 发射的数据项附加一个时间戳。


Timestamp
Timestamp

它将一个发射 T 类型数据的 Observable 转换为一个发射类型为 Timestamped 的数据的 Observable,每一项都包含数据的发射时间。也就是把 Observable 发射的数据重新包装了一下,将数据发射的时间打包一起发射出去,这样观察者不仅能得到数据,还能得到数据的发射时间。 timestamp 默认在 immediate 调度器上执行,但是可以通过参数指定其它的调度器。

示例代码:

Observable.range(1, 3)
    .timestamp()
    .subscribe(new Consumer<Timed<Integer>>() {
        @Override
        public void accept(@NonNull Timed<Integer> t) throws Exception {
            Log.e(TAG, "accept ,onNext:" + t.value() + ",time = " + t.time());
        }
    });

输出结果:

accept ,onNext:1,time = 1494606809418
accept ,onNext:2,time = 1494606809420
accept ,onNext:3,time = 1494606809420

6.9 Using

创建一个只在 Observable 生命周期内存在的一次性资源.


Using
Using

当一个观察者订阅 using 返回的 Observable 时,using 将会使用 Observable 工厂函数创建观察者要观察 Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个 Observable 时,或者当观察者终止时(无论是正常终止还是因错误而终止),using 使用第三个函数释放它创建的资源。

using 操作符接受三个参数:

  • 一个用户创建一次性资源的工厂函数
  • 一个用于创建 Observable 的工厂函数
  • 一个用于释放资源的函数

示例代码:

Observable.using(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return new Random().nextInt(10);
        }
    }, new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
            return Observable.just("hello+" + integer, "world+" + integer);
        }
    }, new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "using,accept - >" + integer);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.e(TAG, "subscribe,accept -> " + s);
        }
    });

输出结果:

subscribe,accept -> hello+8
subscribe,accept -> world+8
using,accept - >8

6.10 To

将 Observable 转换为另一个对象或数据结构。


To
To

ReactiveX 的很多语言特定实现都有一种操作符让你可以将 Observable 或者 Observable 发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到 Observable 终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的 Observable。

在某些 ReactiveX 实现中,还有一个操作符用于将 Observable 转换成阻塞式的。一个阻塞式的 Ogbservable 在普通的 Observable 的基础上增加了几个方法,用于操作 Observable 发射的数据项。

RxJava2.x 中实现了多种 To 操作符:

6.10.1 To

示例代码:


输出结果:


6.10.2 toFuture

返回表示该 Observable 发出的单个值的 Future。
如果 Observable 发出多个项目,Future 将会收到一个 IllegalArgumentException。 如果 Observable 为空,Future 将收到一个 NoSuchElementException。

如果 Observable 可能会发出多个项目,请使用Observable.toList() 、toBlocking() 、toFuture()。


toFuture
toFuture
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,851评论 0 10
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,186评论 2 8
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,816评论 0 1
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,633评论 8 93
  • RxJava正在Android开发者中变的越来越流行。唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编...
    刘启敏阅读 1,849评论 1 7