1.简单使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("我是RxJava");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Rxjava被订阅");
}
@Override
public void onNext(String s) {
System.out.println("Observer接收消息: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("Observer出现错误: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("Rxjava 完成");
}
});
}
create为创建操作符,创建过程分析:Observable#create
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}·
调用了Observable的静态方法create(),返回一个被观察者对象
ObjectHelper.requireNonNull(source, "source is null");是判空操作
可以看到我们new出来的匿名内部类,又被ObservableCreate包装来了,看下ObservableCreate的代码:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
}
在Observable中我们new的被观察者赋值给了
ObservableOnSubscribe<T> source;
退出来,继续看建造过程,RxJavaPlugins#onAssembly:
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
其中变量是默认为空的,可以查看onObservableAssembly在哪里被赋值
Snipaste_2020-06-01_21-51-50.png
从return的注释也可以看出,这样子写是为给我们hook用的,所以在onAssembly默认操作中并没有改变什么
现在我们就可以画出创建过程的时序图了:

Rxjava创建时序图.png