RxJava入门解析(二)

续上一篇文章

一、RxJava的变换。

引用别人的一句话,变换就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。那么何为加工,下面开始以API举例

1、map()

 Observable.just("abc")
                .map(new Func1() {
                    @Override
                    public Object call(Object o) {
                        return o.toString()+"def";
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
        
                    }
        
                    @Override
                    public void onError(Throwable e) {
        
                    }
        
                    @Override
                    public void onNext(Object o) {
                        System.out.println(o.toString());
                    }
                });

由上方代码可以看出,在map方法里将传入的字符串进行了变化,这里出现了一个新的类叫Func1。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
另外可以看出经过map变化后,数据直接传入了onNext中进行使用,另外还可以将数据类型进行转变。这就由读者自行修改了。

2、flatMap()

这是一个很好用的方法,类似于map()方法对要发射的数据进行转换,但是与map()不同的是flatMap()方法并不是直接作用于SubScriber中,而是返回一个Observable的对象,然后由这个对象来执行:

 Observable.just("abc","bcd","cde")
                .flatMap(new Func1() {
                    @Override
                    public Object call(Object o) {
                        return Observable.just(o.toString()+"---");
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o.toString());
                    }
                });

map和flatmap的区别有点像just和from,一个是对数据直接传入,一个是对数组的数据进行分割处理。

/**
     * Returns an Observable that emits items based on applying a function that you supply to each item emitted
     * by the source Observable, where that function returns an Observable, and then merging those resulting
     * Observables and emitting the results of this merger.
*/

这是源码中官方的解释。对Observable发射的数据都应用一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送合并的结果。 flatMap和map操作符很相像,flatMap发送的是合并后的Observables,map操作符发送的是应用函数后返回的结果集。

3、lift()

在RxJava中,有各种的变换方式,但是原理都是基于lift方法的原理来做的,什么是lift方法,先看一下源码:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
  } 


public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }



public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

看一下源码可以得出几个结论
1、lift变换是new了一个新的被观察者来继续之前的操作
2、Operator对象可以视为一个带返回值的Subscriber对象
3、在new新的Observable对象时,创建了一个新的OnSubscribeLift对象作为OnSubscribe传入
4、新的OnSubscribeLift使用的OnSubscribe为原始的OnSubscribe,Operator为新的Subscriber
5、Operator的构造方法中已经传入了原始的Subscriber,可以直接使用
6、在subscribe方法调用时,其实是进入到了新的OnSubscribeLift的call方法中,然后由原始的onSubscribe对象调用新的Subscriber对象。举个例子:

     Observable.create(new Observable.OnSubscribe<Object>() {
                    @Override
                    public void call(Subscriber<? super Object> subscriber) {

                        System.out.println("====OnSubscribe===call=");
                        subscriber.onNext("123");
                        subscriber.onCompleted();
                    }
                })
                .lift(new Observable.Operator<Object, Object>() {

                    @Override
                    public Subscriber<? super Object> call(Subscriber<? super Object> subscriber) {

                        System.out.println("====lift===call=");
                        return new Subscriber<Object>() {
                            @Override
                            public void onCompleted() {

                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onNext(Object o) {
                                System.out.println("====lift===onNext=");
                                subscriber.onNext(Integer.decode(o.toString()));
                            }

                            @Override
                            public void onStart() {
                                super.onStart();
                                System.out.println("====lift===onStart=");
                            }
                        };
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println("====Subscriber===onNext=");
                        System.out.println(o.toString());
                    }
                    @Override
                    public void onStart() {
                        super.onStart();
                        System.out.println("====Subscriber===onStart=");
                    }
                });

打印结果为

====Subscriber===onStart=
====lift===call=
====lift===onStart=
====OnSubscribe===call=
====lift===onNext=
====Subscriber===onNext=
123

先由subscribe方法中调用原始的Subscriber的start方法,然后调用到lift的call方法,在call方法中调用了新的Subscriber的start方法,然后原始的OnSubscribe(parent)调用了call方法,传入了新的Subscriber,然后顺序调用onNext方法。
我们可以看一下map()方法的实现:

  public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
    }

 
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

}

逻辑基本是一样的,就不多做赘述。

Subject

RxJava中常见的Subject有4种,分别是AsyncSubject、 BehaviorSubject、 PublishSubject、 ReplaySubject
Subject既可以做观察者也可以做被观察者。
1、AsyncSubject :AsyncSubject无论输入多少参数,永远只输出最后一个参数。
2、BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。
3、PublishSubject:发送订阅起到Completed之间所有的值。
4、ReplaySubject:无论何时订阅,都会将所有历史订阅内容全部发出。
使用方法:

//        PublishSubject sb = PublishSubject.create();
        BehaviorSubject sb = BehaviorSubject.create();//配置默认值为BehaviorSubject.create("defaultData");
//        AsyncSubject sb = AsyncSubject .create();
//        ReplaySubject sb = ReplaySubject.create();
        sb.subscribe(
                new Action1<Object>() {
                    @Override
                        public void call(Object o) {
                        System.out.println(o.toString());
                    }
                });

        sb.onNext(1);
        sb.onNext(2);
        sb.onNext(3); 
        sb.onCompleted();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容