基本使用
在build.gradle中加入配置,注意,rxJava和rxAndroid版本一定要相互兼容,不然可能会报错More than one file was found with OS independent path 'META-INF/rxjava.properties'
compile 'io.reactivex.rxjava2:rxjava:2.0.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
被观察者订阅观察者,当被观察者状态改变,可以通知观察者进行操作
第一种写法:
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
try {
//这三个方法分别对应着observer中的三个方
//法,调用哪个就执行observer中相应的方法
observableEmitter.onNext("星期一");
observableEmitter.onComplete();
} catch (Exception e) {
e.printStackTrace();
observableEmitter.onError(new NullPointerException("发生异常"));
}
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
LogUtils.i(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
LogUtils.i(TAG, "onNext");
}
@Override
public void onError(Throwable throwable) {
LogUtils.i(TAG, "onError");
}
@Override
public void onComplete() {
LogUtils.i(TAG, "onComplete");
}
};
observable.subscribe(observer);
第二种写法
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> observableEmitter) throws Exception {
LogUtils.i(TAG, "subscribe");
//这三个方法同样和下边三个是对应的
observableEmitter.onNext("onNext");
observableEmitter.onComplete();
observableEmitter.onError(new IllegalAccessException("发送错误"));
}
}).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
LogUtils.i(TAG, "subscribe(new Consumer() " + o.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
LogUtils.i(TAG, "new Consumer<Throwable>");
}
}, new Action() {
@Override
public void run() throws Exception {
LogUtils.i(TAG, "new Action()");
}
});
线程控制
RxJava如何切换线程?
Schedulers.immerdiate():
直接在当前线程运行,相当于不指定线程,这是默认的Scheduler
Schedulers.newThread():
总是启用新线程,并在新线程执行操作
Schedulers.io():
I/O操作(读写文件,读写数据库,网络信息交互)所使用的Scheduler,行为模式和newThread()差不多,区别在于io的内部实现使用一个无数量上线的线程池,可以重用空闲线程,因此多数情况下io()比newThread更有效率,不要把计算工作放在io()中,可以避免创建不必要的线程
Schedulers.computation():
计算所使用的Scheduler,这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形计算,这个Scheduler使用的固定的线程池,大小为CPU核数,不要把IO操作放在这里,否则等待时间会浪费cpu
AndroidSchedulers.mainThread():(rxAndroid中的类)
它指定的操作将在Android主线程运行
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> observableEmitter) throws Exception {
String url1 = "http://is.snssdk.com/2/essay/discovery/v3/?iid=6152551759&aid=7";
URL url = new URL(url1);
//得到connection对象。
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
//设置请求方式
connection.setRequestMethod("GET");
//连接
connection.connect();
//得到响应码
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
//得到响应流
InputStream inputStream = connection.getInputStream();
//将响应流转换成字符串
String result = stream2String(inputStream);//将流转换为字符串。
LogUtils.i(TAG, "onSuccess");
observableEmitter.onNext(result);
} else {
observableEmitter.onError(new Exception("失败"));
}
}
})
//指定网络请求在io线程,界面更新在主线程
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable disposable) {
LogUtils.i(TAG, "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
LogUtils.i(TAG, "onNext");
mTextView.setText(s);
}
@Override
public void onError(@NonNull Throwable throwable) {
LogUtils.i(TAG, "onError");
throwable.printStackTrace();
}
@Override
public void onComplete() {
LogUtils.i(TAG, "onComplete");
}
});
常用操作符
map
map的作用简单来说就是我输入一个数据类型的对象,转换得到另一个我想要的数据类型的对象,例如下边,传入integer,得到String。
map操作符对原始的Observable发送的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable
Observable.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return 1 + "哈哈,下雨了";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtils.i(TAG, s);
}
});
flatMap
flatMap操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后flatMap合并这些Observables发射的数据,最后将合并后的结果当作它自己的数据序列发射
Observable.just(getUserParams()).flatMap(new Function<UserParams, ObservableSource<LoginResult>>() {
@Override
public ObservableSource<LoginResult> apply(UserParams userParams) throws Exception {
//do something 模拟登录传入登录参数后获取到服务器返回值
LoginResult result = new LoginResult(userParams);
return Observable.just(result);
}
}).flatMap(new Function<LoginResult, ObservableSource<User>>() {
@Override
public ObservableSource<User> apply(LoginResult loginResult) throws Exception {
//do something 模拟从登录返回值中根据userid获取到用户信息,返回User对象
return Observable.just(new User());
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<User>() {
@Override
public void accept(User user) throws Exception {
//显示用户姓名
LogUtils.i(TAG, user.toString());
}
});
switchMap
将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
Observable.just(s).filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
if (!TextUtils.isEmpty(s) && s.contains("a")) {
return true;
}
return false;
}
})//switchMap和flatMap只有一点区别,在这个场景下,由于每次输入的文字变化都会进行搜索,而且搜索
//结果不一定是先请求的先返回,有可能是本来我要搜索abc,当我输入ab的时候进行了一次搜索,然后输入
//完abc又请求了一次,但是由于各种原因,第一次请求的结果返回晚于第二次,那么第一次搜索的不是我想要
//的结果,但是由于它返回的晚,反而把理想的搜索结果覆盖了,用switchMap可以解决这个问题,它会
//返回最近一次请求的结果,即便是由于上边的原因导致的问题
.switchMap(new Function<String, ObservableSource<List<String>>>() {
@Override
public ObservableSource<List<String>> apply(String s) throws Exception {
List<String> list = new ArrayList<String>();
list.add("搜索结果a");
list.add("搜索结果B");
return Observable.just(list);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
//将搜索结果展示在列表中
for (String string : strings) {
LogUtils.i(TAG, string);
}
}
});
今天忙了很久,也只是在使用上了解了一些rxJava的东西,下一次会从源码层面分析