前言
通过前两篇文章对于RxJava概念,原理及使用的学习,想必各位码友已经基本掌握RxJava的逻辑与功能了,那么从这篇文章开始我们来研究RxJava的各类操作符。
什么是操作符?通过之前的学习,我们发现Observable负责发送事件,Observer负责接收事件,而这个过程中想要对事件数据做出修改就需要交给操作符来负责啦。主流RxJava中操作符主要分为三类:转换操作符,过滤操作符,组合操作符。而我个人将用来创建Observable的操作符归为了一个新类型。本篇我们就来看看常用的创建操作符都有哪些以及如何使用。
创建操作符
Create
上一篇文章的例子中我们已经实践了create操作符如何使用了,这里我们介绍一种便捷的创建Observer的方式。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext("xulei" + i);
}
subscriber.onCompleted();
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("rx_test", "create:" + s);
}
});
subscribe()的入参使用Action1代替原来的Observer,只需重写一个call()方法,等同于原Observer中onNext()方法。如果需要onComplete与onError状态,还可以如下:
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("rx_test", "create:" + s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e("rx_test", "onError:" + throwable.getMessage());
}
}, new Action0() {
@Override
public void call() {
Log.e("rx_test", "onCompleted");
}
});
使用new ActionX代替new Observer,代码是不是看起来更加灵活与简洁呢。
Just
just操作符可将某个或某些对象转化为Observable对象,并将其发射出去。参数可为一个或多个数字,字符串。也可为集合,数组,Iterate对象等。
Observable.just(1, 2, 3, 4, 5, 6).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("rx_test", "just:数字:" + integer);
//数字或者字符串都是单个发射多次
}
});
List<String> stringList = new ArrayList<>();
stringList.add("Hello");
stringList.add("Ha");
stringList.add("RxJava");
Observable.just(stringList).subscribe(new Action1<List<String>>() {
@Override
public void call(List<String> strings) {
Log.e("rx_test", "just:集合:" + strings.toString());
//集合或数组是直接发射集合整体,不会拆分
}
});
输出结果:
just:数字:1
just:数字:2
just:数字:3
just:数字:4
just:数字:5
just:数字:6
just:集合:[Hello, Ha, RxJava]
From
from操作符可将某个对象转化为Observable对象,并且将其发射出去。不同于just,他接收集合或数组,并可将集合数组遍历之后拆分发送。
List<String> stringList = new ArrayList<>();
stringList.add("Hello");
stringList.add("Ha");
stringList.add("RxJava");
Observable.from(stringList).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("rx_test", "from:" + s);
}
});
输出结果:
from:Hello
from:Ha
from:RxJava
Range
range(int start, int count)操作符,根据初始值start,与数量count,发射count次以start为基数依次增加的值。
Observable.range(4, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("rx_test", "range:" + integer);
}
});
输出结果:
range:4
range:5
range:6
range:7
range:8
Defer
defer操作符功能类似于just操作符,不同之处在于defer只有在调用subscribe()方法进行订阅时才创建Observable,而just操作符在初始化Observable就已经创建了,且只创建一个Observable实例。这里我们通过与just对比进行实践。
Action1<String> action1 = new Action1<String>() {
@Override
public void call(String s) {
Log.e("rx_test", s);
}
};
//defer
Observable<String> deferObservable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
Object o = new Object();
return Observable.just("defer:hashCode:" + o.hashCode());
}
});
deferObservable.subscribe(action1);
deferObservable.subscribe(action1);
deferObservable.subscribe(action1);
//just
Observable<String> justObservable = Observable.just("just:hashCode:" + new Object().hashCode());
justObservable.subscribe(action1);
justObservable.subscribe(action1);
justObservable.subscribe(action1);
输出结果:
defer:hashCode:112449879
defer:hashCode:118897732
defer:hashCode:191664429
just:hashCode:121878114
just:hashCode:121878114
just:hashCode:121878114
由输出结果我们可以看出defer每次输出的Observable哈西值是不同的,说明其每subscribe订阅一次都会创建一个新的Observable,从而可保证Observable中的数据都是最新的。而just只有初始化的时候创建一次Observable。
Interval
interval创建操作符,创建一个Observabel并每隔一段时间周期发射一个由0开始增加的数字。
注意:此Observabel是运行在新的线程,所以更新UI需要在主线程中订阅
//每隔100ms发射一个数字,从0自增
Observable.interval(100, TimeUnit.MILLISECONDS) //单位为毫秒
.observeOn(AndroidSchedulers.mainThread())
.take(5) //取前5次事件发射,take为过滤操作符,后期会详细讲
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("rx_test", "interval:" + aLong);
}
});
输出结果:
100ms后...
interval:0
100ms后...
interval:1
100ms后...
interval:2
100ms后...
interval:3
100ms后...
interval:4
Timer
timer操作符,创建一个Observable并隔一段时间后发射一个特殊的值,仅发射一次。
注意:此Observabel是运行在新的线程,所以更新UI需要在主线程中订阅
//隔1s后发射一个数字
Observable.timer(1, TimeUnit.SECONDS) //单位为秒
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e("rx_test", "timer:" + aLong);
}
});
输出结果:
1秒后...
timer:0
Delay
delay操作符,可用于延迟一定时长再发送事件。
//延迟2秒后发射事件
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e("rx_test", "delay:" + integer);
}
});
输出结果:
2秒后...
delay:1
delay:2
delay:3
Repeat
repeat(long count)操作符,将Observable重复发射count次。
//重复发射5次“Sherlock”
Observable.just("Sherlock").repeat(5)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("rx_test", "repeat:" + s);
}
});
输出结果:
repeat:Sherlock
repeat:Sherlock
repeat:Sherlock
repeat:Sherlock
repeat:Sherlock
以上就是常用的一些创建操作符,类似但不常用的还有empty、never、error等等就不一一介绍了,有兴趣的码友可以自行百度。
线程调度Scheduler
RxJava就是用来处理异步任务的,所以就牵扯到生产事件所在线程,处理事件所在线程的问题,下面来看一下RxJava提供的线程调度Scheduler都有哪些。
有了Scheduler,RxJava当然也提供了方法来使用它们。
.subscribeOn()指定被观察者Observable的执行线程。
.observeOn()指定观察者Observer的执行线程。
如第一篇文章中的例子:
//获取要查询的小区集合
Observable.from(getCommunitiesFromServer())
.flatMap(new Func1<Community, Observable<House>>() {
@Override
public Observable<House> call(Community community) {
return Observable.from(community.getHouses());
}
})
.filter(new Func1<House, Boolean>() {
@Override
public Boolean call(House house) {
return house.getPrice() < 200;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<House>() {
@Override
public void call(House house) {
//显示查询出来的房源信息
ShowSearchedHousesMessage();
}
});
其中.subscribeOn(Schedulers.io())
指定了Observable在io线程运行,通常用来执行从服务器获取数据,数据库加载等耗时操作。.observeOn(AndroidSchedulers.mainThread())
指定了Observer在Android环境下的UI线程运行,通常用来获取到数据后进行UI刷新的操作。可根据实际需求选择不同线程类型。
总结
到此,本篇关于RxJava的创建类操作符以及线程调度就讲解完毕了,下一篇我们将一起研究RxJava的四类操作符中的转换操作符都有哪些以及如何使用。
技术渣一枚,有写的不对的地方欢迎大神们留言指正,有什么疑惑或者建议也可以在我Github上RxJavaDemo项目Issues中提出,我会及时回复。
附上RxJavaDemo的地址:
RxJavaDemo