装饰模式同样是学习Rxjava时绕不开的一个点。
装饰模式:能保证原有类功能完整的情况下提供额外的功能。
上个简单的类图。

熟悉代理模式的同学可能会觉得,这玩意儿难道不是代理模式吗?
从类图看,两者确实很像很像。但目的却大不同。
- 代理模式,主要是为了访问隔离
- 装饰模式,主要是为了功能增强
就我个人来说,两者还有一个比较大区别,那就是——套娃!
装饰模式往往会有类似俄罗斯套娃的情况,比较常见应该是文件IO操作了。
new ObjectInputStream(new BufferedInputStream(new FileInputStream(new File("name"))));
这个套娃看着就很装饰模式,而且每套一层都会在原来的基础上对功能加强一波。
Rxjava的装饰模式
要理解源码,还是得找一个操作符入手。
从map操作符开始
Observable.create(
// 1. ObservableOnSubscribe
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
}
}
)
// map
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s + " map";
}
})
// 2. 订阅
.subscribe(
// 被观察者
new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
和create一样。跟进去瞧瞧先。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
还是熟悉的配方,同样的套路。
RxJavaPlugins依旧不用管,是预留给hook的。
map操作直接返回的是一个ObservableMap对象。把自身,也就是调用map方法的Observable作为参数传了进去,同时传入的还有mapper。
这个例子中this是上篇文章中提到的ObservableCreate。
ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
// ...
}
总体看着和ObservableCreate很像。继承了AbstractObservableWithUpstream。
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
这里要做个阅读理解。
Upstream是事件的上游,同理还有downstream下游。
AbstractObservableWithUpstream会持有上游的引用source。
画个图:

订阅
根据上一篇的经验,看完构造函数之后,就要看subscribeActual方法了,Rxjava实际的订阅操作是在subscribeActual中完成的。
public void subscribeActual(Observer<? super U> t) {
// 本例子中source是ObservableCreate
source.subscribe(new MapObserver<T, U>(t, function));
}
subscribeActual的操作很简单,通过subscribe订阅将上游的source和新观察者MapObserver建立联系。
内部观察者MapObserver
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
// ...
}
继承自BasicFuseableObserver,无二话,撸它!
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> downstream;
/** The upstream subscription. */
protected Disposable upstream;
/** The upstream's QueueDisposable if not null. */
protected QueueDisposable<T> qd;
/** Flag indicating no further onXXX event should be accepted. */
protected boolean done;
/** Holds the established fusion mode of the upstream. */
protected int sourceMode;
/**
* Construct a BasicFuseableObserver by wrapping the given subscriber.
* @param downstream the subscriber, not null (not verified)
*/
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
// final: fixed protocol steps to support fuseable and non-fuseable upstream
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
// ...
// 这里默认实现了 onError onComplete之类方法
}
BasicFuseableObserver的作用是接管下游,对其进行增强,也就是之前提到的装饰模式了。
protected final Observer<? super R> downstream;
downstream变量就是之前在subscribeActual中传入的观察者Observer,也就是下游。
通时,通过onSubscribe方法,BasicFuseableObserver也持有了上游。不过,这里要看仔细,他持有的upstream是Disposable类型,而不是Observable类型。其作用是用来取消对上游的事件订阅的。
ObservableMap对原Observer的增强
在他的onNext中
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 前面都判断,真实的增强在这里 mapper.apply(t)
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 往下游流转
downstream.onNext(v);
}
通过mapper的apply方法,将t转换为v ,再调用downstream.onNext方法,将v流转到下游
上图:

内部Observer对原Observer的包装是明显的装饰模式,这种设计的自由度很高,事件从下往下流转的过程中,可以随意的插入或删除处理的节点。
下游Observable的内部Observer可以针对各种实际需求对上游传入Observer的进行增强。
小结
Rxjava中,下游的Observer持有上游的Observer,并对其进行增强。