一.Single&SingleObserver
Single/SingleObserver观察者模式可以当做Observable&Observer的扩展版本。
Single的创建,SingleObserver的创建,以及订阅方法都雷同。区别只在于SingleObserver的创建。SingleObserver需要实现的方法没有onNext(Object o)
onComplete(),只有了onSuccess(Object o),onSuccess(Object o)像是两者的结合体。
例子:
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
e.onSuccess("----Single 11");//发送数据,并结束事件流
e.onSuccess("----Single 111");
e.onSuccess("----Single 1111");
}
}).subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("Single1 onSubscribe" );
}
@Override
public void onSuccess(@NonNull String s) {
print("Single1 onSuccess" +s);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
Single.just(10)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
print("Single2 just onSuccess"+integer);
}
});
输出结果:
Single1 onSubscribe
Single1 onSuccess----Single 11
Single2 just onSuccess10
二.Completable&CompletableObserver
Completable&CompletableObserver模式和Single/SingleObserver的不同也同样是在CompletableObserver的回调方法中,CompletableObserver需要实现的方法没有onNext(Object o),只有onComplete(), Completable没有数据的发射,只告知观察者事件流的完成。
例子:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(@NonNull CompletableEmitter e) throws Exception {
print("Completable0 subscribe");
e.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("Completable0 onSubscribe");
}
@Override
public void onComplete() {
print("Completable0 onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
print("Completable0 onError");
}
});
Completable.unsafeCreate(new CompletableSource() {
@Override
public void subscribe(@NonNull CompletableObserver cs) {
print("Completable1 subscribe");
cs.onComplete();
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("Completable1 onSubscribe");
}
@Override
public void onComplete() {
print("Completable1 onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
print("Completable1 onError");
}
});
Completable.complete().subscribe(new Action() {
@Override
public void run() throws Exception {
print("Completable2 complete");
}
});
Completable.timer(1, TimeUnit.SECONDS).subscribe(new Action() {
@Override
public void run() throws Exception {
print("Completable3 timer");
}
});
输出结果:
Completable0 onSubscribe
Completable0 subscribe
Completable0 onComplete
Completable1 subscribe
Completable1 onComplete
Completable2 complete
Completable3 timer
三. Maybe&MaybeObserver
Maybe&MaybeObserver模式跟Single/SingleObserver模式,Completable&CompletableObserver模式的不同,也同样是在CompletableObserver的回调方法中,MaybeObserver需要实现的方法没有onNext(Object o), 有onComplete(), onSuccess(Object o)方法,onComplete(), onSuccess(Object o)两个分支只会走一个。
例子:
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
print("Maybe0 subscribe" );
e.onComplete();
e.onSuccess("maybe one");
e.onSuccess("maybe two");
e.onSuccess("maybe three");
}
}).subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("Maybe0 onSubscribe" );
}
@Override
public void onSuccess(@NonNull String s) {
print("Maybe0 onSuccess" +s);
}
@Override
public void onError(@NonNull Throwable e) {
print("Maybe0 onError" );
}
@Override
public void onComplete() {
print("Maybe0 onComplete" );
}
});
Maybe.just(true).subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("Maybe1 just onSubscribe" );
}
@Override
public void onSuccess(@NonNull Boolean aBoolean) {
print("Maybe1 just onSuccess"+aBoolean );
}
@Override
public void onError(@NonNull Throwable e) {
print("Maybe1 just onError" );
}
@Override
public void onComplete() {
print("Maybe1 just onComplete" );
}
});
输出结果:
Maybe0 onSubscribe
Maybe0 subscribe
Maybe0 onComplete
Maybe1 just onSubscribe
Maybe1 just onSuccesstrue