RxJava 操作符(创建、变换)

概述

RxJava 操作符的类型有多种,如:创建、变换、过滤、组合、错误处理、辅助、条件和布尔操作符等,还有许多延伸操作符,这里接单记录常用操作符。
操作符官方文档

创建操作符

  • create (unsafeCreate)
    create 操作符创建一个Observable(被观察者)
        Observable observable = Observable.unsafeCreate(new Observable.OnSubscribe() {
            @Override
            public void call(Object o) {
                subscriber.onNext("令狐冲");
                subscriber.onNext("练会了独孤九剑");
                subscriber.onCompleted();
            }
        });
        subscriber = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e("zpan", "onCompleted =====");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.e("zpan", "onNext =" + s);
            }
        };
        observable.subscribe(subscriber);
E/zpan: onNext =令狐冲
        onNext =练会了独孤九剑
E/zpan: onCompleted =====
  • just
    just 操作符创建一个依次将数据发射出去的Observable。
    注意:最多只能是10条数据。
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("zpan", " just=" + integer);
                    }
                });
  • from
    from 操作符将集合/数组依次发射出去,没有数据数的限制。
        ArrayList<Integer> ints = new ArrayList<>();
        ints.add(1);
        ints.add(2);
        ints.add(3);

        Observable.from(ints)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("zpan", " from=" + integer);
                    }
                });
  • interval
    interval 创建一个按固定时间间隔发射整数序列的Observable,相当于定时器。
    interval()函数有两个参数:第一个是两次发射的时间间隔,第二个是用到的时间单位。
    public void setInterval(View view) {
        Observable.interval(2, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.e("zpan", " interval = " + aLong);
                    }
                });
    }
  • rang
    range 发射指定范围的有序的整数序列的Observable,可以替代for循环。
    range()函数有两个参数:第一个是起始值(不小于0),第二个是结束值
    public void setRange(View view) {
        Observable.range(1, 5)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("zpan", " range = " + integer);
                    }
                });
    }
  • repeat
    repeat 创建一个N次重复发射特定数据的Observable
    repea()函数有一个参数:重复次数
    public void setRepeat(View view) {
        Observable.range(0, 3)
                .repeat(2)   // 重复 2 次
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("zpan", " repeat = " + integer);
                    }
                });
    }
  • defer
    只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable。
        String[] strings1 = {"Hello", "World"};
        String[] strings2 = {"Hello", "RxJava"};
        
        Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.from(strings1);
            }
        });
        strings1 = strings2;    //订阅之前把 strings1 修改了
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("zpan", "defer = " + s);
            }
        });
E/zpan: defer = Hello
        defer = RxJava
  • empty
    创建一个发射空数据的Observable
        Observable.empty()
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        Log.e("zpan", "有参数 - " + o);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e("zpan", "报错 - " + throwable.getMessage());
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        Log.e("zpan", "无参数");
                    }
                });
E/zpan: 无参数
  • error
    创建一个发射error事件的Observable
Observable.error(new Exception("错误信息"))
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        Log.e("zpan", "有参数 - " + o);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e("zpan", "报错 - " + throwable.getMessage());
                    }
                }, new Action0() {
                    @Override
                    public void call() {
                        Log.e("zpan", "无参数");
                    }
                });
E/zpan: 报错 - 错误信息
  • never
    创建一个不发射任何事件也不会结束的Observable
  • timer
    创建一个在给定的延时之后发射数据为0的Observable
Observable.timer(1000, TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.e("zpan", " timer = " + aLong);
                    }
                });

变换操作符

变换操作符是对Observable发射的数据按照一定规则做变换,然后将变换后的数据发射出去。

  • Map
    map 操作符通过指定一个Func对象,将Observable转换成一个新的Observable对象并发射
Observable.just("123456")
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        return s + "456789";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("zpan", " map = " + s);
                    }
                });
E/zpan:  map = 123456456789
  • flatMap
    flatMap 操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。作用比Map强大。
    注意:flatmap 的合并允许交叉,也就是说可能会交错发送事件,顺序可能会错乱。
    concatMap 操作符功能与flatmap操作符一致;不过,它解决了flatmap交叉问题。用法同flatmap
  • cast
    cast 操作符的作用是强制将Observable发射的所有数据转换为指定类型。
List<String> list = new ArrayList<>();
        list.add("11111111");
        list.add("22222222");
        list.add("33333333");
        list.add("44444444");
        Observable.from(list)
                .flatMap(new Func1<String, Observable<?>>() {
                    @Override
                    public Observable<?> call(String s) {
                        return Observable.just("新增" + s);
                    }
                })
                .cast(String.class)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("zpan", " flatmap = " + s);
                    }
                });
  • flatMapIterable
    flatMapIterable 操作符可以将数据包装成Iterable,在Iterable中我们就可以对数据进行处理了
Observable.just(1, 2, 3)
                .flatMapIterable(new Func1<Integer, Iterable<?>>() {
                    @Override
                    public Iterable<?> call(Integer integer) {
                        List<Integer> list = new ArrayList<>();
                        list.add(integer + 2);
                        return list;
                    }
                })
                .cast(Integer.class)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("zpan", " flatMapIterable =" + integer);
                    }
                });
  • buffer
    buffer 操作符将原Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。
Observable.just(1, 2, 3, 4, 5, 6)
                .buffer(3)    // 缓存容量是 3
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        for (Integer integer : integers) {
                            Log.e("zpan", " buffer =" + integer);
                        }
                        Log.e("zpan", "=====华丽的分界线=====");
                    }
                });
E/zpan:  buffer =1
     buffer =2
     buffer =3
    =====华丽的分界线=====
     buffer =4
     buffer =5
     buffer =6
    =====华丽的分界线=====
  • window
    window操作符和buffer操作符类似,只不过window操作符发射的是Observable而不是数据列表。
Observable.just(1, 2, 3, 4, 5, 6)
                .window(3)
                .subscribe(new Action1<Observable<Integer>>() {
                    @Override
                    public void call(Observable<Integer> integerObservable) {
                        
                    }
                });
  • groupBy
    groupBy 操作符用于分组元素,将原Observable变换成一个发射Observable的新Observable(分组后的)
BookInfo b1 = new BookInfo("九阳神功", "A");
        BookInfo b2 = new BookInfo("西游记", "H");
        BookInfo b3 = new BookInfo("语文", "G");
        BookInfo b4 = new BookInfo("葵花宝典", "A");
        BookInfo b5 = new BookInfo("三国演义", "H");
        BookInfo b6 = new BookInfo("数学", "G");
        BookInfo b7 = new BookInfo("九阴真经", "A");

        Observable<GroupedObservable<String, BookInfo>> groupedObservable
                = Observable.just(b1, b2, b3, b4, b5, b6, b7)
                .groupBy(new Func1<BookInfo, String>() {
                    @Override
                    public String call(BookInfo bookInfo) {
                        return bookInfo.bookId;
                    }
                });
        // concat 组合操作符
        Observable.concat(groupedObservable)
                .subscribe(new Action1<BookInfo>() {
                    @Override
                    public void call(BookInfo bookInfo) {
                        Log.e("zpan", "groupBy = " + bookInfo.bookName + " - " + bookInfo.bookId);
                        Log.e("zpan", "========");
                    }
                });
E/zpan: groupBy = 九阳神功 - A
    ========
    groupBy = 葵花宝典 - A
    ========
    groupBy = 九阴真经 - A
    ========
    groupBy = 西游记 - H
    ========
    groupBy = 三国演义 - H
    ========
    groupBy = 语文 - G
    ========
    groupBy = 数学 - G
    ========
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354