static void map() {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer + 1);
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("map-accept:" + s);
}
});
}
从create -> map ->subscribe
create()方法调用后返回ObservableCreate对象,在ObservableCreate对象上调用map()方法
在map()方法会创建ObservableMap对象
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
ObservableMap构造函数里,ObservableCreate作为参数source保存在ObservableMap中,map()方法调用后返回ObservableMap对象,后面调用subscribe()方法。
在Observable类中,调用subscribe()方法时,内部会调用subscribeActual()方法
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
放调用subscribe()方法时,需要查看subscribeActual()方法实现,或者子类subscribeActual()方法实现即可。
从subscribe->map ->create
调用subscribe(),传入观察者,该subscribe()方法是ObservableMap对象的方法,之前所说,调用subscribe()方法,看ObservableMap对象在subscribeActual()上的实现
public void subscribeActual(Observer<? super U> t) {
/**
* Observer订阅时,调用subscribe 接着调用subscribeActual,Observer保存在MapObserver中,
* 通过source.subscribe()将订阅继续向上传递,当上游发送数据时,传递到MapObserver,
* MapObserver在调用MyObserver,完成数据传递
*/
source.subscribe(new MapObserver<T, U>(t, function));
}
subscribeActual调用source.subscribe()方法,原始观察者包装在MapObserver类中,订阅事件继续上传。从ObservableMap构造方法可知,source为ObservableCreate对象,所以 source.subscribe()即为ObservableCreate.subscribe(),具体可以查看查看ObservableCreate类中subscribeActual()方法实现。
ObservableCreate类的方法
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
observer在这里为MapObserver。
subscribeActual内部调用source.subscribe(),这里source为原始数据。
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}
emitter调用onNext()发送数据,即通过CreateEmitter.onNext()发送,onNext调用observer.onNext(t);
这里observer为MapObserver对象,数据传递到MapObserver.onNext()方法。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
//执行函数
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
数据传递到onNext ,调用mapper.apply(t),调用在map()操作符中实现的函数,原始数据经过map操作后返回新的数据,调用actual.onNext(v),数据继续往下走,actual对象为我们最终订阅者,第一个数据传递完毕。