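I recently started reading the RxJava source code and came across quite a few techniques I had not seen before, so I am writing them down here.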
private void ObservableMethod() {
// In RxJava 2.x, an Observable is paired with an Observer.
// This pairing has no backpressure support.
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
mEmitter = e;
e.onNext(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// The Disposable received here is used to cancel the subscription.
}
@Override
public void onNext(Integer value) {
Log.d("value", value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
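This is the simplest possible call chain, with no backpressure.
Observable is the source of events, and it defines a large number of operators.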
public abstract class Observable<T> implements ObservableSource<T>{
......
}
Observable implements ObservableSource<T>,
which declares a single method: it subscribes the given observer to this source.
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
As the code above shows, we call Observable.create(new ObservableOnSubscribe<Integer>()).subscribe().
Observable is an abstract class, so what object does create() actually return for us to call subscribe() on?
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
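create() first checks that source is not null, then builds the result with new ObservableCreate<T>(source). ObservableCreate extends Observable.
ObservableCreate stores the argument in its ObservableOnSubscribe source field, and that field is the key to the calls that follow.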
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// ......... n methods omitted .......
}
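The "abstract class plus static factory" shape can be sketched without RxJava on the classpath. The snippet below is only an illustration: MiniObservable, MiniObservableCreate and MiniOnSubscribe are hypothetical stand-ins, and a plain java.util.function.Consumer plays the role of both the emitter and the observer.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableOnSubscribe: the user's emission callback.
interface MiniOnSubscribe<T> {
    void subscribe(Consumer<T> emitter);
}

// Hypothetical stand-in for Observable: abstract, so it cannot be instantiated directly.
abstract class MiniObservable<T> {
    // Static factory: the caller only sees the abstract type,
    // but the object returned is a concrete subclass.
    static <T> MiniObservable<T> create(MiniOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return new MiniObservableCreate<>(source);
    }

    abstract void subscribe(Consumer<T> observer);
}

// Hypothetical stand-in for ObservableCreate: it just remembers the source.
final class MiniObservableCreate<T> extends MiniObservable<T> {
    final MiniOnSubscribe<T> source;

    MiniObservableCreate(MiniOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    void subscribe(Consumer<T> observer) {
        // The stored source is only invoked once somebody subscribes.
        source.subscribe(observer);
    }
}
```

Calling MiniObservable.create(e -> e.accept(1)) returns a MiniObservableCreate, and nothing is emitted until subscribe is called on it.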
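The onAssembly method belongs to the hook mechanism, which we are not using here.
No hook has been installed, so f is null and the method simply returns source.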
// Calls the associated hook function.
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
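The "return the source unchanged unless a hook is installed" idea can be sketched in isolation. This is a simplified illustration, not the real RxJavaPlugins: MiniPlugins, HookedSource and onAssemblyHook are hypothetical names, and java.util.function.Function stands in for RxJava's own Function type.

```java
import java.util.function.Function;

// Hypothetical stand-in for an Observable; it only needs to describe itself.
interface HookedSource {
    String describe();
}

// Hypothetical stand-in for RxJavaPlugins, reduced to a single hook.
final class MiniPlugins {
    // null means "no hook installed", which is the default state.
    static volatile Function<HookedSource, HookedSource> onAssemblyHook;

    static HookedSource onAssembly(HookedSource source) {
        // Read the volatile field once so the null check and the call see the same value.
        Function<HookedSource, HookedSource> f = onAssemblyHook;
        if (f != null) {
            return f.apply(source);
        }
        return source;
    }
}
```

With no hook set, MiniPlugins.onAssembly(s) returns the very same instance s. After assigning something like src -> () -> "traced(" + src.describe() + ")" to onAssemblyHook, every assembled source is replaced by the wrapper the hook returns.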
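Finally we reach the call to subscribe. We call it on an ObservableCreate instance, but subscribe is a final method declared in the parent class Observable, so it is the parent's implementation that runs:
1. Check that the observer is not null.
2. Run the hook, which we can skip for now.
3. Check that the hook did not return null (also skipped).
4. Perform the actual subscription.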
Observable.create(....).subscribe(Observer ..)
public final void subscribe(Observer<? super T> observer) {
//1. Check that the observer is not null
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//2. Hook-related operation
observer = RxJavaPlugins.onSubscribe(this, observer);
//3. Check that the hook did not return null
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//4. The actual subscription
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
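The split between a final subscribe and an abstract subscribeActual is the template method pattern. Here is a minimal sketch of just that split, assuming hypothetical classes TemplateObservable and JustOne, with a Consumer standing in for the Observer and with the hook and error-handling steps left out.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in for Observable, keeping only steps 1 and 4.
abstract class TemplateObservable<T> {
    // final: subclasses cannot bypass the null check; they only supply subscribeActual.
    public final void subscribe(Consumer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        subscribeActual(observer);
    }

    // The single extension point each concrete source implements.
    protected abstract void subscribeActual(Consumer<? super T> observer);
}

// Hypothetical concrete source that emits the value 1 to every subscriber.
final class JustOne extends TemplateObservable<Integer> {
    @Override
    protected void subscribeActual(Consumer<? super Integer> observer) {
        observer.accept(1);
    }
}
```

new JustOne().subscribe(...) enters the parent's final method first and only then reaches the subclass code, which mirrors how ObservableCreate is reached through Observable.subscribe.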
subscribeActual is where the subscription really happens. Before reading it, let's look at the Emitter-related classes.
public interface Emitter<T> {
// These mirror the event methods of Observer
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
public interface ObservableEmitter<T> extends Emitter<T> {
// These methods relate to Disposable
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
// Overrides the Emitter methods and, acting as a static proxy,
// forwards each event to the corresponding Observer method
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
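The behaviour worth noticing in CreateEmitter is that every event is gated on isDisposed(), and that a terminal event disposes the emitter. The sketch below isolates that gating. It is an assumption-laden simplification: GatedEmitter is a hypothetical class, it uses an AtomicBoolean rather than extending AtomicReference<Disposable>, and it appends to a list instead of calling a real Observer.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, reduced version of the gating logic in CreateEmitter.
final class GatedEmitter<T> {
    private final List<String> log; // records what would have reached the observer
    private final AtomicBoolean disposed = new AtomicBoolean();

    GatedEmitter(List<String> log) {
        this.log = log;
    }

    void onNext(T value) {
        // Events are forwarded only while the emitter is still active.
        if (!isDisposed()) {
            log.add("next:" + value);
        }
    }

    void onComplete() {
        if (!isDisposed()) {
            try {
                log.add("complete");
            } finally {
                // A terminal event always disposes, even if the observer throws.
                dispose();
            }
        }
    }

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}
```

Calling onNext(1), onComplete(), onNext(2), onComplete() in that order leaves only "next:1" and "complete" in the log, because everything after the first terminal event is dropped.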
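Step 1 creates an emitter that wraps the Observer. Step 2 hands that emitter to the Observer as a Disposable, so the link between observer and observable can be cancelled.
Step 3, source.subscribe(parent), passes the emitter to our ObservableOnSubscribe; whatever we emit through it is forwarded to the Observer.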
@Override
protected void subscribeActual(Observer<? super T> observer) {
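// 1. Wrap the Observer in an emitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2. Give the Observer its Disposable before anything is emitted
observer.onSubscribe(parent);
try {
// 3. Run the user's ObservableOnSubscribe with the emitter
source.subscribe(parent);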
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
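To see why the order of the three steps matters, here is a small end-to-end sketch. All names (SketchObserver, SketchCreate, SketchDemo) are hypothetical, a Runnable stands in for the Disposable, and the try/catch around the source call is omitted for brevity. Because onSubscribe runs before the source does, the observer already holds the cancel handle when the first value arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical observer: receives a cancel handle first, then values.
interface SketchObserver<T> {
    void onSubscribe(Runnable dispose);
    void onNext(T value);
}

// Hypothetical stand-in for ObservableCreate.subscribeActual.
final class SketchCreate<T> {
    private final Consumer<Consumer<T>> source;

    SketchCreate(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    void subscribe(SketchObserver<T> observer) {
        boolean[] disposed = {false};
        // 1. Wrap the observer in an emitter that checks the disposed flag.
        Consumer<T> emitter = v -> {
            if (!disposed[0]) {
                observer.onNext(v);
            }
        };
        // 2. Hand over the cancel handle before any value is emitted.
        observer.onSubscribe(() -> disposed[0] = true);
        // 3. Run the user's emission callback.
        source.accept(emitter);
    }
}

final class SketchDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SketchCreate<Integer> o = new SketchCreate<Integer>(e -> {
            e.accept(1);
            e.accept(2); // dropped: the observer cancels after the first value
        });
        o.subscribe(new SketchObserver<Integer>() {
            private Runnable dispose;

            @Override
            public void onSubscribe(Runnable d) {
                dispose = d;
                events.add("onSubscribe");
            }

            @Override
            public void onNext(Integer v) {
                events.add("onNext " + v);
                if (v == 1) {
                    dispose.run();
                }
            }
        });
        return events;
    }
}
```

SketchDemo.run() returns the events "onSubscribe" and "onNext 1": the second value never reaches the observer because it cancelled inside onNext, which is only possible because step 2 happens before step 3.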