上一篇 map干了什么
我们来继续上一个话题,我们知道了参数会分别发送自己,然后呢? 他们怎么去合并? 按照zip的最后一个参数中规定的那样。现在让我们来开始今天的话题。
<pre>
zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
</pre>
在上一篇中我们分析到了 第一个参数会执行这个subscribeActual方法。
当调用observer.onSubscribe(parent);
<pre>
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
</pre>
它就会执行我们我们新建的 e.onNext(Utils.getUserListWhoLovesCricket()); 这两个方法。 e.onComplete();
<pre>
private Observable<List<User>> getCricketFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
}
</pre>
一步一步向下进行 你们肯定知道会执行
ObservableCreate下的方法
<pre>
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
</pre>
好了 到了这里 没有什么难度。在向下的时候 就要仔细了。 observer.onNext(t); 这个observer是谁的对象?它执行的onnext是谁的方法?你们可以思考一下。
好了 我来揭露谜底 是ObservableZip下的onNext方法。
为什么? 不知道大家还有没有印象
<pre>
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
ZipObserver<T, R>[] s = observers;
int len = s.length;
for (int i = 0; i < len; i++) {
s[i] = new ZipObserver<T, R>(this, bufferSize);
}
// this makes sure the contents of the observers array is visible
this.lazySet(0);
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
sources[i].subscribe(s[i]);
}
}
</pre>
sources[i].subscribe(s[i]);这个方法,它的参数s是 ZipObserver<T, R>[] s,这下明白了吧。
接下来就会执行ZipObserver的
<pre>
public void onNext(T t) {
queue.offer(t);
parent.drain();
}
</pre>
马上就要到达最后一步了,drain()方法。这个方法很长。我就不贴了, 只把最重要的贴上来
v = ObjectHelper.requireNonNull(zipper.apply(os.clone())
zipper就是我们Function里面的参数。os就是你们发射的值。在这里执行就是们zip回调的那个接口。对发射的两个数据怎么去操作就是在Function中去设置了。
好了。zipper大家虽然都知道怎么用。 这里我就是想告诉大家这个基本的一个原理。
从这里面可以得到以下几点哦
我这里创建的是两个Observable,
<pre>
private Observable<List<User>> getCricketFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
}
private Observable<List<User>> getFootballFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesFootball());
e.onComplete();
}
}
});
}
</pre>
在这两个对象中我们都可以对数据进行操作。并且最后合并的时候 是以最少的那个数据为主。相信大家看了之后就会明白。