【RxJava】- 变换操作符源码分析
【RxJava】- 过滤操作符源码分析
【RxJava】- 结合操作符源码分析
【RxJava】- 连接操作符源码分析
简介
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。简单一点就是创建一个事件,注册一个观察者,当事件发生改变时,及时通知观察者。同时RxJava可以把一序列的异步事件按照一定规则组合成新的事件序列。
RxAndroid里面就几个类,是在RxJava基础上针对Android开发封装了一些使用方法而已。
参考
GitHub:RxJava
GitHub:RxAndroid
ReactiveX:reactivex
中文文档地址:ReactiveX/RxJava文档中文版
版本
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
订阅
- subscribe(@NonNull Observer<? super T> observer)
执行被观察者subscribeActual方法并传入观察者实例。
调度器
下面将大体介绍一下RaJava里面的调度器(Scheduler)。Schedulers类用于返回标准Scheduler实例的静态工厂方法。
Observable
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) { return apply(f, source);}
return source;
}
下面基本都是在变量“f”等于null情况下分析。
Creating Observables
创建操作
create
返回ObservableCreate实例
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
try {
if (!emitter.isDisposed()) {
for (int i = 1; i < 5; i++) {emitter.onNext(i);}
emitter.onComplete();
}
} catch (Exception e) {
emitter.onError(e);
}}.subscribe(new Observer<Integer>() {...}
看一下create方法
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
看一下onAssembly方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onObservableAssembly默认是null,所以create最终返回的是持有被观察者的source(ObservableCreate类型)。
然后调用ObservableCreate对象的subscribe方法,并传入观察者实例,在subscribe方法中继续调用ObservableCreate的subscribeActual方法并传入观察者实例observer。
看一下subscribeActual方法做了什么
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
创建事件发射器CreateEmitter并传入观察者实例observer,调用onSubscribe方法,通知观察者,观察关系已经准备好,并传入事件发射器,以便观察者可以主动选择放弃对事件通知的接受。
在被观察者中,通过调用CreateEmitter中的onNext,onError,onComplete等方法,如果事件观察没有被取消,那么会调用观察者(observer)中对应的方法来通知观察者。
-
发射器
在ObservableCreate有那个发射器,CreateEmitter和SerializedEmitter,一般的发射器和序列发射器。SerializedEmitter维护了一个队列数组,保证有序的调用onNext, onError 和 onComplete,调用onNext时,会把传入的值插入到队列,然后循环从队列获取,然后执行观察者的onNext方法。
至于有序调用,我查看源码后,我的理解是这样的:有序指的是在执行完所以队列里面的事件(即队列全部值都被取出并通知观察者),然后才会执行onError或者onComplete方法,onError和onComplete只有其中一个方法被执行了,另一个不会再被执行。
如果执行了onError和onComplete其中一个,那么剩余队列里面的事件将不会被执行。
当调用onError时异常为ExceptionHelper.TERMINATED时,观察者的onError不会被调用,更多请直接查看源码。
defer
返回ObservableDefer实例
延期创建被观察者对象,直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable。
调用defer传入的实例(Supplier类型)的get()方法,获取被观察者对象(ObservableSource类型)
Observable.defer((Supplier<ObservableSource<Integer>>) () -> observer -> observer.onNext(1)).subscribe(...)
具体的可以看一下ObservableDefer这个类,至于subscribe方法怎么传参数,大致看一下调用逻辑就明白了。
Empty/Never/Throw
Empty
返回ObservableEmpty实例,创建一个不发射任何数据,但是正常终止的Observable,会调用观察者onSubscribe和onComplete方法Never
返回ObservableNever实例,创建一个不发射数据也不终止的Observable,调用观察者onSubscribe方法。-
Throw
用error方法实现,返回ObservableError实例,参数为持有异常实例的JustValue实例。创建一个不发射数据以一个错误终止的Observable,调用观察者onSubscribe和onError方法。
From
数据转换,实现有:fromAction,fromArray,fromCallable,fromCompletable,fromFuture,fromFuture,fromIterable,fromMaybe,fromPublisher,fromRunnable,fromSingle,fromSupplier。对应Observable实现请自己查看源码,里面代码不多,阅读完基本知道怎么用。
Interval
创建一个按固定时间间隔发射整数序列的Observable,返回ObservableInterval实例,同时会默认传入一个ComputationScheduler实例作为调度器。从0开始自加,每发射一次加1。
interval(long initialDelay, long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
- initialDelay
延迟发射第一个值,即“0”,后面按period间隔时间正常发射。 - period
两次发射直接的间隔时间 - unit
initialDelay和period时间单位 - scheduler
调度器
如果想在主线程接收事件,需要用observeOn(AndroidSchedulers.mainThread())转换。AndroidSchedulers就是RxAndroid里面的类了。
Just
创建一个发射指定值的Observable,可以传入多个参数,如果传入一个参数返回ObservableJust实例,多个返回ObservableFromArray实例,前者把传入的一个参数原封不动发射后就完成,后者需要把数组中的数据一个一个原封不动发射后完成,前者完成可以看成是后者的一种特殊情况。
Range
创建一个发射特定整数序列的Observable,参数如果是int返回ObservableRange实例,如果是long返回ObservableRangeLong实例。
这个和上面的interval差不多,不同的interval从0开始,如果不取消,就一直发射,而Range只发射自定参数范围的整数,发射完就停止,发射一次自加1。比如range(1,3)将收到1,2,3.
Repeat
创建一个发射特定数据重复多次的Observable,返回ObservableRepeat实例。比如对just操作做重复操作:
Observable.just(1,3).repeat().subscribe(...)
repeat可接收一个参数用作重复次数。
-
repeatUntil
返回ObservableRepeatUntil实例,需要传入BooleanSupplier类型参数。每个操作完成后都会调用ObservableRepeatUntil中的onComplete方法,而在该方法中又会调用BooleanSupplier的getAsBoolean方法来判断是否需要再次执行操作,如果不需要者操作终止。
-
repeatWhen
返回ObservableRepeatUntil实例,需要传入Function类型参数。有条件的重新订阅和发射原来的Observable。比如:Observable.just("A", "B").repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception { return Observable.timer(10000, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<String>() {...}
代码中设置了下次执行的时间是10秒之后,这种方式只能执行两次
Start
返回一个Observable,它发射一个类似于函数声明的值。
Timer
创建一个Observable,它在一个给定的延迟后发射一个特殊的值,返回ObservableTimer实例。
Transforming Observables
转换操作
Buffer
Observable.create(emitter -> {}).buffer(1)
定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
-
ObservableBuffer
buffer(int count),buffer(int count, int skip) buffer(int count, @NonNull Supplier<U> bufferSupplier) buffer(int count, int skip, @NonNull Supplier<U> bufferSupplier)
都返回ObservableBuffer实例,bufferSupplier默认ArrayListSupplier实例。
Observable.create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 0 ; i < 10 ; i++){ emitter.onNext(i); } }).buffer(5).subscribe(...)
当count == skip时,收集数据策略由BufferExactObserver完成,当count != skip由BufferSkipObserver完成。
BufferExactObserver
当buffer收集的数据到达count时发射一次-
BufferSkipObserver
就有点意思了,BufferSkipObserver中有个队列数组buffers,每采集一次数据时,当if (index++ % skip == 0)时,会创建一个新的数据收集数组,然后放到buffers中。然后遍历buffers中的数据,并将数据放入数据收集数组里面,当前一个数据收集数组收集到的数据个数等于count便会发射一次通知观察者,然后从buffers移除这个数据收集数组,如果buffers还存在数据收集数组,那么接下来的数据便会插到这个数组中。
-
ObservableBufferTimed
buffer(long timespan, long timeskip, @NonNull TimeUnit unit) buffer(long timespan, long timeskip, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) buffer(long timespan, @NonNull TimeUnit unit) buffer(long timespan, @NonNull TimeUnit unit, int count) buffer(long timespan, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, int count)
都返回ObservableBufferTimed实例,bufferSupplier默认ArrayListSupplier实例。
- 当timespan == timeskip && maxSize == Integer.MAX_VALUE,收集策略BufferExactUnboundedObserver完成。
- timespan == timeskip,BufferExactBoundedObserver完成。
- 其它情况BufferSkipBoundedObserver完成。
定期以List的形式发射新的数据,采集数据的时候会把数据放到集合中,达到count停止收集,当间隔时间达到timespan或者timeskip时候,发射一次数据,具体算法可以查看上面收集策略实现类。
ObservableBufferBoundary
不贴方法了,自己看源码。监视这个叫openingIndicator的Observable(它发射bufferOpen对象),每当openingIndicator发射了一个数据时,它就创建一个新的List开始收集原始Observable的数据,并将openingIndicator传递给closingIndicator函数。这个函数返回一个Observable。buffer监视这个Observable,当它检测到一个来自这个Observable的数据时,就关闭List并且发射它自己的数据(之前的那个List)。ObservableBufferExactBoundary
不贴方法了,自己看源码。和ObservableBufferBoundary差不多。
总结
首先需要了解,RxJava被观察者和观察者之间的调度流程,个一定要清楚,这样对分析RxJava的操作符源码很有帮助,否则你将会陷入代码无尽的调用中。
Observable的subscribeActual就是做中转作用,调用到下一个ObservableOnSubscribe的subscribe方法中。