ObservableA:
// Builds the source observable (ObservableA). The OnSubscribe's call method
// emits the single Integer 1 to the given subscriber and then signals completion.
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
// Push the only item to the subscriber.
subscriber.onNext(1);
// Signal that no further items will follow.
subscriber.onCompleted();
}
});
ObservableB(在 ObservableA 上调用 map 得到):
// Applies a Func1 that turns each Integer into a String by prefixing it
// with "This is ".
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
// String concatenation converts the Integer to its text form.
return "This is " + integer;
}
})
Subscriber_One:
// The final subscriber (Subscriber_One): prints every event it receives
// to standard output.
new Subscriber<String>() {
// Prints a fixed marker when completion is signalled.
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
// Prints only the error's message (not the stack trace).
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
// Prints each received String as-is.
@Override
public void onNext(String s) {
System.out.println(s);
}
}
ObservableB.subscribe(Subscriber_One)
分析:
ObservableB:
call
// 构造函数中保存上游 Observable 和 map 的转换函数:
this.source = source; // ObservableA
this.transformer = transformer; // 传给 map 的 Func1
// Call method of the map stage: wraps the downstream subscriber `o` together
// with the transformer in a MapSubscriber, then subscribes that wrapper to
// the upstream source.
call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// MapSubscriber's constructor:
// this.actual = actual; -> the Subscriber_One object
// this.mapper = mapper; -> the actual function passed to map
// Registers parent with o; presumably so that unsubscribing o also
// unsubscribes parent -- confirm against the Subscriber.add documentation.
o.add(parent);
source.unsafeSubscribe(parent);// invokes ObservableA's call
}
ObservableA:
call
// ObservableA's call method: emits 1 to the given subscriber and then
// signals completion.
call(Subscriber<? super Integer> subscriber) {
// Deliver the single item.
subscriber.onNext(1);
// Signal completion.
subscriber.onCompleted();
}
看MapSubscriber的onNext方法:
// MapSubscriber.onNext: maps one upstream item and forwards the result to
// the downstream subscriber. If the mapper throws, the exception is passed
// through Exceptions.throwIfFatal, this subscriber is unsubscribed, and the
// error (with the offending value attached) is passed to onError instead of
// emitting a result.
onNext(T t) {
R result;
try {
result = mapper.call(t); // invokes the mapper's call: "This is " + integer
} catch (Throwable ex) {
// Presumably rethrows fatal errors instead of routing them to onError --
// confirm against the Exceptions.throwIfFatal documentation.
Exceptions.throwIfFatal(ex);
unsubscribe();
// Attach the value that caused the failure to the reported error.
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
// Skip the downstream onNext for this item.
return;
}
actual.onNext(result); // actual is Subscriber_One
}