1. Single.create
1.1 singleTest
public void singleTest() {
    Single.create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(SingleEmitter<String> emitter) {
        }
    }).subscribe(new SingleObserver<String>() {
        @Override
        public void onSubscribe(Disposable d) {}
        @Override
        public void onSuccess(String s) {}
        @Override
        public void onError(Throwable e) {}
    });
}
1.2 Single.create
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<T>(source));
}
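Single.create returns a SingleCreate object that holds a reference to the SingleOnSubscribe. Nothing runs at this point; the next step is to follow what happens when subscribe is called on that SingleCreate.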
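The "create only stores the callback" behaviour can be illustrated with a small stand-alone model. This is a minimal sketch, not RxJava code: LazySingle and LazySingleDemo are hypothetical names, and a plain java.util.function.Consumer stands in for both SingleOnSubscribe and the emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for Single; NOT RxJava's real class.
final class LazySingle<T> {
    // Plays the role of SingleOnSubscribe<T>: the callback stored at creation time.
    private final Consumer<Consumer<T>> source;

    private LazySingle(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    // Like Single.create: wraps the callback and returns, without running it.
    static <T> LazySingle<T> create(Consumer<Consumer<T>> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new LazySingle<>(source);
    }

    // Like subscribe: only here is the stored callback invoked.
    void subscribe(Consumer<T> onSuccess) {
        source.accept(onSuccess);
    }
}

final class LazySingleDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazySingle<String> single = LazySingle.create(emitter -> {
            log.add("source invoked");
            emitter.accept("hello");
        });
        log.add("created"); // logged first: create() did not run the callback
        single.subscribe(value -> log.add("received " + value));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [created, source invoked, received hello]
    }
}
```

The log order shows that the callback body runs during subscribe, not during create.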
1.3 SingleCreate.subscribe (inherited from Single)
public final void subscribe(SingleObserver<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    observer = RxJavaPlugins.onSubscribe(this, observer);
    ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    try {
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}
This call triggers SingleCreate.subscribeActual.
1.4 SingleCreate.subscribeActual
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    Emitter<T> parent = new Emitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
The difference between Single and Observable comes down to which Emitter implementation subscribeActual creates.
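The order of operations in subscribeActual (call onSubscribe first, then run the user callback, and turn anything it throws into onError) can be sketched without RxJava. The names Downstream, Source and SubscribeActualSketch are hypothetical. As simplifications, the emitter wrapper is skipped so the observer is passed straight to the source, and the Exceptions.throwIfFatal step is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class SubscribeActualSketch {
    // Hypothetical stand-in for SingleObserver<String>.
    interface Downstream {
        void onSubscribe();
        void onSuccess(String value);
        void onError(Throwable error);
    }

    // Hypothetical stand-in for SingleOnSubscribe<String>; the callback may throw.
    interface Source {
        void subscribe(Downstream emitter) throws Exception;
    }

    static void subscribeActual(Source source, Downstream observer) {
        observer.onSubscribe();          // 1. downstream is notified first
        try {
            source.subscribe(observer);  // 2. then the user's callback runs
        } catch (Throwable ex) {
            observer.onError(ex);        // 3. a throwing callback becomes onError
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Downstream observer = new Downstream() {
            @Override public void onSubscribe() { log.add("onSubscribe"); }
            @Override public void onSuccess(String value) { log.add("onSuccess: " + value); }
            @Override public void onError(Throwable error) { log.add("onError: " + error.getMessage()); }
        };
        // The callback throws instead of emitting a value.
        subscribeActual(emitter -> { throw new IllegalStateException("boom"); }, observer);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onError: boom]
    }
}
```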
2. Observable.create
Trace the flow of observableTest in the same way.
public void observableTest() {
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {}
        @Override
        public void onNext(String s) {}
        @Override
        public void onError(Throwable e) {}
        @Override
        public void onComplete() {}
    });
}
ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
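As the code above shows, Single.create and Observable.create follow the same subscription flow; the difference lies in the emitter each one hands to the source.
3. SingleCreate.Emitter and ObservableCreate.CreateEmitter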
// Nested class of SingleCreate (excerpt; the remaining methods are omitted)
static final class Emitter<T> extends AtomicReference<Disposable> implements SingleEmitter<T>, Disposable {
    final SingleObserver<? super T> downstream;
    Emitter(SingleObserver<? super T> downstream) {
        this.downstream = downstream;
    }
    @Override
    public void onSuccess(T value) {
        if (get() != DisposableHelper.DISPOSED) {
            Disposable d = getAndSet(DisposableHelper.DISPOSED);
            if (d != DisposableHelper.DISPOSED) {
                try {
                    if (value == null) {
                        downstream.onError(new NullPointerException("onSuccess called with null. Null values are generally not allowed in 2.x operators and sources."));
                    } else {
                        downstream.onSuccess(value);
                    }
                } finally {
                    if (d != null) {
                        d.dispose();
                    }
                }
            }
        }
    }
}
// Nested class of ObservableCreate (excerpt; the remaining methods are omitted)
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
}
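Before delivering a value, both onSuccess and onNext first check whether the downstream has already cut its connection to the upstream: onSuccess compares get() against DisposableHelper.DISPOSED, and onNext calls isDisposed().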
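The effect of that check can be shown with a stripped-down emitter. This is a minimal sketch under stated assumptions: GuardedEmitter and GuardedEmitterDemo are hypothetical names, and an AtomicBoolean flag replaces the AtomicReference<Disposable> state that the real emitters use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified emitter; a boolean flag stands in for the
// AtomicReference<Disposable> state used by RxJava's emitters.
final class GuardedEmitter {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final Consumer<String> downstream;

    GuardedEmitter(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }

    // Values are forwarded only while the connection is still active.
    void onNext(String value) {
        if (!isDisposed()) {
            downstream.accept(value);
        }
    }
}

final class GuardedEmitterDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        GuardedEmitter[] ref = new GuardedEmitter[1];
        // The downstream cancels the connection after its second item.
        ref[0] = new GuardedEmitter(value -> {
            received.add(value);
            if (received.size() == 2) {
                ref[0].dispose();
            }
        });
        for (String value : new String[] {"a", "b", "c", "d"}) {
            ref[0].onNext(value); // "c" and "d" are dropped by the guard
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b]
    }
}
```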
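Differences:
SingleCreate.Emitter marks itself as disposed as part of onSuccess() and then calls dispose() on the Disposable it previously held, which cuts the connection between upstream and downstream: the upstream emits exactly one value and the downstream receives exactly one value. ObservableCreate.CreateEmitter does not call dispose() after onNext(); it disposes only once a terminal event, onComplete() (or onError()), has been delivered. With Observable.create, therefore, the upstream can emit any number of values in succession and the downstream can receive each of them.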
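The contrast can be reproduced with two toy emitters that keep their state in an AtomicReference, loosely mirroring the real classes. This is a sketch under assumptions, not the library code: EmitterContrast, State, OneShotEmitter and MultiEmitter are hypothetical names, and a two-value enum replaces the Disposable stored by RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class EmitterContrast {
    // Hypothetical two-value state replacing the Disposable held by the real emitters.
    enum State { ACTIVE, DISPOSED }

    // Models SingleCreate.Emitter: the first onSuccess disposes the emitter.
    static final class OneShotEmitter extends AtomicReference<State> {
        private final List<String> log;

        OneShotEmitter(List<String> log) {
            super(State.ACTIVE);
            this.log = log;
        }

        void onSuccess(String value) {
            // getAndSet flips the state atomically; only the first call sees ACTIVE.
            if (getAndSet(State.DISPOSED) != State.DISPOSED) {
                log.add("onSuccess:" + value);
            }
        }
    }

    // Models ObservableCreate.CreateEmitter: onNext leaves the state untouched.
    static final class MultiEmitter extends AtomicReference<State> {
        private final List<String> log;

        MultiEmitter(List<String> log) {
            super(State.ACTIVE);
            this.log = log;
        }

        void onNext(String value) {
            if (get() != State.DISPOSED) {
                log.add("onNext:" + value);
            }
        }

        void onComplete() {
            if (get() != State.DISPOSED) {
                try {
                    log.add("onComplete");
                } finally {
                    set(State.DISPOSED); // the terminal event ends the stream
                }
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        OneShotEmitter single = new OneShotEmitter(log);
        single.onSuccess("a");
        single.onSuccess("b"); // ignored: already disposed
        MultiEmitter multi = new MultiEmitter(log);
        multi.onNext("x");
        multi.onNext("y");
        multi.onComplete();
        multi.onNext("z");     // ignored: disposed by onComplete
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSuccess:a, onNext:x, onNext:y, onComplete]
    }
}
```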