最近再看一个项目,但是那个项目里面的Rxjava是1.x版本的,由于最近又有一个项目要开始了,在封装各种基类,所以我准备将项目中的Rxbus用Rxjava2.x来修改一下继续用,由于2.x的Rxjava进行了代码的重构,所以在这里写下我的一些收集与写法
什么是Rxbus
先说说Rxjava
官方解释:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava是Reactive Extensions的Java VM实现:用于通过使用observable序列来组合异步和基于事件的程序的库。
关于Rxjava的使用和理解可以参看:https://github.com/ReactiveX/RxJava
那我们的Rxbus呢?
RxBus:处理应用程序间各个组件的通信,或者组件与组建之间的数据传递。
其实说到底,RxBus学的是一种思路,而并不是给你一个现成的库,然后直接去调用。需要的是自己来封装。
Rxbus的简单实现
导包
compile 'io.reactivex.rxjava2:rxjava:2.1.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Rxbus类
public class RxBus {
// 主题
private final Subject<Object> bus;
// PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
private RxBus() {
bus = PublishSubject.create().toSerialized();
}
// 静态内部类(单例模式的内部类实现方法)
private static class SingletonHolder {
public final static RxBus sInstance = new RxBus();
}
// 单例RxBus
public static RxBus getDefault() {
return SingletonHolder.sInstance;
}
/**
* 提供了一个新的事件
* 发布
* @param o
*/
public void post(Object o) {
bus.onNext(o);
}
// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObserverable(Class<T> eventType) {
return bus.ofType(eventType); //判断接收事件类型
}
}
不侮辱大家智商,配上注释应该大家都懂了,就不解释了
Rxjava1.x -> 2.x问题
如果大家有兴趣的话,最好还是阅读源码。
有两篇官方文档值得阅读:
what's different in 2.0:https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0
文档:http://reactivex.io/RxJava/2.x/javadoc/
参考文档:http://www.jianshu.com/p/2badfbb3a33b
1. 修改EventThread.class,删除Schedulers.immediate()相关
因为在2.0中删除了Schedulers.immediate()这个线程的切换
2. //存放订阅者信息
private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
修改为:
private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();
3. CompositeSubscription.unsubscribe();
修改为
CompositeDisposable.dispose();
4. 在2.0中增加了Flowable 这样就把 backpressure 的问题放到了Flowable中来处理,
而Observable 不对backpressure进行处理了。但是使用Flowable还是要注意对backpressure
的处理,不然还是会出现以前的问题。
5. Subscription subscription = toObservable(sub.tag(), cla);
修改为:
Disposable disposable = tObservable(sub.tag(), cla);
6. SerializedSubject 已经变为非public类,可以通过
bus = PublishSubject.create().toSerialized();
的方式获取线程安全 的对象。
7. private final PublishSubject<Object> bus = PublishSubject.create();
final Subject<Object> subject = bus.toSerialized();
修改为:
bus = PublishSubject.create().toSerialized();
toSerialized() 方法中其实就是之前序列化安全对象的写法。而SerializedSubject类已经变成了非public的
8.//1.X public class AppBaseActivity extends AppCompatActivity {
...
private CompositeSubscription mCompositeSubscription;
protected void addSubscription(Subscription subscription) {
if (null == mCompositeSubscription) {
mCompositeSubscription = new CompositeSubscription();
}
mCompositeSubscription.add(subscription);
}
@Override
protected void onDestroy() {
if (null != mCompositeSubscription) {
mCompositeSubscription.unsubscribe();
}
super.onDestroy();
}
...
}
//2.X
public class AppBaseActivity extends AppCompatActivity {
...
private CompositeDisposable mCompositeDisposable;
protected void addDisposable(Disposable disposable) {
if (null == mCompositeDisposable) {
mCompositeDisposable = new CompositeDisposable();
}
mCompositeDisposable.add(disposable);
}
@Override
protected void onDestroy() {
if (null != mCompositeDisposable) {
mCompositeDisposable.clear();
}
super.onDestroy();
}
...
}