最简单的使用方法是这样的
// 被观察者发送事件 观察者响应事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
这边分为两个步骤
1.create
2.subscribe
create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// 判空操作
ObjectHelper.requireNonNull(source, "source is null");
// 继续包装
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// 看下onAssembly方法 实际返回的就是ObservableCreate类对象
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
由上面可以知道subscribe
是ObservableOnSubscribe
类的方法,在ObservableOnSubscribe
类中找不到,很容易在Observable
中找到subscribe
的方法。
public final void subscribe(Observer<? super T> observer) {
// 判空操作
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 返回observer对象
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual(observer);
是这段代码的关键 调用的是ObservableCreate
的subscribeActual
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 实现ObservableEmitter<T>把这个对象传递到subscribe 通过这个对象发送事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用我们代码里写的onSubscribe方法
observer.onSubscribe(parent);
try {
// 回到我们代码里写的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
到这里最简单的调用过程就结束了