RxJava学习笔记(变换Observables)

转换Observables

转换Observables,顾名思义就是变换可观测序列,来创建一个能够更好的满足我们需求的序列。

map家族

RxJava提供了几个mapping函数: map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。

Map

Rxjava的map函数接收一个指定的Func1对象,然后将它应用到每一个由Observable发射的值上。Func1是一个接口,两个泛型,内部只有一个call方法,一个泛型是参数类型,一个泛型是返回值类型,在这里就可以理解这个call方法就是将可观测序列元素转换的转换方法。so,意思就是我们可以自己定制自己需要的转换方法。

  • 比如下面的代码,我们让可观测序列元素的值乘以5发射。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<Integer>() {
              @Override
              public void call(Subscriber<? super Integer> subscriber) {
                  subscriber.onNext(1);
                  subscriber.onNext(2);
                  subscriber.onNext(3);
              }
          }).map(new Func1<Integer, Integer>() {
              @Override
              public Integer call(Integer integer) {
                  return integer * 5;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onCompleted() {
                  System.out.println("completed");
              }
    
              @Override
              public void onError(Throwable e) {
                  System.out.println("something is error");
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println("i = " + integer);
              }
          });
      }
    

打印结果:</br>
i = 5</br>
i = 10</br>
i = 15</br>

FlatMap

flatMap同样也是转换元素的,不过和map不同的是,flatMap转换的返回的是Observable对象。要对其进行理解做一个对比比较容易理解。已知:两个javabean类,School和Student:

public class School {
    private List<Student> mStudents = new ArrayList<>();

    public List<Student> getStudents() {
        return mStudents;
    }

    public void setStudents(List<Student> students) {
        mStudents = students;
    }
}

就School特殊一点,只写了一个Student的集合,当然,当中还可以包含其他数据,这里只是体现其中有一个数据集合,Student类就是一个普通bean类,包含name,age两个成员变量及其get/set方法。然后假设我们获取到一个School变量,需要从中得到所有的Student的具体数据,用map我们可以这样干:

public static void main(String... args) {
    School school = new School();
    school.getStudents().add(new Student("张三" , 23));
    school.getStudents().add(new Student("李四" , 24));
    school.getStudents().add(new Student("王五" , 25));

    Observable.just(school)
            .map(new Func1<School, List<Student>>() {
                @Override
                public List<Student> call(School school) {
                    return school.getStudents();
                }
            })
            .subscribe(new Observer<List<Student>>() {
                @Override
                public void onCompleted() {
                
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(List<Student> students) {
                    for (Student student : students) {
                        System.out.println("name = " + student.getName());
                    }
                }
            });
}

接收一个Student数据集合遍历获取当中的数据,但是如果我们不想在代码中使用for循环遍历,而是希望在Subscriber中直接传入单个的Student对象呢,用map显然是行不通的,因为map仅仅是一对一的转化,而现在要求的是一对多的转换,那么如何实现呢,我们用flatMap:

public static void main(String... args) {
    School school = new School();
    school.getStudents().add(new Student("张三" , 23));
    school.getStudents().add(new Student("李四" , 24));
    school.getStudents().add(new Student("王五" , 25));

    Observable.just(school)
            .flatMap(new Func1<School, Observable<Student>>() {
                @Override
                public Observable<Student> call(School school) {
                    return Observable.from(school.getStudents());
                }
            })
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Student student) {
                    System.out.println("name = " + student.getName());
                }
            });
}

打印的结果都是:</br>
name = 张三</br>
name = 李四</br>
name = 王五</br>

我们有一个数据序列,它发射一个数据序列,这些数据本身自己也可以发射Observable,flatMap就可以合并这些Observable发射的数据,然后将合并后的结果作为最终的Observable。但是它的合并允许交叉,也就意味着flatMap()不能够保证在最终生成的Observable中源Observables确切的发射顺序。

ConcatMap

RxJava的concatMap()函数解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。

public static void main(String... args) {
    School school = new School();
    school.setAddress("China");
    school.getStudents().add(new Student("张三" , 23));
    school.getStudents().add(new Student("李四" , 24));
    school.getStudents().add(new Student("王五" , 25));

    Observable<School> observable = Observable.just(school);
    observable.subscribe(new Action1<School>() {
        @Override
        public void call(School school) {
            System.out.println("school address = " + school.getAddress());
        }
    });
    observable.concatMap(new Func1<School, Observable<Student>>() {
        @Override
        public Observable<Student> call(School school) {
            return Observable.from(school.getStudents());
        }
    }).subscribe(new Observer<Student>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Student student) {
            System.out.println("student name = " + student.getName());
        }
    });
}

这里给School类添加一个属性地址,打印结果:</br>
school address = China</br>
student name = 张三</br>
student name = 李四</br>
student name = 王五</br>

FlatMapIterable

flatMapIterable()可以将数据包装成Iterable,在Iterable中我们可以随意对数据进行加工。

public static void main(String... args) {
    Observable.just(1,2,3,4)
            .flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
                @Override
                public Iterable<Integer> call(Integer integer) {
                    List<Integer> list = new ArrayList();
                    list.add(integer);
                    list.add(5);
                    return list;
                }
            }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("i = " + integer);
        }
    });
}

这里在每个数据创建的Iterable中都加入了一个元素"5",打印结果如下:</br>
i = 1</br>
i = 5</br>
i = 2</br>
i = 5</br>
i = 3</br>
i = 5</br>
i = 4</br>
i = 5</br>

SwitchMap

每当源Observable发射一个新的数据项( Observable) 时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

Scan

RxJava的scan()函数可以看做是一个累积函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。

public static void main(String... args) {
    Observable.just(1,2,3,4,5)
            .scan(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer + integer2;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("rxjava onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("rxjava onError");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("result = " + integer);
                }
            });
}

这里对每一个数据元素都进行了相加的操作,看结果更明了:</br>
result = 1</br>
result = 3</br>
result = 6</br>
result = 10</br>
result = 15</br>
rxjava onCompleted</br>

还有一个scan()函数的变体scan(R initialValue, Func2<R, ? super T, R> accumulator),它可以设置一个初始值作为第一个发射的元素值,

public static void main(String... args) {
    Observable.just(1,2,3,4,5)
            .scan(10, new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer + integer2;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {
                    System.out.println("rxjava onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("rxjava onError");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("result = " + integer);
                }
            });
}

这里我们为scan()设置了一个初始值10,那么发射的第一个元素就是这个10,之后又与其相加,打印结果如下:</br>
result = 10</br>
result = 11</br>
result = 13</br>
result = 16</br>
result = 20</br>
result = 25</br>
rxjava onCompleted</br>

GroupBy

groupBy()将源Observable变换成一个发射Observables的新的Observable。它们中的每一个新的Observable都发射一组指定的数据。实际使用中,我们需要提供一个生成key的规则(也就是Func1中的call方法),所有key相同的数据会包含在同一个小的Observable中。

public static void main(String... args) {
    Student lisi = new Student("李四", 24);
    Student wangwu = new Student("王五", 25);
    Student zhangsan = new Student("张三", 23);
    Student jianjian = new Student("jianjian", 24);
    Student wenwen = new Student("wenwen", 25);

    Observable<GroupedObservable<Integer, Student>> GroupedObservable = Observable.just(lisi, wangwu, zhangsan, jianjian, wenwen)
            .groupBy(new Func1<Student, Integer>() {
                @Override
                public Integer call(Student student) {
                    return student.getAge();
                }
            });
    Observable.concat(GroupedObservable)
            .subscribe(new Action1<Student>() {
                @Override
                public void call(Student student) {
                    System.out.println("student = " + student.getName() + ", age = " + student.getAge());
                }
            });
}

这里我们首先创建了一个新的Observable:GroupedObservable,它将会发送一个带有GroupedObservable的序列(也就是指发送的数据项的类型为GroupedObservable)。GroupedObservable是一个特殊的Observable,它基于一个分组的key,在这个例子中,key就是student的年龄age,代表的意思就是年龄相同的数据会包含在一起。打印结果:</br>
student = 李四, age = 24</br>
student = jianjian, age = 24</br>
student = 王五, age = 25</br>
student = wenwen, age = 25</br>
student = 张三, age = 23</br>

Buffer

RxJava中的 buffer() 函数将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。

public static void main(String... args) {
    Observable.just(1,2,3,4,5,6)
            .buffer(2)
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    for (Integer i : integers) {
                        System.out.println("i = " + i);
                    }
                    System.out.println("-------------------");
                }
            });
}

这里的buffer(2)指定了缓冲容量的大小为2,打印结果:</br>
i = 1</br>
i = 2</br>
-------------------</br>
i = 3</br>
i = 4</br>
-------------------</br>
i = 5</br>
i = 6</br>
-------------------</br>
实际上,buffer()函数还有几种重载方法,其中一个允许你指定一个skip值,此后每 skip 项数据,然后又用count项数据填充缓冲区。

public static void main(String... args) {
    Observable.just(1,2,3,4,5,6)
            .buffer(2, 3)
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    for (Integer i : integers) {
                        System.out.println("i = " + i);
                    }
                    System.out.println("-------------------");
                }
            });
}

这里buffer(2, 3)设置了每3个元素中用两个元素填充缓冲区,打印结果:</br>
i = 1</br>
i = 2</br>
-------------------</br>
i = 4</br>
i = 5</br>
-------------------</br>
buffer() 还可以带一个 timespan 的参数,会创建一个每隔timespan时间段就会发射一个列表的Observable。

Window

Rxjava的window()buffer()很像,但是它发射的是Observable而不是数据列表,发射的这些Observables都是源Observable数据的一个子集,数量由参数count来定,最后发送一个onCompleted()结束。

public static void main(String... args) {
    Observable.just(1,2,3,4,5)
            .window(3)
            .subscribe(new Observer<Observable<Integer>>() {
                @Override
                public void onCompleted() {
                    System.out.println("window onCompleted");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Observable<Integer> integerObservable) {
                    integerObservable.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.println("onCompleted");
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(Integer integer) {
                            System.out.println("i = " + integer);
                        }
                    });
                }
            });
}

这里的window(3)设置了子集的大小为3,打印结果:</br>
i = 1</br>
i = 2</br>
i = 3</br>
onCompleted</br>
i = 4</br>
i = 5</br>
onCompleted</br>
window onCompleted</br>

同样的,window()也有带skip参数的重载方法。

Cast

Rxjava的cast()可以将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。第一次看到这个,我还以为可以随便转,就用一个Integer转成String试试咯,好吧原谅我的天真,java.lang.ClassCastException立马打我脸,接着我创建了一个父类一个子类,里面很简单。
父类:

public class Father {
    public void eat(){
        System.out.println("father was eat");
    }
}

子类:

public class Son extends Father {
    public void eat(){
        System.out.println("son was eat");
    }
}

接着:

public static void main(String... args) {
    Observable.just(new Son())
            .cast(Father.class)
            .subscribe(new Action1<Father>() {
                @Override
                public void call(Father f) {
                    f.eat();
                }
            });
}

注意这里只能向上转型,也就是说子类转成父类,构成这样:Father f = new Son(),要记住一点,"编译看左边,运行看右边",打印结果:</br>
son was eat</br>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容

  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,190评论 2 8
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,646评论 8 93
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,853评论 0 10
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • 引言:学习了一下RxJava,理解其是一个以升级版的观察者模式为核心的异步处理库。旨在以更加简介、可读性更强的代码...
    androidjp阅读 5,689评论 1 28