上一章讲了Rxjava2的基本用法,这一章就来说说Rxjava2的结构设计和操作符原理。
一、基本类的简介
1)、Disposable
public interface Disposable {
void dispose();
boolean isDisposed();
}
Disposable是控制类,是用于控制事件流取消的接口,接口中有两个功能,一个是dispose()取消事件流的发送,isDisposed()中是判断事件流是否取消了。
将控制事件流的单独拿出来成为一个接口,就像关闭流的Closeable接口,就是最小知识原则。
2)、Observer
public interface Observer<T> {
// 订阅的时候将Disposable对象传递给用户,用做控制事件流
void onSubscribe(Disposable d);
// 该事件的处理完成
void onNext(T value);
// 错误
void onError(Throwable e);
// 事件流完成
void onComplete();
}
Observer是观察者模式中观察者概念。Observer提供了四种回调,方便用户获取相应的信息。
其中onError和onComplete事件一旦发送了,剩下的没发送的事件就不在发送了。
3)、Emitter
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
Emitter类是发射器类,功能是为了发送最原始事件流。可以用Emitter对象发送一系列的事件。最后Observer对象中就会收到对应的事件。
4)、ObservableEmitter
public interface ObservableEmitter<T> extends Emitter<T> {
/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param d the disposable, null is allowed
*/
void setDisposable(Disposable d);
/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(Cancellable c);
/**
* Returns true if the downstream disposed the sequence.
* @return true if the downstream disposed the sequence
*/
boolean isDisposed();
/**
* Ensures that calls to onNext, onError and onComplete are properly serialized.
* @return the serialized ObservableEmitter
*/
ObservableEmitter<T> serialize();
}
从结构可以看出ObservableEmitter类是继承自Emitter,它是Observable无背压版本的发射器,所以复写了Emitter。当订阅了Observer的时候,就会传给用户一个ObservableEmitter类型的对象,可以使用该对象发送一系列的原始事件,作为事件的发起点。
操作ObservableEmitter对象,实际上是个静态代理模式。从结构上Emitter和Observer里面的方法是一致的。表面上操作的是Emitter对象,实际上操作的是包装好的Observer对象。
见ObservableCreate类代码:
// Observable.create传入的对象
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// 核心代码
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将Observer包装成为一个Emitter对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用Observer中的onSubscribe方法,Observe中的onSubscribe方法被执行
observer.onSubscribe(parent);
try {
// 将包装好的Emitter对象,传递给用户,用户用该对象发送系列事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
从源码中可以见,一共三个步骤。
- 1、将Observer包装成为ObservableEmitter对象parent
- 2、调用Observer中的onSubscribe方法,Observe中的onSubscribe方法被执行
- 3、执行ObservableOnSubscribe中的subscribe方法,并将parent传递给用户。通过Emitter代理调用Observer方法。
5)、ObservableOnSubscribe
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
解释:
- 具有subscribe方法的函数接口,该方法接收ObservableEmitter实例,该实例允许以取消安全的方式推送事件。
当调用了ObservableOnSubscribe#subscribe中的时候,就代表事件已经开始发射了。ObservableEmitter实例就是操作事件流的最开始地方。
6)、ObservableSource
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(Observer<? super T> observer);
}
ObservableSource#subscribe为观察者模式中的订阅模块,让观察者和被观察者产生联系。
7)、Observable
精简结构:
public abstract class Observable<T> implements ObservableSource<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
/**
* Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
* emits the results of these function applications.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output type
* @param mapper
* a function to apply to each item emitted by the ObservableSource
* @return an Observable that emits the items from the source ObservableSource, transformed by the specified
* function
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic.
* <p>There is no need to call any of the plugin hooks on the current Observable instance or
* the Subscriber.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
}
Observable类为观察者模式中的被观察者。也是Rxjava2无背压版本的主要核心类。
一般操作三个步骤:
- 创建一个Observable对象作为操作原始类,如Create方式
- 通过链式模式补充Observable事件流的功能,如map操作符,也就是包装类,将原有的Observable加以新的功能,并返回一个新的Observable继续使用。
- 注册订阅事件,执行subscribe方法,触发事件的发射。
Observable类中主要功能就是订阅Observer对象,同时也实现了很多默认的操作符供使用者使用,也可以自定义操作符。具体的逻辑后面再说。
二、操作符变换原理
先看下下面的例子。先创建一个Observable对象,在采用map和filter操作符处理该事件,最后Observer将收到合法的事件
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return true;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Rxjava的操作符原理都是一样,先用map阐明下具体的原理。map操作符就是映射的意思,将一个对象变成另外一种对象。
①、调用map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
对于使用者就是调用Observable中的map方法,达到包装转化的效果。
RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper))就是将传入的ObservableMap对象原封不动的传入。
#######Function接口
map操作符是装换的功能,那么就必须有个转化的方法,所以Function就是转换的接口。如下就是将T类型的数据t转变成R类型的数。
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(T t) throws Exception;
}
②、生成ObservableMap对象
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
ObservableMap是继承自Observable,将原来的Observable保存,这个是关键点,只有保存了上一个的Observable,才能形成一个链式。
Observable中的subscribe方法中本质就是调用subscribeActual(Observer<? super U> t)方法。subscribeActual是个核心方法,它完成了两个功能:
- 包装传入的Observer对象
- 用上一个的Observable调用subscribe(Observer),这样就有调用上一个Observable的subscribe()方法,从而实现了链式
MapObserver类实现了Observer和Disposable类。实现事件的映射,这里重点看下onNext方法,其他的回调基本就是原封不动的回调。
将onNext方法提取出来,单独看:
U v = mapper.apply(t);
actual.onNext(v);
先用复写的Function对象完成事件的转换成U类型的,再调用上一个Observer的onNext方法。
三、总结
前面介绍了map操作符,map操作符Observable类,是保存上一个的Observable对象,并用上一个的Observable对象去subscribe执行下一个Observable传入的Observer。对于其他的操作符也是如此。
通过一层层包装好的Observer交给最开始的Observable即ObservableCreate。而ObservableCreate的subscribe方法就是ObservableOnSubscribe对象,方法中的Emitter就是代理包装好的Observer对象。