泛型在RxJava中的应用

1.什么是泛型?

    泛型(generic)是一种类型参数。

    所谓的泛型就是指将类型作为形参传递给类、接口或者方法,也就是说允许在定义类、接口、方法时使用类型作为形参,在使用时再指定具体类型,所有使用该泛型参数的地方都被统一化,保证类型一致。

2.为什么要使用泛型?

     泛型可以增强编译时错误检测,减少因类型问题引发的运行时异常。也就说编译时必须确定类型,如果类型不匹配,直接报错,这样就避免了运行时类型错误引发运行异常比如崩溃;

    泛型具有更强的类型检查;

    泛型可以避免类型转换。泛型通过使用时传入确定的类型,从而明确的取得具体的类型;

    泛型可以泛型算法,增加代码复用性;

3.泛型使用方式

    泛型有三种使用方式,分别为:泛型类、泛型接口、泛型方法。

    下面直接引用RxJava中的类来具体描述:

3.1 泛型接口

格式:public interface<T,R,E,...>

public interface Observer <@NonNull T>{

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();

void onSubscribe(@NonNull Disposable d);

}

public interface ObservableSource<@NonNull T> {

/**

    * Subscribes the given {@link Observer} to this {@link ObservableSource} instance.

    * @param observer the {@code Observer}, not {@code null}

    * @throws NullPointerException if {@code observer} is {@code null}

*/

    void subscribe(@NonNull Observer observer);

}

ObservableSource是被观察者,Observer  是RxJava的观察者接口用于获取最终结果

3.2 泛型类

public abstract class Observable<@NonNull T>implements ObservableSource<T> {

protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

……

@Override

public final void subscribe(@NonNull Observer<? super T> observer) {

Objects.requireNonNull(observer,"observer is null");

try {

observer =RxJavaPlugins.onSubscribe(this, observer);

Objects.requireNonNull(observer,"The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

subscribeActual(observer);

}catch (NullPointerException e) {// NOPMD

            throw e;

}catch (Throwable e) {

//Exceptions.throwIfFatal(e);

// can't call onError because no way to know if a Disposable has been set or not

// can't call onSubscribe because the call might have set a Subscription already

            RxJavaPlugins.onError(e);

NullPointerException npe =new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

public static Observable create(@NonNull ObservableOnSubscribe<T>  source) {

Objects.requireNonNull(source,"source is null");

return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));

}

@NonNull

    public final Observable flatMap(@NonNull Function<? super T , ? extends  ObservableSource<? extends R>> mapper) {

return flatMap(mapper,false);

}

……

}


public final class ObservableCreate<T> extends Observable<T> {

    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {

        this.source = source;

    }

    @Override

    protected void subscribeActual(Observer<? super T> observer) {

        CreateEmitter<T> parent = new CreateEmitter<>(observer);

        observer.onSubscribe(parent);

        try {

            source.subscribe(parent);

        } catch (Throwable ex) {

            Exceptions.throwIfFatal(ex);

            parent.onError(ex);

        }

    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {

            this.observer = observer;

        }

        ……

    }

}

Observable 是所有被观察者的基类,ObservableCreate 是调用Observable .create(*)所返回的实际被观察对象,也是使用Observable .create方式的第一个对象

3.3 泛型方法

final public class RxJavaPlugins {

public static Observable<T> onAssembly(@NonNull Observable<T> source) {

Functionf =onObservableAssembly;

if (f !=null) {

return apply(f, source);

}

return source;

}

}

4. 通配符

        在通用代码中,称为通配符的问号( ? )表示未知类型。通配符可以在多种情况下使用:作为参数,字段或局部变量的类型;有时作为返回类型(尽管更具体的做法是更好的编程习惯)。通配符从不用作泛型方法调用,泛型类实例创建或超类型的类型参数。

        声明上限通配符,请使用通配符( ? ),后跟 extends 关键字,然后是其上限。

       声明无界通配符类型使用通配符( ? )来指定,例如 List 。这就是所谓的未知类型的列表。有两种情况下,无界通配符是一种有用的方法。如果你正在编写一个可以使用 Object 类中提供的功能实现的方法。当代码使用通用类中不依赖于类型参数的方法时。例如, List.size 或 List.clear 。事实上,Class 之所以这么经常使用,是因为 Class 中的大部分方法都不依赖于T。

        声明下限通配符使用通配符( ? )表示,后跟 super 关键字,后跟下限

        RxJava代码中多次出现这里不再重复举例,

5.实例分析

        RxJava通过响应式编程方式使用,通常描述为U型结构,这里讲一下函数运行的过程和为什么是U型。具体实例如下

Observable.create(new ObservableOnSubscribe() {

@Override

        public void subscribe(@NonNull ObservableEmitter emitter)throws Throwable {

emitter.onNext("第一次调用");

emitter.onComplete();

}

}).flatMap(new Function>() {

@Override

        public ObservableSourceapply(String s)throws Throwable {

return Observable.create(new ObservableOnSubscribe() {

@Override

                public void subscribe(@NonNull ObservableEmitter emitter)throws Throwable {

emitter.onNext(new String("第二次调用"));

}

});

}

}).subscribe(new Observer() {

@Override

        public void onComplete() {

}

@Override

        public void onError(@NonNull Throwable e) {

}

@Override

        public void onNext(@NonNull Object o) {

}

@Override

        public void onSubscribe(@NonNull Disposable d) {

}

});

}

下面根据代码书写流程进行讲解:

第一步:Observable.create()创建了第一个观察对象(OB1),类型是ObservableCreate,

它里面包含一个ObservableOnSubscribe  source;对象,这个对象是实际订阅观察对象(source1)

不执行具体函数

第二步:flatMap,返回第二个观察对象(OB2),类型是ObservableFlatMap,里边包括一个Function<? super T, ? extends ObservableSource<? extends U>> mapper;和一个ObservableSource source;对象 。这里的source 其实是OB1 ,还有一个source2存放在mapper里

第三步:obseavable.subscribe,也就是OB2.subscribe(observer),实际调用

            subscribeActual(observer);

这个函数是obseavable父类实现的,也就是ObservableFlatMap

@Override

public void subscribeActual(Observer t) {

if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t,mapper)) {

return;

}

source.subscribe(new MergeObserver<>(t,mapper,delayErrors,maxConcurrency,bufferSize));

}

上面第二步讲了这个source是OB1,因此相当于OB1调用source

第四步:OB1.subscribe其实就是obseavable.subscribe,然后 subscribeActual(observer);

@Override

protected void subscribeActual(Observer observer) {

CreateEmitter parent =new CreateEmitter<>(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

}catch (Throwable ex) {

parent.onError(ex);

}

}

第五步调用:observer.onSubscribe(parent);也就是我们实际的Observer

new Observer() {

@Override

    public void onSubscribe(@NonNull Disposable d) {

}

}

第六步:source.subscribe(parent);也就是OBJ1 的source1 开始执行

new ObservableOnSubscribe() {

@Override

    public void subscribe(@NonNull ObservableEmitter emitter)throws Throwable {

emitter.onNext("第一次调用");

emitter.onComplete();

}

}

第七步:OBJ2中的source开始执行

Observable.create(new ObservableOnSubscribe() {

@Override

    public void subscribe(@NonNull ObservableEmitter emitter)throws Throwable {

emitter.onNext(new String("第二次调用"));

}

});

第八步:返回信息给observer

new Observer() {

@Override

    public void onComplete() {

}

@Override

    public void onError(@NonNull Throwable e) {

}

@Override

    public void onNext(@NonNull Object o) {


}


RxJava 相当于俄罗斯套娃,先创建一系列套娃(包含关系)先小后大,需要查看是时从里向外打开。套娃先放大的在放小的就是从外向里打开,这样可能更好理解,我们要的就是最小的套娃里的武功秘籍。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容