RxJava入门解析(二)

续上一篇文章

一、RxJava的变换。

引用别人的一句话,变换就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。那么何为加工,下面开始以API举例

1、map()

 Observable.just("abc")
                .map(new Func1() {
                    @Override
                    public Object call(Object o) {
                        return o.toString()+"def";
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
        
                    }
        
                    @Override
                    public void onError(Throwable e) {
        
                    }
        
                    @Override
                    public void onNext(Object o) {
                        System.out.println(o.toString());
                    }
                });

由上方代码可以看出,在map方法里将传入的字符串进行了变化,这里出现了一个新的类叫Func1。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
另外可以看出经过map变化后,数据直接传入了onNext中进行使用,另外还可以将数据类型进行转变。这就由读者自行修改了。

2、flatMap()

这是一个很好用的方法,类似于map()方法对要发射的数据进行转换,但是与map()不同的是flatMap()方法并不是直接作用于SubScriber中,而是返回一个Observable的对象,然后由这个对象来执行:

 Observable.just("abc","bcd","cde")
                .flatMap(new Func1() {
                    @Override
                    public Object call(Object o) {
                        return Observable.just(o.toString()+"---");
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println(o.toString());
                    }
                });

map和flatmap的区别有点像just和from,一个是对数据直接传入,一个是对数组的数据进行分割处理。

/**
     * Returns an Observable that emits items based on applying a function that you supply to each item emitted
     * by the source Observable, where that function returns an Observable, and then merging those resulting
     * Observables and emitting the results of this merger.
*/

这是源码中官方的解释。对Observable发射的数据都应用一个函数,这个函数返回一个Observable,然后合并这些Observables,并且发送合并的结果。 flatMap和map操作符很相像,flatMap发送的是合并后的Observables,map操作符发送的是应用函数后返回的结果集。

3、lift()

在RxJava中,有各种的变换方式,但是原理都是基于lift方法的原理来做的,什么是lift方法,先看一下源码:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//1
        return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
  } 


public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }



public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

看一下源码可以得出几个结论
1、lift变换是new了一个新的被观察者来继续之前的操作
2、Operator对象可以视为一个带返回值的Subscriber对象
3、在new新的Observable对象时,创建了一个新的OnSubscribeLift对象作为OnSubscribe传入
4、新的OnSubscribeLift使用的OnSubscribe为原始的OnSubscribe,Operator为新的Subscriber
5、Operator的构造方法中已经传入了原始的Subscriber,可以直接使用
6、在subscribe方法调用时,其实是进入到了新的OnSubscribeLift的call方法中,然后由原始的onSubscribe对象调用新的Subscriber对象。举个例子:

     Observable.create(new Observable.OnSubscribe<Object>() {
                    @Override
                    public void call(Subscriber<? super Object> subscriber) {

                        System.out.println("====OnSubscribe===call=");
                        subscriber.onNext("123");
                        subscriber.onCompleted();
                    }
                })
                .lift(new Observable.Operator<Object, Object>() {

                    @Override
                    public Subscriber<? super Object> call(Subscriber<? super Object> subscriber) {

                        System.out.println("====lift===call=");
                        return new Subscriber<Object>() {
                            @Override
                            public void onCompleted() {

                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onNext(Object o) {
                                System.out.println("====lift===onNext=");
                                subscriber.onNext(Integer.decode(o.toString()));
                            }

                            @Override
                            public void onStart() {
                                super.onStart();
                                System.out.println("====lift===onStart=");
                            }
                        };
                    }
                }).subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.println("====Subscriber===onNext=");
                        System.out.println(o.toString());
                    }
                    @Override
                    public void onStart() {
                        super.onStart();
                        System.out.println("====Subscriber===onStart=");
                    }
                });

打印结果为

====Subscriber===onStart=
====lift===call=
====lift===onStart=
====OnSubscribe===call=
====lift===onNext=
====Subscriber===onNext=
123

先由subscribe方法中调用原始的Subscriber的start方法,然后调用到lift的call方法,在call方法中调用了新的Subscriber的start方法,然后原始的OnSubscribe(parent)调用了call方法,传入了新的Subscriber,然后顺序调用onNext方法。
我们可以看一下map()方法的实现:

  public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return create(new OnSubscribeMap<T, R>(this, func));
    }

 
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

}

逻辑基本是一样的,就不多做赘述。

Subject

RxJava中常见的Subject有4种,分别是AsyncSubject、 BehaviorSubject、 PublishSubject、 ReplaySubject
Subject既可以做观察者也可以做被观察者。
1、AsyncSubject :AsyncSubject无论输入多少参数,永远只输出最后一个参数。
2、BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。
3、PublishSubject:发送订阅起到Completed之间所有的值。
4、ReplaySubject:无论何时订阅,都会将所有历史订阅内容全部发出。
使用方法:

//        PublishSubject sb = PublishSubject.create();
        BehaviorSubject sb = BehaviorSubject.create();//配置默认值为BehaviorSubject.create("defaultData");
//        AsyncSubject sb = AsyncSubject .create();
//        ReplaySubject sb = ReplaySubject.create();
        sb.subscribe(
                new Action1<Object>() {
                    @Override
                        public void call(Object o) {
                        System.out.println(o.toString());
                    }
                });

        sb.onNext(1);
        sb.onNext(2);
        sb.onNext(3); 
        sb.onCompleted();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容