1、基本概念
观察者:Observer,观察事件变化并处理的主要角色。消费者(Consumer)也可以理解成一种>特殊的观察者。
被观察者:触发事件并决定什么时候发送事件的主要角色。(异常和完成也是一种事件)●Observable、Flowable、Single、Completable、Maybe都是被观察者。
Flowable是支持背压的一种被观察者。
Single、Completable、Maybe是简化版的Observable。
几种被观察者通过toObservable/toFlowable/toSingle/toCompletable/toMaybe相互转换。
订阅(subscribe):观察者和被观察者建立关联的操作。
2、onError与onComplete为互斥事件。
3、基本操作符
基本操作符(https://blog.csdn.net/m0_46268254/article/details/139834767)
详细使用(https://blog.csdn.net/y2653904/article/details/135892911)
just(T...): 将给定的参数作为事件序列发出。(最多10个) fromArray不受限制
fromIterable(Iterable): 从一个可迭代对象(如 List 或 Set)创建一个 Observable。
create(ObservableOnSubscribe): 使用自定义逻辑创建一个 Observable。
timer操作符:可以做定时操作,就是延迟执行。时间间隔由timer控制。
interval 操作符:定时的周期性操作,与timer的区别就在于它可以重复操作。事件间隔由interval控制
subscribeOn(Scheduler): 指定 Observable 的订阅过程应该在哪个线程执行。
observeOn(Scheduler): 指定 Observer 的回调方法应该在哪个线程执行。来决定下游事件被处理时所处的线程。
4、过滤操作符
filter(Predicate): 只允许满足条件的项通过。
take(int): 只发出指定数量的项。
skip(int): 忽略序列开头的指定数量的项。
distinct(): 确保不会发出重复的项。
firstElement(): 只发出第一个元素或者如果没有元素则不发出任何东西。
lastElement(): 只发出最后一个元素或者如果没有元素则不发出任何东西。
5、转换操作符
map(Func1): ☆☆☆对每个项应用函数,并将结果发送给观察者。
flatMap(Func1): ☆☆☆对每个项应用函数,该函数返回一个新的 Observable,然后合并这些 Observables 的输出。
concatMap(Func1): 类似于 flatMap,但是它会按顺序合并 Observable 的输出。
buffer(): 收集来自原始 Observable 的项,并以批处理形式转发它们。
scan(Func2): 累积地应用函数到前一个结果和当前项上,并发出累积的结果。
6、组合操作符
zip(Observable, Func2): 按数量,将多个 Observables 的项组合在一起,并使用提供的函数来创建新的结果。
combineLatest(Observable, Func2): 按时间,当任意一个 Observable 发出新值时,将最新的值与另一个 Observable 的最新值结合。
startWith(T): 发送事件前追加发送事件,在原始 Observable 的序列之前添加一个或多个项。
按发送顺序:concat(四个)、concatArray(无限)。串行发送。
merge(Observable): 按时间,合并多个 Observables 的输出,但保持原始顺序(如果可能的话)。并行发送。
7、Scheduler线程控制
在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:
- Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;
- Schedulers.newThread(): 开启新线程操作;
- Schedulers.immediate(): 默认指定的线程,也就是当前线程;
- Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;
- AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;
public class MainActivity extends AppCompatActivity {
private final static String IMGPATH = "https://image.baidu.com/search/detail?";
private ImageView img;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
img = findViewById(R.id.img);
//创建Observable
Observable.just(IMGPATH)//发送图片地址
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(IMGPATH);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
.subscribeOn(Schedulers.io())//上面是异步
.observeOn(AndroidSchedulers.mainThread())//下面是主线程
.subscribe(new Observer<Bitmap>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bitmap bitmap) {
img.setImageBitmap(bitmap);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
8、未取消订阅而引起的内存泄漏
在Activity#onDestroy()的时候或者不需要继续执行的时候应该取消订阅
Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//发送事件
e.onNext("袁震");
//事件发送完成
e.onComplete();
}
});
observable.unsubscribeOn(AndroidSchedulers.mainThread());