1.Android为例, 一个Activity的所有动作默认都是在主线程中运行的, 所有耗时操作要在线程实现. 所以在开发的时候,经常来来切换线程. 导致项目的代码的难以维护. 而Rxjava 很好的解决这个线程切换.
在RxJava 在切换线程时用到了两个方法 subscribeOn() 和 observeOn() 下面来分别解释一下这两个方法
subscribeOn:
影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用.
observeOn:
影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 ####observeOn;
默认情况下, 事件序列操作的线程与调用.subscribe()的线程一致
写一个栗子来实践一下.
//创建一个被观察者
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("---->", "我在什么线程" + Thread.currentThread().getName());
Log.d("---->", "发射数据" + 1);
emitter.onNext(1);
}
});
//创建观察者
Observer<Integer> observer = new Observer<Integer>(){
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d("---->", "我在什么线程" + Thread.currentThread().getName());
Log.d("---->","结果"+integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//创建联系
integerObservable.subscribe(observer);
在主线程中分别创建被观察者和观察者, 然后将他们连接在一起, 同时分别打印出它们所在的线程, 运行结果为:
06-01 20:13:40.100 1441-1441/view.dome.com.rxjavadome D/---->: 我在什么线程main
发射数据1
我在什么线程main
结果1
说明默认是在同一个线程工作.
这样肯定是满足不了我们的需求的, 我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI.
当使用了 RXJava 里面
//创建联系
integerObservable
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
结果就不一样了
06-01 20:25:00.210 1657-1671/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
发射数据1
06-01 20:25:00.330 1657-1657/view.dome.com.rxjavadome D/---->: 我在什么线程main
结果1
发现 第一次进那个是新的线程,而结果是的数据在还在主线,
当我在subscribeOn 里面多次切换线程, observeOn 在里面也多次切换线程会是怎么样.
//创建联系
integerObservable
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(observer);
结果:
06-01 20:38:23.930 1756-1771/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
发射数据1
06-01 20:38:24.070 1756-1772/view.dome.com.rxjavadome D/---->: 我在什么线程RxCachedThreadScheduler-2
结果1
发现 只是最后的一个线程起作用了,
当我们把observeOn 里面的线程位置换一下,看一下
integerObservable
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
结果
06-01 20:41:42.510 1823-1838/view.dome.com.rxjavadome D/---->: 我在什么线程RxNewThreadScheduler-1
发射数据1
06-01 20:41:42.630 1823-1823/view.dome.com.rxjavadome D/---->: 我在什么线程main
结果1
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
在Retrofit2.0 跟RxJava 结合使用.
- gradle 的依赖添加
// Android 支持 Rxjava
implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
// Retrofit库
implementation 'com.squareup.retrofit2:retrofit:2.4.0'
// Okhttp库
implementation 'com.squareup.okhttp3:okhttp:3.10.0'
//Gson解析
implementation'com.squareup.retrofit2:converter-gson:2.4.0'
// 此处一定要注意使用RxJava2的版本
implementation 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
//Log
implementation 'com.squareup.okhttp3:logging-interceptor:3.9.0'
2.数据结构.
{
"status":1,
"content":{
"from":"en-EU",
"to":"zh-CN",
"vendor":"tencent",
"out":"嗨世界",
"ciba_use":"来自机器翻译。",
"ciba_out":"",
"err_no":0
}
}
编写 数据Bean
package view.dome.com.rxjavadome.bean;
/**
* 数据bean
*/
public class Translation {
/**
* status : 1
* content : {"from":"en-EU","to":"zh-CN","vendor":"tencent","out":"嗨世界","ciba_use":"来自机器翻译。","ciba_out":"","err_no":0}
*/
private int status;
private ContentBean content;
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public ContentBean getContent() {
return content;
}
public void setContent(ContentBean content) {
this.content = content;
}
public static class ContentBean {
/**
* from : en-EU
* to : zh-CN
* vendor : tencent
* out : 嗨世界
* ciba_use : 来自机器翻译。
* ciba_out :
* err_no : 0
*/
private String from;
private String to;
private String vendor;
private String out;
private String ciba_use;
private String ciba_out;
private int err_no;
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to;
}
public void setTo(String to) {
this.to = to;
}
public String getVendor() {
return vendor;
}
public void setVendor(String vendor) {
this.vendor = vendor;
}
public String getOut() {
return out;
}
public void setOut(String out) {
this.out = out;
}
public String getCiba_use() {
return ciba_use;
}
public void setCiba_use(String ciba_use) {
this.ciba_use = ciba_use;
}
public String getCiba_out() {
return ciba_out;
}
public void setCiba_out(String ciba_out) {
this.ciba_out = ciba_out;
}
public int getErr_no() {
return err_no;
}
public void setErr_no(int err_no) {
this.err_no = err_no;
}
}
}
3.写Observable<..>接口形式
public interface GetRequest_Interface {
@GET("ajax.php?a=fy&f=auto&t=auto&w=hi%20world")
Observable<Translation> getCall();
// 注解里传入 网络请求 的部分URL地址
// Retrofit把网络请求的URL分成了两部分:一部分放在Retrofit对象里,另一部分放在网络请求接口里
// 如果接口里的url是一个完整的网址,那么放在Retrofit对象里的URL可以忽略
// 采用Observable<...>接口
// getCall()是接受网络请求数据的方法
}
4.编写Retrofit跟Rx结合使用
//定制OkHttp
OkHttpClient.Builder httpClientBuilder = new OkHttpClient
.Builder();
if (BuildConfig.DEBUG) {//发布版本不再打印
// 日志显示级别
HttpLoggingInterceptor.Level level= HttpLoggingInterceptor.Level.BODY;
//新建log拦截器
HttpLoggingInterceptor loggingInterceptor=new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) {
Log.e("Tag",message);
}
});
loggingInterceptor.setLevel(level);
//OkHttp进行添加拦截器loggingInterceptor
httpClientBuilder.addInterceptor(loggingInterceptor);
}
OkHttpClient client = httpClientBuilder.build();
Retrofit retrofit = new Retrofit.Builder() //创建retrofit
.baseUrl("http://fy.iciba.com/") // 设置 网络请求 Url
.addConverterFactory(GsonConverterFactory.create()) //设置使用Gson解析(记得加入依赖)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // 支持RxJava
.client(client) //添加拦截器
.build();
// 创建 网络请求接口 的实例
GetRequest_Interface request = retrofit.create(GetRequest_Interface.class);
//采用Observable<...>形式 对 网络请求 进行封装
Observable<Translation> observable = request.getCall();
observable.subscribeOn(Schedulers.io()) //在子线程请求网络
.observeOn(AndroidSchedulers.mainThread()) //在主线更新数据
.subscribe(new Observer<Translation>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("----->", "开始采用subscribe连接");
}
@Override
public void onNext(Translation translation) {
Log.d("----->", "对返回的数据进行处理");
if (translation.getStatus()==1){ //成功
Translation.ContentBean content = translation.getContent();
if (content!=null){
String ciba_out = content.getOut();
Toast.makeText(MainActivity.this,ciba_out,Toast.LENGTH_SHORT).show();
}
}else {
Toast.makeText(MainActivity.this,"请求失败",Toast.LENGTH_SHORT).show();
}
}
@Override
public void onError(Throwable e) {
Log.d("----->", "请求失败"+e.toString());
}
@Override
public void onComplete() {
Log.d("----->", "请求成功");
}
});