版本:RxJava1.3.0
主要分析subscribe()
实现方式和原理。(过程在会涉及到操作符等,例如map...
) 都知道Rx敲代码很爽快,主要归功于它可以将一个业务可以链式的实现(丰富的操作符)和方便线程调度。
这里主要分析链式的实现
关键点: Observable OnSubscribe subscribe subscriber(Observer)
一句话串联,即:观察者(subscriber|observer)订阅(subscribe)消息(被观察者 Observable OnSubscribe )
示例:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("goods");
subscriber.onCompleted();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
但是我们常用的一般是Action1
并不上面提到的subscriber
,原来是已经封装成ActionSubscriber
:
public final Subscription subscribe(final Action1<? super T> onNext) {
...
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
继续往下找subscribe
的最终实现:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
...
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
...
}
return Subscriptions.unsubscribed();
}
}
关键点 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
(这里可以将代码简化为 observable.onSubscribe.call(subscriber)
)这行对以上所谓的 观察、订阅进行了串联。即你在create(OnSubscribe)
中进行的onNext
操作的对象就是Action1
(即ActionSubscriber)。
有了以上基础,下面再来加入map
操作符(这里才是重中之重)