过滤
过滤类型的操作符就比较简单啦,主要就是将数据进行筛选,比如我们最常用的:
filter
通常用这个操作符去过滤掉不需要的数据,保证下游只接收我们想要的,比如,我现在只想要2号店的所有销售人员的数据:
Flowable.fromIterable(cityStores)
.concatMap((Function<CityStore, Publisher<Salesman>>) cityStore -> {
int delay = 0;
if (1 == cityStore.getCityCtoreId()) {
delay = 500;
}
return Flowable.fromIterable(cityStore.getSalesman()).delay(delay, TimeUnit.MILLISECONDS);
})
.filter(salesman -> salesman.getCityStoreId() == 2)
.subscribe(salesman -> Log.d(TAG, salesman.getCityStoreId() + "店的"
+ salesman.getSalesManId() + "号的业绩为:"
+ salesman.getSalesPerformance() + "元"));
// 2店的0号的业绩为:60元
// 2店的1号的业绩为:27元
// 2店的2号的业绩为:33元
完全不用写for循环判断了!!
debounce
翻译过来就是防抖动的意思,其含义为,当我们发送了一次数据,在规定的时间内,又发送了新的数据,那么这一次发送的数据会被丢掉,先来看个例子
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
emitter.onNext("张三");
Thread.sleep(299);
emitter.onNext("李四");
Thread.sleep(300);
emitter.onNext("王五");
Thread.sleep(350);
emitter.onNext("赵六");
Thread.sleep(250);
emitter.onNext("费七");
Thread.sleep(100);
emitter.onNext("陈八");
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER)
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
Log.d(TAG, str);
}
});
// 李四
// 王五
// 陈八
这边我们规定的时间为300毫秒,
- 当我们发送了张三的时候,经过了299毫秒,小于300,又发送了李四,张三丢掉
- 当发送了李四,经过300毫秒,又发送了王五 满足条件,李四保留,王五同理,赵六,费七同样不满足条件,舍弃,陈八最后一个发送 后面没数据了,肯定也满足条件保留,所以打印出来的结果就是:李四,王五,陈八
- 另外:如果用flowable配合create操作符,是要指定背压模式的.
distinct
去重,单纯的调用distinct(),就是去掉重复发射的元素
List<Integer> distinctList = Arrays.asList(1, 1, 3, 3, 5);
Flowable.fromIterable(distinctList)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG,integer+"");
}
});
// 1
// 3
// 5
还有一个重载的方法可以根据变量去重
List<DistinctBean> distinctBeans = new ArrayList<>();
distinctBeans.add(new DistinctBean("aaaa"));
distinctBeans.add(new DistinctBean("bbbb"));
distinctBeans.add(new DistinctBean("aaaa"));
distinctBeans.add(new DistinctBean("cccc"));
distinctBeans.add(new DistinctBean("bbbb"));
Flowable.fromIterable(distinctBeans)
.distinct(new Function<DistinctBean, String>() {
@Override
public String apply(DistinctBean distinctBean) throws Exception {
return distinctBean.str;
}
})
.subscribe(distinctBean -> Log.d(TAG, distinctBean.toString()));
// DistinctBean{str='aaaa'}
// DistinctBean{str='bbbb'}
// DistinctBean{str='cccc'}
也就是我们可以在apply方法中去指定一个key,来判断是否是重复的数据
ElementAt
获取指定位置的发射数据,索引是从0开始
List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
Flowable.fromIterable(mList)
.elementAt(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer + "");
}
});
// 4
- IgnoreElements 如果用这个的话,就不发送任何数据了
first
只发射第一个数据
List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
Flowable.fromIterable(mList)
.first(1)
.subscribe(integer -> Log.d(TAG, integer + ""));
// 1
如果用first的话,有个参数,这个参数是默认的item索引,也就是说原数据一直没发送,那么会发送默认索引上的值,不要参数的话可以换成firstElement
last
只发射最后一个数据
Flowable.fromIterable(mList)
.last(1)
.subscribe(integer -> Log.d(TAG, integer + ""));
// 5
跟first完全一致,不要参数的话换成 lastElement
sample
sample就是抽样的意思,我们可以指定在某一个时间,对发射的数据进行采集,采集数据的标准时之前发射阶段的最有一个数据,比如:
Flowable.interval(1000, TimeUnit.MILLISECONDS)
.sample(3000, TimeUnit.MILLISECONDS)
.subscribe(aLong -> Log.d(TAG, aLong + ""));
// 1
// 4
// 7
// 10
// ..
我们开一个定时任务,1秒钟发送一个数字(从0开始),3秒之后采集到最后一个发射数据1,再过3秒采集到了4,依次类推..
skip/skipLast
- skip以一个值为标准,之前的抑制,之后的发射
Flowable.fromIterable(mList).skip(2)
.subscribe(integer -> Log.d(TAG,integer.toString()));
// 3
// 4
// 5
另外,我们要说一下skip的重载方法
- Javadoc: skipLast(long,TimeUnit)
- Javadoc: skipLast(long,TimeUnit,Scheduler)
举个例子
Flowable.interval(1000,TimeUnit.MILLISECONDS)
.skip(3000,TimeUnit.MILLISECONDS)
.subscribe(aLong -> Log.d(TAG, aLong.toString()+",当前线程:"
+Thread.currentThread().getName()));
// 3,当前线程:RxComputationThreadPool-2
// 4,当前线程:RxComputationThreadPool-2
// 5,当前线程:RxComputationThreadPool-2
// ...
这里我们用到了两个参数的重载方法,我们很明显可以看到这个方法skip的值为时间值,示例为跳过前3秒的发送数据,而且输出环境在子线程里,如果想指定线程的话,可以用第三个参数进行指定.
- skipLast与skip正相反,以一个值为标准,抑制后面的发射数据,而其重载方法与skip是一致的.
Flowable.fromIterable(mList).skipLast(2)
.subscribe(integer -> Log.d(TAG,integer.toString()));
// 1
// 2
// 3
take/takeLast
一个是只发送前面n项,一个正相反,发送后面n项
Flowable.fromIterable(mList).take(2)
.subscribe(integer -> Log.d(TAG,integer.toString()));
// 1
// 2
Flowable.fromIterable(mList).takeLast(2)
.subscribe(integer -> Log.d(TAG,integer.toString()));
// 4
// 5
take/takeLast也是有重载方法的,其默认线程也在子线程中(computation),跟skip没区别,我们就不在举例了
组合
zip
官方翻译:zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据,什么意思?先举一个简单的小例子:
Flowable.zip(Flowable.just("1"), Flowable.just("A", "B", "C"),
(s1, s2) -> s1 + "-----" + s2).subscribe(s -> Log.d(TAG, s));
//1----A
- 就是将发射的多个Flowable/Observable的数据按顺序组合在一起,并且因为第一个Flowable里面只有一个数据,所以第二个Flowable的"B","C"就都不发射了.
zip是rxJava2.0+才有的,它可以解决了复杂页面多接口调用的问题,也就是说一个页面好多接口,如果都响应完了才展示页面的话,就不用同步去请求,我们可以用zip同时发,其原理是把多个Observable组合成新的Observable,比如首页获取个人信息,获取banner,获取活动列表三个接口,就可以:
Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
Flowable.zip(bannerFlowable,
activityFlowable,
memberFlowable,
new Function3<BaseDataResponse<List<Banner>>,
BaseDataResponse<List<Activity>>,
BaseDataResponse<Member>, Map<String, Object>>() {
@Override
public Map<String, Object> apply(BaseDataResponse<List<Banner>> bannerResponse,
BaseDataResponse<List<Activity>> activityResponse,
BaseDataResponse<Member> memberResponse) throws Exception {
HashMap<String, Object> map = new HashMap<>(3);
map.put("HOME_BANNER", bannerResponse);
map.put("HOME_ACTIVITY", activityResponse);
map.put("HOME_MEMBER", memberResponse);
return map;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Map<String, Object>>() {
//1.正常这边应该有一个自定义的Observer,可以用subscribeWith 自定义一个Observer,处理数据异
//2.由于每个公司的数据形式也不统一,我们举例子就不写了
@Override
public void accept(Map<String, Object> dataMap) throws Exception {
BaseDataResponse < List < Banner >> BannerResponse = cast(dataMap.get("HOME_BANNER"));
if(BannerResponse!=null){
List<Banner> bannerList = BannerResponse.getData();
// mView.showBanner(bannerList);
}
// ..
}
});
首先定义三个接口flowable,然后分别作为zip方法的前三个参数,而function方法中,通常会定义一个基本的数据类型封装类,其泛型为三个接口返回的数据类型,第四个是做为apply方法的返回值的,我们可以定义Map,List,Object,都没有问题,其主要是封装返回值,这里我们用的是map,
接下来线程转换,在accept中取出每一个response 转换成我们要的数据,展示,也可以用lamada简化一下:
Flowable.zip(bannerFlowable, activityFlowable, memberFlowable,
(bannerResponse, activityResponse, memberResponse) -> {
HashMap<String, Object> map = new HashMap<>(3);
map.put("HOME_BANNER", bannerResponse);
map.put("HOME_ACTIVITY", activityResponse);
map.put("HOME_MEMBER", memberResponse);
return map;
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(dataMap -> {
BaseDataResponse<List<Banner>> BannerResponse = cast(dataMap.get("HOME_BANNER"));
if (BannerResponse != null) {
List<Banner> bannerList = BannerResponse.getData();
// mView.showBanner(bannerList);
}
// ...
});
merge
merge就是合并的意思,上文用zip实现的例子同样也可以用merge实现,比如:
Flowable<BaseDataResponse<List<Banner>>> bannerFlowable = ApiService.getbanner();
Flowable<BaseDataResponse<List<Activity>>> activityFlowable = ApiService.getActivityInfo();
Flowable<BaseDataResponse<Member>> memberFlowable = ApiService.getMemberInfo();
Flowable.merge(bannerFlowable, activityFlowable, memberFlowable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<BaseDataResponse<? extends Object>>() {
@Override
public void accept(BaseDataResponse<?> baseDataResponse) throws Exception {
//baseDataResponse->banner
//baseDataResponse->activity
//baseDataResponse->member
}
});
不同的是,zip是打包封装(我们封装成了map一并返回),而merge中的accept方法要走三次,需要我们进行分别判断,我们可以看一下merge的源码:
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> merge(Publisher<? extends T> source1, Publisher<? extends T> source2, Publisher<? extends T> source3) {
ObjectHelper.requireNonNull(source1, "source1 is null");
ObjectHelper.requireNonNull(source2, "source2 is null");
ObjectHelper.requireNonNull(source3, "source3 is null");
return fromArray(source1, source2, source3).flatMap((Function)Functions.identity(), false, 3);
}
很明显 返回值的是通过fromArray轮流发送在进行flatMap进行变换,再举个例子就很清楚了
Flowable.merge(Flowable.just("1"), Flowable.just("A", "B", "C"))
.subscribe(s -> {
Log.d(TAG, s);
//1
//A
//B
//C
});
这里有两个问题需要说一下
1.merge是轮流发的,如果有10个要发送,那么第一个抛异常,第二个就不走了,如果想让下面的继续发,需要将merge换成MergeDelayError
2.merge一但我们在发送过程中延迟发送那么发送的数据会显示在最后面,如果想保证顺序,需要将merge换成concat
combineLatest
这个操作符基本用于校验,zip是组合每一个发射的Flowable,combineLatest是只要其中一个发射的时候,他会找其他已经发射过的Flowable的最后一个数据,进行组合,比如:
Flowable<Integer> flowable1 = Flowable.just(1, 2, 3, 4, 5);
Flowable<String> flowable2 = Flowable.just("A", "B", "C");
Flowable<String> flowable3 = Flowable.just("100", "200");
Flowable.combineLatest(flowable1, flowable2, flowable3, new Function3<Integer, String, String, String>() {
@Override
public String apply(Integer integer, String s, String s2) throws Exception {
return integer + ":" + s + ":" + s2;
}
}).subscribe(string -> Log.d(TAG, string));
// 5,C,100
// 5:C:200
flowable1, flowable2都已经发射过了,到flowable3,当它发射100的时候,5,"c",分别对应了最近发射的数据,所以按apply中的方式进行组合在一起输出,那实际运用当中,我可以用它进行多项条件的判定,都满足,再进行下一步
join
官方解读:任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据,等于加了个时间的范畴,也就是两个Observable,在发射的数据都是有有效期的,同在有效期之内,就组合,比如:
Flowable<Long> baseFlowable = Flowable.interval(1000, TimeUnit.MILLISECONDS);
Flowable<String> flowable = Flowable.just("A", "B", "C", "D");
baseFlowable.join(flowable, new Function<Long, Publisher<Long>>() {
@Override
public Publisher<Long> apply(Long aLong) throws Exception {
Log.d(TAG, "===left:" + aLong);
return Flowable.timer(2000, TimeUnit.MILLISECONDS);
}
}, new Function<String, Publisher<Long>>() {
@Override
public Publisher<Long> apply(String s) throws Exception {
Log.d(TAG, "===right:" + s);
return Flowable.timer(5000, TimeUnit.MILLISECONDS);
}
}, new BiFunction<Long, String, String>() {
@Override
public String apply(Long aLong, String s) throws Exception {
return aLong + "----" + s;
}
}).subscribe(string -> Log.d(TAG, string));
// ===right:A
// ===right:B
// ===right:C
// ===right:D
// ==left:0
// 0----A
// 0----B
// 0----C
// 0----D
// ===left:1
// 1----A
// 1----B
// 1----C
// 1----D
// ...
// 4----A
// 4----B
// 4----C
// 4----D
// ===left:5
// ===left:6
// ===left:7
// ....
我们通过观察不难发现
- 首先将第二个flowable发射的数据依次设置了5秒的有效期
- 然后从base开始依次发射0,1,2..并给予其2秒有效期(这个写多少都可以)
- 所以都在5秒有效期之内,分别跟A,B,C,D进行组合,输出结果
startWith
在数据序列的开头插入一条指定的项
List<Integer> mList = Arrays.asList(1, 2, 3, 4, 5);
Flowable.fromIterable(mList).startWith(100).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, integer.toString());
}
});
// 100
// 1
// 2
// ...
当然,还可以这样:
Flowable.fromIterable(mList).startWith(Arrays.asList(100,200))
.subscribe(integer -> Log.d(TAG, integer.toString()));
// 100
// 200
// 1
// 2
// ...
Flowable.fromIterable(mList).startWith(Flowable.just(1000,2000))
.subscribe(integer -> Log.d(TAG, integer.toString()));
// 1000
// 2000
// 1
// 2
// ...
switchOnNext
订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个,Switch返回的这个Observable取消订阅前一个发射数据的Observable,开始发射最近的Observable发射的数据。注意:当原始Observable发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅之前的那个Observable。这意味着,在后来那个Observable产生之后到它开始发射数据之前的这段时间里,前一个Observable发射的数据将被丢弃.
在这边我们借鉴了一下其他人写的例子:
Flowable<Flowable<Long>> flowable = Flowable.interval(500, TimeUnit.MILLISECONDS)
.map(new Function<Long, Flowable<Long>>() {
@Override
public Flowable<Long> apply(Long aLong) throws Exception {
Log.d(TAG, "====fu: "+aLong );
return Flowable.interval(0,200,TimeUnit.MILLISECONDS)
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) throws Exception {
Log.d(TAG, "===zi: "+aLong );
return aLong * 10;
}
}).take(5);
}
}).take(2);
Flowable.switchOnNext(flowable)
.subscribe(aLong -> Log.d(TAG, "onNext: SwitchOnNext "+aLong));
// ====fu: 0
// ===zi: 0
// onNext: SwitchOnNext 0
// ===zi: 1
// onNext: SwitchOnNext 10
// ===zi: 2
// onNext: SwitchOnNext 20
// ====fu: 1
// ===zi: 0
// onNext: SwitchOnNext 0
// ===zi: 1
// onNext: SwitchOnNext 10
// ===zi: 2
// onNext: SwitchOnNext 20
// ===zi: 3
// onNext: SwitchOnNext 30
// ===zi: 4
// onNext: SwitchOnNext 40
这个例子写的非常好,等于是一个Flowable循环嵌套,最外层只发送两次,当第二次发送数据,内层订阅之后,第一次还没来得及发的数据就被舍弃掉了.
到这边Rxjava的操作符基本都写完了,还有一些辅助,订阅,切换线程之类的比较简单,而且我们在示例中都有使用,就不再单独拎出来一一说明了.能看到这的绝对是真爱,爱你!!!
用的测试代码,请戳这里