RxJava使用笔记

一、RxJava操作符

1、创建操作符

  • create、just、from
  • interval:创建一个按固定时间间隔发射整数序列的Flowable,相当于定时器
  • range:创建发射指定范围的整数序列的Flowable,可替代for循环
  • repeat:创建一个N次重复发射特定数据的Flowable

2、变换操作符

  • map:通过指定一个Function对象,转换成一个新的对象并发射
  • flaMap:将Flowable发射的数据集合变换为Flowable集合,然后将这些Flowable发射的数据平坦化的放进一个单独的Flowable
  • cast:强制将Flowable发射的所有数据转换为指定类型
  • concatMap:与flatMap一致,解决了flatMap的交叉问题
  • flatMapIterable:可将数据包装成Iterable
  • buffer:将源Flowable变换为一个新的Flowable,这个新的Flowable每次发射一组列表值
  • groupBy:分组元素

3、过滤操作符

  • filter:对源Flowable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者
  • elementAt:返回指定位置的数据 elementAtOrDefault(int,T)可以允许默认值
  • distinct():去重 distinctUntilChanged去掉连续重复的数据
  • skip:过滤前N项 take:取前N项
  • ignoreElements:忽略所有源Flowable产生的结果,只把onCompleted和onError事件通知给订阅者
  • throttleFirst:定期发射这个时间段里源Flowable发射的第一个数据,throttleFirst默认在computation调度器上执行
  • throttleWithTimeOut:通过时间来限流,发射时间间隔小于指定时间的数据会被过滤掉

4、组合操作符

  • startWith:在源Flowable发射的数据前面插上一些数据
  • merge:将多个Flowable合并到一个Flowable中进行发射(合并数据可能交错)
  • concat:合并发射(有序发射)
  • zip:合并数据,根据指定的函数变换它们,并发射一个新值
  • combineLastest:如果其中的一个Flowable还有数据没有发射,combineLastest将两个Flowable最新发射的数据组合在一起
Flowable<Integer> flowable1 = Flowable.just(1, 2, 3);
                Flowable<String> flowable2 = Flowable.just("a", "b", "c");
                Flowable.combineLatest(flowable1, flowable2, new BiFunction<Integer, String, String>() {
                    @Override
                    public String apply(Integer integer, String s) throws Exception {
                        return integer + s;
                    }
                }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("Tag", "combineLatest:" + s);
                    }
                });
输出:3a 3b 3c

5、辅助操作符

  • delay:让原始Flowable在发射每项数据之前都暂停一段指定的时间段
  • Do:
    新文档 2018-04-15_1.jpg
  • subscribeOn:指定自身在哪个线程上运行
  • observeOn:指定发射出的数据在哪个线程上运行(一般情况在主线程)
  • timeout:如果超时以一个onError终止这个Flowable,或者执行一个备用的Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> sub) throws Exception {
                        for (int i = 0; i < 4; i++) {
                            try {
                                Thread.sleep(i * 100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            sub.onNext(i);
                        }
                        sub.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).timeout(200, TimeUnit.MILLISECONDS, Flowable.just(10, 11))
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d("tag", "timeout:" + integer);
                            }
                        });
输出:0 1 2 10 11

6、错误处理操作符

  • catch:拦截原始Flowable的onError onErrorReturn onErrorResumeNext onExceptionResumeNext
Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 5; i++) {
                            if (i > 2) {
                                e.onError(new Throwable("Throwable"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        return 6;
                    }
                }).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("tag", "onNext:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("tag", "onError:" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("tag", "onComplete");
                    }
                });
输出:onNext:0 1 2 6 onComplete
  • retry:重试 传递最新的onError给观察者
Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 5; i++) {
                            if (i == 1) {
                                e.onError(new Throwable("Throwable"));
                            } else {
                                e.onNext(i);
                            }
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).retry(2).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("tag", "onNext:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("tag", "onError:" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("tag", "onComplete");
                    }
                });
上面代码重新订阅次数为2,i=0调用注释1,重试2次同样调用1,这样一共调用3次onNext方法最后才会调用onError方法
输出:0 0 0 onError:Throwable 

7、条件操作符和布尔操作符

1、布尔操作符
  • all:根据一个函数对源发射的所有数据进行判断,最终返回的结果就是这个判断的结果
  • contains:判断源发射的数据是否包含某一个数据 包含返回true
  • isEmpty:判断源是否发射过数据 没发射返回true
2、条件操作符
  • amb:对于给定2个或多个Flowable,它只发射首先发射数据或通知的那个Flowable的所有数据
Flowable.ambArray(Flowable.just(1, 2, 3).delay(2, TimeUnit.SECONDS), Flowable.just(4, 5, 6))
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d("tag", "amb:" + integer);
                            }
                        });
输出:4 5 6
  • defaultIfEmpty:发射来自原始Flowable的数据,如果源没有发射数据,就发射一个默认数据

8、转换操作符

  • toList:将发射的数据转换成list
Flowable.just(1, 2, 3).toList().subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        for (int i : integers) {
                            Log.d("tag", "toList:" + i);
                        }
                    }
                });
  • toSortedList:对转换后的list排序,默认升序
Flowable.just(3, 1, 2).toSortedList().subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        for (int i : integers) {
                            Log.d("tag", "toSortedList:" + i);
                        }
                    }
                });
输出:1 2 3
  • toMap:转换成Map(默认HashMap)

二、RxJava使用场景,结合Okhttp、Retrofit

1、配置build.gradle

dependencies {
   ...
    implementation 'io.reactivex.rxjava2:rxjava:2.1.5'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'com.squareup.retrofit2:retrofit:2.3.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
}

2、网络请求接口

public interface LastFmApiService {

    String BASE_PARAMETERS_ARTIST = "?method=artist.getinfo&api_key=fdb3a51437d4281d4d64964d333531d4&format=json";

    @GET(BASE_PARAMETERS_ARTIST)
    Flowable<ArtistInfo> getArtistInfo(@Query("artist") String artist);
}

3、网络请求

public class RetrofitUtils {

    private static final String BASE_KU_GOU_URL = "http://lyrics.kugou.com/";
    private static final String BASE_LASTFM_URL = "http://ws.audioscrobbler.com/2.0/";
    private Retrofit retrofit;

    private RetrofitUtils(boolean is) {
        OkHttpClient builder = new OkHttpClient.Builder()
                .addInterceptor(new LoggingInterceptor())
                .connectTimeout(15, TimeUnit.SECONDS)
                .readTimeout(15, TimeUnit.SECONDS)
                // 失败重试
                .retryOnConnectionFailure(true)
                //.sslSocketFactory(SSLSocketClient.setCertificates())
                //.hostnameVerifier(SSLSocketClient.getHostnameVerifier())
                .build();
        /*
         * StringConverterFactory和GsonConverterFactory不能同时使用
         * 谁在前返回谁的类型(坑)
         * MapConverterFactory和StringConverterFactory可同时使用
         *
         */

        retrofit = new Retrofit.Builder()
                .client(builder)
                .baseUrl(is ? BASE_KU_GOU_URL : BASE_LASTFM_URL)
                //.addConverterFactory(MapConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addConverterFactory(StringConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容