代码示例
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
LogUtils.loge("subscriber call ...");
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("test1");
subscriber.onCompleted();
}
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
LogUtils.loge("Observer onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
LogUtils.loge("Observer onNext s = " + s);
}
};
Subscription subscription = observable.subscribe(observer);
常用类说明
被观察者
rx.Observable
订阅
rx.Subscription
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
观察者
rx.Observer
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
执行流程
rx.Observable#create(rx.Observable.OnSubscribe<T>)
public static <T> Observable<T> create(OnSubscribe<T> f) {
// 加载RxJavaHooks的static,初始化资源
// 返回一个Observable对象
return new Observable<T>(RxJavaHooks.onCreate(f));
}
rx.plugins.RxJavaHooks#onCreate(rx.Observable.OnSubscribe<T>)
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
// 这里其实是调用到了onObservableCreate的call方法
return f.call(onSubscribe);
}
return onSubscribe;
}
rx.plugins.RxJavaHooks
static {
init();
}
static void init() {
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
// 调用开始订阅的方法
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
onObservableReturn = new Func1<Subscription, Subscription>() {
@Override
public Subscription call(Subscription f) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
}
};
initCreate();
}
static void initCreate() {
onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable.OnSubscribe f) {
/*
这里1. 初始化RxJavaObservableExecutionHook
2. 返回我们传入的Observable.OnSubscribe
*/
return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
}
};
}
rx.plugins.RxJavaPlugins#getObservableExecutionHook
public RxJavaObservableExecutionHook getObservableExecutionHook() {
if (observableExecutionHook.get() == null) {
// 从系统配置文件中查找一个RxJavaObservableExecutionHook的实现类
Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
// impl = null
if (impl == null) {
// 没有找到就使用这个默认的RxJavaObservableExecutionHookDefault实现类
observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
}
}
return observableExecutionHook.get();
}
rx.plugins.RxJavaObservableExecutionHookDefault
rx.Observable#subscribe(rx.Observer<? super T>)
public final Subscription subscribe(final Observer<? super T> observer) {
// 使用ObserverSubscriber对observer进行包装
return subscribe(new ObserverSubscriber<T>(observer));
}
rx.Observable#subscribe(rx.Subscriber<? super T>)
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
// 保证线程安全
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
// 调用先RxJavaHooks的onObservableStart的call方法,然后再调用我们在activity中定义的onSubscribe的call方法
// 这里其实就是调用了开始订阅
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
}
rx.plugins.RxJavaHooks#onObservableStart
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
rx.plugins.RxJavaHooks#onObservableReturn
public static Subscription onObservableReturn(Subscription subscription) {
Func1<Subscription, Subscription> f = onObservableReturn;
if (f != null) {
return f.call(subscription);
}
return subscription;
}
ObserverSubscriber类
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;
public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
// 调用observer方法
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
// 调用observer方法
observer.onCompleted();
}
}
源码阅读总结
Subscription关联观察者和订阅者
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
try {
// 调用先RxJavaHooks的onObservableStart的call方法,然后再调用我们在activity中定义的onSubscribe的call方法
// 这里其实就是调用了开始订阅
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
}
}
observable.onSubscribe执行
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
课堂总结
Observable
- 观察得到的-被观察者
- 通过Observable创建一个可观察的序列(create方法)
- 通过subscribe去注册一个观察者
Observer
- 用于接收数据-观察者
- 作为Observable的subsceibe方法的参数
Subscription
- 订阅,用于描述被观察者和观察者之间的关系
- 用于取消订阅和获取当前订阅状态
OnSubscribe
- 当订阅时会触发此接口的调用
- 在Observable内部,实际作用是向订阅者发射数据
Subscribe
- 实现了Observer和Subscription
- 只有自己才能阻止自己