Observable.just(1, 2, 3, 4).skip(1).subscribe(System.out::println);
输出2,3,4
skip操作符的含义就是跳过指定数量的事件,忽略前面的事件,看看源码里是怎么写的吧
public final Observable<T> skip(long count) {
if (count <= 0) {
return RxJavaPlugins.onAssembly(this);
}
return RxJavaPlugins.onAssembly(new ObservableSkip<T>(this, count));
}
追踪到ObservableSkip类里看看
public final class ObservableSkip<T> extends AbstractObservableWithUpstream<T, T> {
final long n;
public ObservableSkip(ObservableSource<T> source, long n) {
super(source);
this.n = n;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new SkipObserver<T>(observer, n));
}
...
当我们调用subscribe
时,就会触发subscribeActual
函数的调用
这里的source
就是我们代码里Observable.just(1, 2, 3, 4)
返回的ObservableFromArray
对象,看的出,实际上就是在中间又加了一个观察者(SkipObserver),当ObservableFromArray
对象调用subscribe
时,就会触发SkipObserver
对象的onSubscribe
方法,在这个方法里this.upstream
就是FromArrayDisposable
对象,先别管这个是什么,主要看事件流是怎么发送到观察者的。
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
好了,开始发送事件onNext,是谁在发送事件?,其实就是一直都是观察者在发送,因为这里有个SkipObserver
,事件先由它发出来,经过onNext
,
属性remaining
表示还需要逃过多少事件,一直到remaining
为0,事件就会交给下游downstream
处理。
@Override
public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
downstream.onNext(t);
}
}
基于源码2.2.4版本