一、RxJava操作符
1、创建操作符
- create、just、from
- interval:创建一个按固定时间间隔发射整数序列的Flowable,相当于定时器
- range:创建发射指定范围的整数序列的Flowable,可替代for循环
- repeat:创建一个N次重复发射特定数据的Flowable
2、变换操作符
- map:通过指定一个Function对象,转换成一个新的对象并发射
- flaMap:将Flowable发射的数据集合变换为Flowable集合,然后将这些Flowable发射的数据平坦化的放进一个单独的Flowable
- cast:强制将Flowable发射的所有数据转换为指定类型
- concatMap:与flatMap一致,解决了flatMap的交叉问题
- flatMapIterable:可将数据包装成Iterable
- buffer:将源Flowable变换为一个新的Flowable,这个新的Flowable每次发射一组列表值
- groupBy:分组元素
3、过滤操作符
- filter:对源Flowable产生的结果自定义规则进行过滤,只有满足条件的结果才会提交给订阅者
- elementAt:返回指定位置的数据 elementAtOrDefault(int,T)可以允许默认值
- distinct():去重 distinctUntilChanged去掉连续重复的数据
- skip:过滤前N项 take:取前N项
- ignoreElements:忽略所有源Flowable产生的结果,只把onCompleted和onError事件通知给订阅者
- throttleFirst:定期发射这个时间段里源Flowable发射的第一个数据,throttleFirst默认在computation调度器上执行
- throttleWithTimeOut:通过时间来限流,发射时间间隔小于指定时间的数据会被过滤掉
4、组合操作符
- startWith:在源Flowable发射的数据前面插上一些数据
- merge:将多个Flowable合并到一个Flowable中进行发射(合并数据可能交错)
- concat:合并发射(有序发射)
- zip:合并数据,根据指定的函数变换它们,并发射一个新值
- combineLastest:如果其中的一个Flowable还有数据没有发射,combineLastest将两个Flowable最新发射的数据组合在一起
Flowable<Integer> flowable1 = Flowable.just(1, 2, 3);
Flowable<String> flowable2 = Flowable.just("a", "b", "c");
Flowable.combineLatest(flowable1, flowable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("Tag", "combineLatest:" + s);
}
});
输出:3a 3b 3c
5、辅助操作符
- delay:让原始Flowable在发射每项数据之前都暂停一段指定的时间段
-
Do:
- subscribeOn:指定自身在哪个线程上运行
- observeOn:指定发射出的数据在哪个线程上运行(一般情况在主线程)
- timeout:如果超时以一个onError终止这个Flowable,或者执行一个备用的Flowable
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> sub) throws Exception {
for (int i = 0; i < 4; i++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
sub.onNext(i);
}
sub.onComplete();
}
}, BackpressureStrategy.BUFFER).timeout(200, TimeUnit.MILLISECONDS, Flowable.just(10, 11))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("tag", "timeout:" + integer);
}
});
输出:0 1 2 10 11
6、错误处理操作符
- catch:拦截原始Flowable的onError
onErrorReturn
onErrorResumeNext
onExceptionResumeNext
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 5; i++) {
if (i > 2) {
e.onError(new Throwable("Throwable"));
}
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return 6;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d("tag", "onNext:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("tag", "onError:" + t.getMessage());
}
@Override
public void onComplete() {
Log.d("tag", "onComplete");
}
});
输出:onNext:0 1 2 6 onComplete
- retry:重试 传递最新的onError给观察者
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 5; i++) {
if (i == 1) {
e.onError(new Throwable("Throwable"));
} else {
e.onNext(i);
}
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER).retry(2).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d("tag", "onNext:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("tag", "onError:" + t.getMessage());
}
@Override
public void onComplete() {
Log.d("tag", "onComplete");
}
});
上面代码重新订阅次数为2,i=0调用注释1,重试2次同样调用1,这样一共调用3次onNext方法最后才会调用onError方法
输出:0 0 0 onError:Throwable
7、条件操作符和布尔操作符
1、布尔操作符
- all:根据一个函数对源发射的所有数据进行判断,最终返回的结果就是这个判断的结果
- contains:判断源发射的数据是否包含某一个数据 包含返回true
- isEmpty:判断源是否发射过数据 没发射返回true
2、条件操作符
- amb:对于给定2个或多个Flowable,它只发射首先发射数据或通知的那个Flowable的所有数据
Flowable.ambArray(Flowable.just(1, 2, 3).delay(2, TimeUnit.SECONDS), Flowable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("tag", "amb:" + integer);
}
});
输出:4 5 6
- defaultIfEmpty:发射来自原始Flowable的数据,如果源没有发射数据,就发射一个默认数据
8、转换操作符
Flowable.just(1, 2, 3).toList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
for (int i : integers) {
Log.d("tag", "toList:" + i);
}
}
});
- toSortedList:对转换后的list排序,默认升序
Flowable.just(3, 1, 2).toSortedList().subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
for (int i : integers) {
Log.d("tag", "toSortedList:" + i);
}
}
});
输出:1 2 3
二、RxJava使用场景,结合Okhttp、Retrofit
1、配置build.gradle
dependencies {
...
implementation 'io.reactivex.rxjava2:rxjava:2.1.5'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.squareup.retrofit2:retrofit:2.3.0'
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
}
2、网络请求接口
public interface LastFmApiService {
String BASE_PARAMETERS_ARTIST = "?method=artist.getinfo&api_key=fdb3a51437d4281d4d64964d333531d4&format=json";
@GET(BASE_PARAMETERS_ARTIST)
Flowable<ArtistInfo> getArtistInfo(@Query("artist") String artist);
}
3、网络请求
public class RetrofitUtils {
private static final String BASE_KU_GOU_URL = "http://lyrics.kugou.com/";
private static final String BASE_LASTFM_URL = "http://ws.audioscrobbler.com/2.0/";
private Retrofit retrofit;
private RetrofitUtils(boolean is) {
OkHttpClient builder = new OkHttpClient.Builder()
.addInterceptor(new LoggingInterceptor())
.connectTimeout(15, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
// 失败重试
.retryOnConnectionFailure(true)
//.sslSocketFactory(SSLSocketClient.setCertificates())
//.hostnameVerifier(SSLSocketClient.getHostnameVerifier())
.build();
/*
* StringConverterFactory和GsonConverterFactory不能同时使用
* 谁在前返回谁的类型(坑)
* MapConverterFactory和StringConverterFactory可同时使用
*
*/
retrofit = new Retrofit.Builder()
.client(builder)
.baseUrl(is ? BASE_KU_GOU_URL : BASE_LASTFM_URL)
//.addConverterFactory(MapConverterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.addConverterFactory(StringConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
}
}