RxJava——基础学习(六),过滤操作符

学习资料

1.过滤操作符

作用:对Observable发射的 数据序列 进行 过滤或选择

1.1 first

只发射第一个或者第一个满足某个条件的数据项

1.1.1 first()第一项

简单使用:

public class FirstDemo {
    public static void main(String[] args) {
        first();
    }

    /**
     * 只发送第一项数据
     */
    private static void first() {
        Observable
                .just(1,2,3,4)
                .first()
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println(integer);
                    }
                });
    }
}

运行结果:

1

1.1.2 first(Func1)

只发送第一个满足条件的数据

简单使用:

    /**
     * 第一个偶数
     */
    private static void firstTrue() {
        Observable
                .just(1,3,4,5)
                .first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe((i)->System.out.println("第一个偶数:" + i));
    }

运行结果:

第一个偶数:4

last正好与first相反,是只发送最后一个或者最后一个满足条件的数据项


1.2 take

只发送前N项数据

1.2.1 take(int)

只发送前int项数据项,默认不任何特定的调度器上执行

简单使用:

    /**
     * 只发送前5项数据
     */
    private static void takeInt() {
        Observable
                .interval(500, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

0 ,1 ,2 ,3 ,4 ,

1.2.2 take(long,TimeUnit,Scheduler)

在写定的时间段内,会发送Observable发出的数据项,默认在computation运算调度器上执行

简单使用:

    /**
     * 发送1s以内的数据项
     */
    private static void takeTime() {
        Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .take(1,TimeUnit.SECONDS ,Schedulers.newThread())
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

0 ,1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,

发送的数据项只是1s以内的,并不包括1s时的

takeLast是只发送后n项数据


1.3 Skip

跳过数据项

1.3.1 skip(int)

跳过Observable发送的前n项数据项,默认不在任何特定的调度器上执行

简单使用:

    /**
     * 跳过前3项数据
     */
    private static void skipInt() {
        Observable
                .range(1,10)
                .skip(3)
                .subscribe((i) -> System.out.print(i + " ,"));
    } 

运行结果:

4 ,5 ,6 ,7 ,8 ,9 ,10 ,

1.3.2 skip(long,TimeUnit,Scheduler)

跳过给定的时间段内Obsvable发送过来的数据项,默认在computation运算调度器上执行

简单使用:

    /**
     * 发送前5个,500毫秒之后的数据项
     */
    private static void skipTime() {
        Observable
                .interval(100, TimeUnit.MILLISECONDS, Schedulers.immediate())
                .skip(500,TimeUnit.MILLISECONDS)
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

4 ,5 ,6 ,7 ,8 ,

SkipLast就是跳过后n个数据项


1.4 Sample

定期发射Observable最近发射的数据项

1.4.1 sample(long,TimeUnit)

定时查看一个Observable,然后将自上次采样后,Observable最近一次发送的数据发送出去,默认在默认在computation调度器上执行

注意:如果从上次采样后,原始的Observable没有发出数据项,sample操作返回的新的Observable在监测期时间内也不会发射任何数据

简单使用:

    /**
     * 每隔100毫秒,将Observable最近一个发送的数据项发送出去
     */
    private static void sampleTime() {
        Observable
                .interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
                .sample(100, TimeUnit.MILLISECONDS)
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));

    }

运行结果:

1 ,5 ,8 ,11 ,15 ,

1.4.2 sample(Observable)

  • sample(signal)

当监测到名字为signalObaervable发过来一个信号或者终止时,就对原始Observable发送的数据进行采样,然后将自从上次采样以来最近一次发送的数据发送出去

默认不在任何特定的调度器上执行

简单使用:

    /**
     * 每当收到信号时,将最近发送的一个数据项发送出去
     */
    private static void sameSignal() {
        Observable
                .interval(30,TimeUnit.MILLISECONDS, Schedulers.immediate())
                .sample(Observable.interval(100,TimeUnit.MILLISECONDS))
                .take(5)
                .subscribe((i) -> System.out.print(i + " ,"));
    }

运行结果:

2 ,5 ,8 ,12 ,15 ,

第一个输出的数字是2,因为是从0开始的,每次输出的数字中间都会间隔2个


1.5 Debounce

两次发送数据项间隔大于一段指定的时间,才发射一个数据

注意:最后的onCompleted信号,会紧随着最后一项原始Observable数据项,即使是小于时间间隔,一旦结束到onCompleted信号,整个操作也就结束了,onCompleted通知不会触发限流

1.5.1 debounce(long,TimeUnit)

在指定的时间long间隔进行限流,个人理解,过滤两次数据小于指定间隔的数据项,与上次发送的时间差大于间隔的数据项才进行发送

默认在computation调度器上执行

简单使用:

    /**
     * 输出两次间隔大于150秒的数据项
     */
    private static void deBounceTime() {
        Observable
                .create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        try {
                            for (int i = 0; i < 5; i++) {
                                //产生在100到200间随机时间间隔
                                TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
                                subscriber.onNext(i);
                            }
                             //延迟结束信号 否则最后一次一定不会发送
                            TimeUnit.MILLISECONDS.sleep(100);
                            subscriber.onCompleted();
                        } catch (InterruptedException e) {
                            subscriber.onError(e);
                        }
                    }
                })
                .debounce(150, TimeUnit.MILLISECONDS, Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(" --> onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                         System.out.println(e.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.print(integer + " ,");
                    }
                });
    }

运行结果:

2 ,4 , --> onCompleted

注意subscriber.onCompleted()发送结束通知信号的时机


1.5.2 debounce(Func1)

对原始Observable的每一个数据项应用一个函数进行限流,这个函数返回一个Observable。接到通知前,原始Observable发送的数据项将会被抑制

默认不在任何特定的调度器上执行

简单使用:

    /**
     * 在没有接到通知的150毫秒内,原始Observable发送的数据项将会被抑制
     */
    private static void deBounceSignal() {
        Observable
                .create((subscriber) -> {
                    try {
                        for (int i = 0; i < 5; i++) {
                            //产生在100到200间随机时间间隔
                            TimeUnit.MILLISECONDS.sleep((int) (Math.random() * 100 + 100));
                            subscriber.onNext(i);
                        }
                        //延迟结束信号 否则最后一次一定不会发送
                        TimeUnit.MILLISECONDS.sleep(100);
                        subscriber.onCompleted();
                    } catch (InterruptedException e) {
                        subscriber.onError(e);
                    }
                })
                .debounce(new Func1<Object, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(Object o) {
                        //每隔150毫秒发出一个通知
                        return Observable.interval(150, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        System.out.println(" --> onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println(e.getMessage());
                    }

                    @Override
                    public void onNext(Object o) {
                        System.out.print(o + " ,");
                    }
                });
    }   

运行结果:

0 ,2 ,4 , --> onCompleted

注意:最后一个数据项一定会被发送,即使在没有接到通知的150毫秒内


1.6 Distinct

过滤掉重复的数据项,默认不在任何特定的调度器上执行

1.6.1 distinct()

简单使用:

    /**
     * 去除重复项
     */
    private static void distinct() {
        Observable
                .just(1,2,2,3,4,4,5,6,6)
                .distinct()
                .subscribe((i) -> System.out.print(i+" ,"));
    }

运行结果:

1 ,2 ,3 ,4 ,5 ,6 ,

1.6.2 distinct(Func1)

将原始Observable发送的数据项应用一个函数,根据这个函数产生不同的key,之后的数据项便是比较key,而不再管数据项

简单使用:

    /**
     * 根据条件指定过滤的key ,将之后出现 key为"1","2",全部过滤
     */
    private static void distinctKey() {
        Observable
                .just(1,2,2,3,3,4,5,6,6)
                .distinct(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        //设置key
                        return integer / 2 == 0 ? "1" : "2" ;
                    }
                })
                .subscribe((i) -> System.out.print(i+" ,"));
    }

运行结果:

1 ,2 ,

1.7 ElementAt

只发射索引值为N的数据项,索引从0开始

如果传递的索引为负数,或者索引不小于数据项个数,将会抛出一个IndexOutOfBoundsException异常

简单使用:

public class ElementAtDemo {
    /**
     * 输出索引值为5的数据项,从0开始
     */
    public static void main(String[] args) {
        Observable
                .range(1,10)
                .elementAt(5)
                .subscribe(System.out::println);
    }
}

运行结果:

6

1.8 IgnoreElements

不发射任何数据,只发射Observable的终止通知onErroronCompleted

ignoreElements

若不关心Obsvable发送的数据项,只想在完成时,或者遇到错误终止时收到通知,可以使用,这个操作符永远不会调用观察者的onNext()方法

默认不在任何特定的调度器上执行


2. 最后

感觉过滤操作符比变换操作符理解起来要容易一些

本人很菜,有错误请指出

共鸣 :)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,819评论 0 10
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,570评论 8 93
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,744评论 0 1
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,178评论 2 8
  • RxJava正在Android开发者中变的越来越流行。唯一的问题就是上手不容易,尤其是大部分人之前都是使用命令式编...
    刘启敏阅读 1,831评论 1 7