重学Rxjava之装饰模式

装饰模式同样是学习Rxjava时绕不开的一个点。
装饰模式:能保证原有类功能完整的情况下提供额外的功能。
上个简单的类图。


装饰模式.png

熟悉代理模式的同学可能会觉得,这玩意儿难道不是代理模式吗?
从类图看,两者确实很像很像。但目的却大不同。

  • 代理模式,主要是为了访问隔离
  • 装饰模式,主要是为了功能增强

就我个人来说,两者还有一个比较大区别,那就是——套娃!
装饰模式往往会有类似俄罗斯套娃的情况,比较常见应该是文件IO操作了。

new ObjectInputStream(new BufferedInputStream(new FileInputStream(new File("name"))));

这个套娃看着就很装饰模式,而且每套一层都会在原来的基础上对功能加强一波。

Rxjava的装饰模式

要理解源码,还是得找一个操作符入手。

从map操作符开始

Observable.create(
        // 1. ObservableOnSubscribe
        new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {

            }
        }
        )
        // map
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s + " map";
            }
        })
        // 2. 订阅
        .subscribe(
                // 被观察者
                new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String s) {

                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

和create一样。跟进去瞧瞧先。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

还是熟悉的配方,同样的套路。
RxJavaPlugins依旧不用管,是预留给hook的。
map操作直接返回的是一个ObservableMap对象。把自身,也就是调用map方法的Observable作为参数传了进去,同时传入的还有mapper。
这个例子中this是上篇文章中提到的ObservableCreate。

ObservableMap

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    // ...
}

总体看着和ObservableCreate很像。继承了AbstractObservableWithUpstream。

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

这里要做个阅读理解。
Upstream是事件的上游,同理还有downstream下游。
AbstractObservableWithUpstream会持有上游的引用source
画个图:

Rxjava随便画画1.png

订阅

根据上一篇的经验,看完构造函数之后,就要看subscribeActual方法了,Rxjava实际的订阅操作是在subscribeActual中完成的。

public void subscribeActual(Observer<? super U> t) {
    // 本例子中source是ObservableCreate
    source.subscribe(new MapObserver<T, U>(t, function));
}

subscribeActual的操作很简单,通过subscribe订阅将上游的source和新观察者MapObserver建立联系。

内部观察者MapObserver

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            downstream.onNext(null);
            return;
        }

        U v;

        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        downstream.onNext(v);
    }

    // ...
}

继承自BasicFuseableObserver,无二话,撸它!

public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

    /** The downstream subscriber. */
    protected final Observer<? super R> downstream;

    /** The upstream subscription. */
    protected Disposable upstream;

    /** The upstream's QueueDisposable if not null. */
    protected QueueDisposable<T> qd;

    /** Flag indicating no further onXXX event should be accepted. */
    protected boolean done;

    /** Holds the established fusion mode of the upstream. */
    protected int sourceMode;

    /**
     * Construct a BasicFuseableObserver by wrapping the given subscriber.
     * @param downstream the subscriber, not null (not verified)
     */
    public BasicFuseableObserver(Observer<? super R> downstream) {
        this.downstream = downstream;
    }

    // final: fixed protocol steps to support fuseable and non-fuseable upstream
    @SuppressWarnings("unchecked")
    @Override
    public final void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {

            this.upstream = d;
            if (d instanceof QueueDisposable) {
                this.qd = (QueueDisposable<T>)d;
            }

            if (beforeDownstream()) {

                downstream.onSubscribe(this);

                afterDownstream();
            }

        }
    }
    // ... 
    // 这里默认实现了 onError onComplete之类方法
}

BasicFuseableObserver的作用是接管下游,对其进行增强,也就是之前提到的装饰模式了。

protected final Observer<? super R> downstream;

downstream变量就是之前在subscribeActual中传入的观察者Observer,也就是下游。
通时,通过onSubscribe方法,BasicFuseableObserver也持有了上游。不过,这里要看仔细,他持有的upstream是Disposable类型,而不是Observable类型。其作用是用来取消对上游的事件订阅的。

ObservableMap对原Observer的增强

在他的onNext中

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }

    U v;

    try {
        // 前面都判断,真实的增强在这里 mapper.apply(t)
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    // 往下游流转
    downstream.onNext(v);
}

通过mapper的apply方法,将t转换为v ,再调用downstream.onNext方法,将v流转到下游
上图:

Rxjava随便画画2.png

内部Observer对原Observer的包装是明显的装饰模式,这种设计的自由度很高,事件从下往下流转的过程中,可以随意的插入或删除处理的节点。
下游Observable的内部Observer可以针对各种实际需求对上游传入Observer的进行增强。

小结

Rxjava中,下游的Observer持有上游的Observer,并对其进行增强。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容