RxJava学习之转换型操作符

RxJava学习之转换型操作符

标签(空格分隔): RX系列


转换型操作符

下面展示了可用于Observable发射的数据执行变换操作的各种操作符

  • map()---对序列的每一项都应用一个函数来变换Observable发射的数据序列
  • flatMap()、concatMap()、flatMapIterable()---将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
  • switchMap()---将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • scan()---对Observable发射的每一项数据应用一个函数,然后按照顺序依次发射每一个值
  • groupyBy()---将Observable分拆为Observable集合,将原始的Observable发射的数据按照key分组,每一个Observable发射一组不同的数据
  • buffer()---它顶起从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射
  • window()---定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
  • cast()---在发射之前强制将Observable发射的所有哦数据转换为指定数据类型

map操作符


对Observable发射的每一项数据应用一个函数,执行变换操作
Map操作符对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。

RxJava将这个操作符实现为map函数。这个操作符默认不在任何特定的调度器上执行。

Map操作符的源码
/**
     * Returns an Observable that applies a specified function to each item emitted by the source Observable and
     * emits the results of these function applications.
     * <p>
     * ![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png)
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * 
     * @param func
     *            a function to apply to each item emitted by the Observable
     * @return an Observable that emits the items from the source Observable, transformed by the specified
     *         function
     * @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
     */
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
    
    //可以看到接受一个Func1函数
/**
 * Represents a function with one argument.
 * @param <T> the first argument type  作为参数类型
 * @param <R> the result type  作为返回参数类型
 */
public interface Func1<T, R> extends Function {
    R call(T t);
}
    

map例子

static List<Student> studentList = new ArrayList<Student>(){
        {
            add(new Student("FAALLDA", 28));
            add(new Student("小弟弟", 23));
            add(new Student("妻子的孜孜", 25));
        }
    };
    
    /**
     * Map
     * 
     * 通过使用map中的方法对Observable中发射出来的所有数据进行变换
     * 
     * test1()方法是得到多个Student对象中的name,保存到nameList中
     * 注意:接口Func1包装的是有返回值的方法。
     * 
     */
    private static void test1(){
        List<String> nameList = new ArrayList<>();
        Observable.from(studentList)
        .map(new Func1<Student, String>() {

            @Override
            public String call(Student student) {
                return student.name;
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted nameList.size() = " + nameList.size());     
            }
            
            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
                nameList.add(value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        
    }
    

Map操作符连续使用

/**
     * Map操作符连续使用
     */
    private static void test(){
        Observable.from(studentList)
        //把student转换成Integer
        .map(new Func1<Student, Integer>() {
            
            @Override
            public Integer call(Student student) {
                return student.age;
            }
        })
        //将Integer转换成String
        .map(new Func1<Integer, String>() {

            @Override
            public String call(Integer t) {
                return String.valueOf(t+10);
            }
        })
        .subscribe(new Subscriber<String>() {
            
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            
            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        
    }
    

Flatmap操作符

FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable


  • FlatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

  • 这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

注意:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。

/**
     * FlatMap操作符
     * FlatMap将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
     */
    private static void test4(){
        List<String> nameList = new ArrayList<>();
        Observable.from(studentList)
        //第一个Student类型使我们的flatMap返回函数call里面的参数类型
        //第二个Entity类型使我们的flatMap中的call的返回值类型
        .flatMap(new Func1<Student, Observable<Entity>>() {
            
            @Override
            public Observable<Entity> call(Student student) {
                Course course = couseMap.get(student.name);
                Entity entity = new Entity(course, student);
                return Observable.just(entity);
            }
        })
        .subscribe(new Subscriber<Entity>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Entity entity) {
                System.out.println("onSuccess entity = " + entity);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        
    }

concatMap操作符

还有一个concatMap操作符,它类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。


    /**
     * ConcatMap操作符
     * 类似于最简单版本的flatMap,但是它按次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。
     * 
     */
    private static void test5(){
        Observable.from(studentList)
        .concatMap(new Func1<Student, Observable<Course>>() {

            @Override
            public Observable<Course> call(Student t) {
                Course course = couseMap.get(t.name);
                return Observable.just(course);
            }
        })
        .subscribe(new Subscriber<Course>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Course course) {
                System.out.println("onSuccess course = " + course);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
    }

concatMap和flatMap的区别

/**
     * flatMap与ConcatMap操作符比较
     * 区别:
     * 无序:FlatMap对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
     * 有序:ConcatMap不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据。
     * 
     * 说明:在同步线程中,FlatMap和ConcactMap的执行结果是一样的(结果是有序的),
     *      只有在异步线程中,FlatMap结果可能是无序的,而ConcactMap始终能保持有序的结果。
     * 
     * concatMap与flatMap操作符的比较 
     */
    private static void test(){
        List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
        Observable.from(numbers)
        .flatMap(new Func1<Integer, Observable<Integer>>() {

            @Override
            public Observable<Integer> call(Integer t) {
                return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
                //return Observable.just(t);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Integer value) {
                System.out.println("flatMap onSuccess value = " + value);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        System.out.println("----------------------------");
        Observable.from(numbers)
        .concatMap(new Func1<Integer, Observable<Integer>>() {
            
            @Override
            public Observable<Integer> call(Integer t) {
                return Observable.just(t).subscribeOn(Schedulers.from(Executors.newCachedThreadPool()));
                //return Observable.just(t);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Integer value) {
                System.out.println("concatMap onNext value = " + value);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
    }
    

switchMap操作符

  • 它和flatMap很像,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前那个数据的Observable,只监视当前这一个
    /**
     * switchMap
     * 解释:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
     * 用法与FlatMap几乎一样,区别是SwitchMap操作符只会发射[emit]最近的Observables。
     * 
     * 当源Observable发射一个新的数据项时,如果旧数据项订阅还未完成,就取消旧订阅数据和停止监视那个数据项产生的Observable,开始监视新的数据项.
     * 
     * 应用场景:http://blog.csdn.net/jdsjlzx/article/details/51730162
     * 
     * 逻辑推演:
     * A --> 取消空的,没有可以取消的
     * B-->  A1被取消
     * C-->  B1被取消
     * D-->  C1被取消
     * E-->  D1被取消
     * 最终输出E1
     */
    private static void test7(){
        Observable.just("A", "B", "C", "D", "E")
        .switchMap(new Func1<String, Observable<String>>() {  
            @Override  
            public Observable<String> call(String s) {  
                return Observable.just(s+"1").subscribeOn(Schedulers.newThread()); //并发
                //return Observable.just(s+"1");  
            }  
        })
        .subscribe(new Observer<String>() {  
            @Override  
            public void onCompleted() {  
                System.out.println("switchMap onCompleted");
            }  
  
            @Override  
            public void onError(Throwable e) {  
                System.out.println("switchMap onError :" + e);
            }  
  
            @Override  
            public void onNext(String s) {  
                System.out.println("switchMap Next :" + s);
            }  
        });  
        
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

switchMap与flatmap的区别

/**
     * switchMap与flatmap的区别
     * 
     * 说明:在同步线程中,switchMap发射[emit]所有的Observables,
     *      在异步线程中,switchMap只会发射[emit]最近的Observables。
     * 
     */
    private static void test8(){
        ExecutorService service = Executors.newFixedThreadPool(10);
        List<Integer> numbers = Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9, 10);
        Observable.from(numbers)
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            
            @Override
            public Observable<Integer> call(Integer t) {
                return Observable.just(t).subscribeOn(Schedulers.from(service));
                //return Observable.just(t);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Integer value) {
                System.out.println("flatMap onNext value = " + value);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        System.out.println("----------------------------------");
        Observable.from(numbers)
        .switchMap(new Func1<Integer, Observable<Integer>>() {
            
            @Override
            public Observable<Integer> call(Integer t) {
                return Observable.just(t).subscribeOn(Schedulers.from(service));
                //return Observable.just(t);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            
            @Override
            public void onCompleted() {
            }
            
            @Override
            public void onNext(Integer value) {
                System.out.println("switchMap2 onNext value = " + value);
            }
            
            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });
        service.shutdown();
    }
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容