RxJava 学习进行中-Scan&Debounce&ThrottleWithTimeout

RX

1.Scan

连续地对数据序列的每一项应用一个函数,然后连续发射结果
操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator

看完文档,感觉Scan叫的太牵强,不过自己也想不出别这个更好的词来,当然accumulator更加得形象。它接受的参数可以只是一个固定有两个参数一个输出的Func2<>

        Character[] strs = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h','i'};
        Observable<Character> origin = Observable.from(strs);

        origin.scan("",new Func2<String, Character, String>() {

            @Override
            public String call(String s, Character character) {
                String result = s+character.charValue();
                Log.d("scan","拼接字符 "+s +" + "+character.charValue() +" = " +result);
                return result;
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String result) {
                Log.d("scan","接收到的 result = "+result);
            }
        });

可以看到Scan还有一个参数,这里是一个空字符串,这是另一种形式,第一个参数是你预先设定好的初值,第二个参数则是你的函数,初值会作为第一个值加入到方法中去。

注意

A = C , B随意
    scan("",new Func2<A, B, C>() {
            @Override
            public C call(A a,  B b) {
                return .....;
            }
        })

这是因为Scan会接受上一个scan返回值作为第一个参数,当然如果你写不一样了,AS会提醒你的,一片红色波浪线...

运行结果

2.Debounce & ThrottleWithTimeout

先贴上官方说明

仅在过了一段指定的时间还没发射数据时才发射一个数据,会过滤掉发射速率过快的数据项。

一开我的理解是这样的,在一定时间段内发送在这个时间段内输出的最后一个数据

如下图

错误的理解

于是乎,我写了以下代码做测试

        Observable.interval(1, TimeUnit.SECONDS)//每隔1秒发送一个Long型的从0自增的数
                .debounce(3, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debounce","aLong num = "+aLong);
                    }
                });

然而,打印出来的是一片空白。啥都没有!我懵逼了,跟想象中不一样啊!

于是乎请教了搜索引擎,找到呼啸而过11写的代码

   Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            if(subscriber.isUnsubscribed()) return;
            try {
                //产生结果的间隔时间分别为100、200、300...900毫秒
                for (int i = 1; i < 10; i++) {
                    subscriber.onNext(i);
                    Thread.sleep(i * 100);
                }
                subscriber.onCompleted();
            }catch(Exception e){
                subscriber.onError(e);
            }
        }
    }).subscribeOn(Schedulers.newThread())
            .debounce(400, TimeUnit.MILLISECONDS)  //超时时间为400毫秒
            .subscribe(
                    new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.d("debounce","onnext aLong num = "+integer);
                        }
                    }, new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            Log.d("debounce","onerror ");
                        }
                    }, new Action0() {
                        @Override
                        public void call() {
                            Log.d("debounce","oncompleted ");
                        }
                    });

运行结果如下:
Next:4
Next:5
Next:6
Next:7
Next:8
Next:9
completed!

恍然大悟,其真正含义是在输出了一个数据后的一段时间内,没有再次输出新的数据,则把这个数据真正的发送出去;假如在这段时间内有新的数据输出,则以这个数据作为将要发送的数据项,并且重置这个时间段,重新计时。

正确的理解

我把之前的测试代码按照我的理解修改了一下

/**debounce设置时间间隔为3秒,interval每一秒发送一个数据项,中间用filter过滤掉了第1、2、3,6、
7,11、12、13、14秒的数据项,就是为了看第0项,第5项,第10项能否发送,按理解,在3秒的时间间隔
内没有新数据项到达才会发送最后一个数据项,即0、10项是可以被发送的,而5不可以;y以及使用map打印
出真正到达debounce的数据项,方便理解
*/
        Observable.interval(1, TimeUnit.SECONDS)//每隔1秒发送一个Long型的从0自增的数
                .filter(new Func1<Long, Boolean>() {
                    @Override
                    public Boolean call(Long aLong) {

                        if(aLong==1) return false;
                        if(aLong==2) return false;
                        if(aLong==3) return false; //发送0

                        if(aLong==6) return false;
                        if(aLong==7) return false; //不能发送5

                        if(aLong==11) return false;//发送 10
                        if(aLong==12) return false;
                        if(aLong==13) return false;
                        if(aLong==14) return false;
                        return true;
                    }
                })
                .map(new Func1<Long, Long>() {

                    @Override
                    public Long call(Long aLong) {
                        Log.d("debounce","发送了 "+aLong);
                        return aLong;
                    }
                })
                .debounce(3, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                                @Override
                                public void call(Long aLong) {
                                    Log.d("debounce","aLong num = "+aLong);
                                }
                            });

运行结果

Debounce有另一种形式,使用一个Func1<?, Observable<?>>的函数来限制发送的数据。

来自bobo_wang的代码

   Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .debounce(new Func1<Integer, Observable<Integer>>() {
                          @Override
                          public Observable<Integer> call(final Integer integer) {
                              return Observable.create(new Observable.OnSubscribe<Integer>() {
                                  @Override
                                  public void call(Subscriber<? super Integer> subscriber) {
                                      //如果%2==0,则发射数据并调用了onCompleted结束,则不会被丢弃
                                      if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
                                          subscriber.onNext(integer);
                                          subscriber.onCompleted();
                                      }
                                  }
                              });
                          }
                      }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("debounce","integer = "+integer);
                }
            });

输出为
debounce:2
debounce:4
debounce:6
debounce:8
debounce:9

ThrottleWithTimeout则只有跟使用时间参数来限流的Debounce一样的功能。

这两个操作符的理解没有那么容易,需要写多几个例子来加深自己的印象。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,250评论 2 8
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,865评论 8 93
  • 版权声明:本文为小斑马伟原创文章,转载请注明出处! 上篇简单的阐述了响应式编程的基本理论。这篇主要对响应编程进行详...
    ZebraWei阅读 2,593评论 0 2
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 976评论 0 3
  • 响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者...
    说码解字阅读 3,141评论 0 5