gradle依赖
implementation "io.reactivex.rxjava2:rxjava:2.2.2"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
基本使用
创建被观察者Observale 对象
- 通过 创建操作符 Observale.create() 创建
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
}
});
- 通过 创建操作符 Observale.just()
Observable.just("1", "2");
创建观察者 Observer对象
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
- 订阅
observable.subscribe(observer);
- 不完整定义回调
省略observer, 直接定义onNext, onError(可选), onComplete(可选) 事件的消费者。
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "onNext " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "onError " + throwable);
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, "onComplete ");
}
});
操作符
用于创建Observable对象
创建操作符
create: 从 自定义ObservableOnSubscribe对象中创建, 使用发射器 定义发射。
just: 从接收的参数 组成序列
from: 从callable future array对象 创建
interval: 按固定事件间隔 发射整数序列
range: 发射范围内所有整数序列
repeat: 创建重复发射n次的Observable
变换操作符
map: 指定Func对象,将Observable转换为一个新的Observable对象
flatmap: 将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable中。
buffer: 分组发送
groupBy:
过滤操作符
filter:
retrofit + rxjava2
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.4.0'
implementation 'com.squareup.retrofit2:converter-gson:2.4.0'
注: adpater-rxjava2
interface IpServiceForPort {
@FormUrlEncoded
@POST("getIpInfo.php")
Observable<IpModelEerror> getIpMsg(@Field("ip") String first);
}
接口定义
String url = "http://ip.taobao.com/service/";
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(url)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
IpServiceForPort s = retrofit.create(IpServiceForPort.class);
s.getIpMsg("128.2.3.4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<IpModelEerror>() {
@Override
public void accept(IpModelEerror ipModel) throws Exception {
Log.d(TAG, "ipModel " + ipModel);
}
});