create
create操作符是所有创建型操作符的“根”,也就是说其他创建型操作符最后都是通过create操作符来创建Observable的
调用例子如下:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
} ).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Overridepublicvoid onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Overridepublicvoid onCompleted() {
System.out.println("Sequence complete.");
}
});
运行结果如下:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
from
from操作符是把其他类型的对象和数据类型转化成Observable
调用例子如下:
Integer[] items = { 0, 1, 2, 3, 4, 5 };
Observable myObservable = Observable.from(items);
myObservable.subscribe(
new Action1<Integer>() {
@Override
publicvoid call(Integer item) {
System.out.println(item);
}
},
new Action1<Throwable>() {
@Override
publicvoid call(Throwable error) {
System.out.println("Error encountered: " + error.getMessage());
}
},
new Action0() {
@Override
publicvoid call() {
System.out.println("Sequence complete");
}
}
);
运行结果如下:
0
1
2
3
4
5
Sequence complete
just
just操作符也是把其他类型的对象和数据类型转化成Observable,它和from操作符很像,只是方法的参数有所差别
调用例子如下:
Observable.just(1, 2, 3)
.subscribe(new Subscriber<Integer>() {
@Overridepublicvoid onNext(Integer item) {
System.out.println("Next: " + item);
}
@Overridepublicvoid onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Overridepublicvoid onCompleted() {
System.out.println("Sequence complete.");
}
});
运行结果如下:
Next: 1
Next: 2
Next: 3
Sequence complete.
defer
defer操作符是直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行,defer操作符能够保证Observable的状态是最新的
下面通过比较defer操作符和just操作符的运行结果作比较
Integer i = 10;
@SuppressWarnings("unchecked")
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable justObservable = Observable.just(i);
i = 12;
Observable deferObservable = Observable
.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(i);
}
});
i = 15;
justObservable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println("just result:" + o.toString());
}
});
deferObservable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
System.out.println("defer result:" + o.toString());
}
});
}
运行结果如下:
just result:10
defer result:15
可以看到,just操作符是在创建Observable就进行了赋值操作,而defer是在订阅者订阅时才创建Observable,此时才进行真正的赋值操作
注意:不能用int
timer
timer操作符是创建一串连续的数字,产生这些数字的时间间隔是一定的;这里有两种情况:
一种是隔一段时间产生一个数字,然后就结束,可以理解为延迟产生数字
一种是每隔一段时间就产生一个数字,没有结束符,也就是是可以产生无限个连续的数字
timer操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。
下面是调用例子:
Subscription observable = null;
@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.timer(2, 2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Long arg0) {
// TODO Auto-generated method stub
System.out.println("Next:" + arg0.toString());
}
});
}
@Override
protected void onPause() {
// TODO Auto-generated method stub
super.onPause();
if (observable != null) {
observable.unsubscribe();
}
}
运行结果如下:
Next:0
Next:1
Next:2
Next:3
……
注意:Observable.subscribe()方法可以返回一个Subscription的对象,即我们每次订阅都会返回,可以通过该对象,取消订阅。
interval
interval操作符是每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大;interval操作符的实现效果跟上面的timer操作符的第二种情形一样。
Subscription observable = null;
@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.interval(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Long arg0) {
// TODO Auto-generated method stub
System.out.println("Next:" + arg0.toString());
}
});
}
@Override
protected void onPause() {
// TODO Auto-generated method stub
super.onPause();
if (observable != null) {
observable.unsubscribe();
}
}
运行结果:
若不调用取消订阅,则会一直增加。
range
range操作符是创建一组在从n开始,个数为m的连续数字,比如range(3,10),就是创建3、4、5…12的一组数字
调用例子如下:
Subscription observable = null;
@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.range(0, 10).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Integer arg0) {
// TODO Auto-generated method stub
System.out.println(""+arg0);
}
});
}
运行结果:
repeat/repeatWhen
repeat操作符是对某一个Observable,重复产生多次结果
repeatWhen操作符是对某一个Observable,有条件地重新订阅从而产生多次结果
repeat和repeatWhen操作符默认情况下是运行在一个新线程上的,当然你可以通过传入参数来修改其运行的线程。
repeat调用例子如下:
//连续产生两组(1,2,3)的数字
Subscription observable = null;
@SuppressWarnings({ "unchecked", "deprecation" })
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
observable = Observable.range(1, 3).repeat(2).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
// TODO Auto-generated method stub
System.out.println("onCompleted");
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Integer arg0) {
// TODO Auto-generated method stub
System.out.println(""+arg0);
}});
}
运行结果如下:
说点什么吧
不得不说,Rx的操作符真的是太强大了,当然,也太庞大了,所以,每个都记住也是不可能的,至少我不行。
所以,我都是记录和学习一些常用的,很多用不到的暂时也不记录了,有需要再Google一波,或者直接去这里看
(英文原版)
http://reactivex.io/documentation/operators.html#alphabetical
额,英文不好的,也不慌,看这个,毕竟大多程序员英文....(并不包括我)
https://mcxiaoke.gitbooks.io/rxdocs/content/index.html
这些都详细的一批了。
这个RxJava的系列,最后一个文也是关于Rx操作符的,我也把这些操作符都写在了一个Demo(操作符大全)上,有兴趣的(额,当然,都是烂大街的了。。。)我就不说了吧。
https://github.com/GitHuborz/RxJavaOPDemo
更多操作符,请看下回分解吧~