学习资料
- YoKey大神:用RxJava实现事件总线(Event Bus)
- 谢三弟大神:从 RxBus 这辆兰博基尼深入进去
1. RxJava1.0版本使用
RxJava 2.0
版本与1.0
有些不同
1.1 RxBus简单实现
主要用于发送
和接收
public class RxBus {
private final Subject<Object, Object> bus;
private static RxBus defaultRxBus;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
if (null == defaultRxBus) {
synchronized (RxBus.class) {
if (null == defaultRxBus) {
defaultRxBus = new RxBus();
}
}
}
return defaultRxBus;
}
/*
* 发送
*/
public void post(Object o) {
bus.onNext(o);
}
/*
* 是否有Observable订阅
*/
public boolean hasObservable() {
return bus.hasObservers();
}
/*
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
}
post()
方法就是使用onNext(Object o)
方法,将Event对象
发送出去
toObservable(Class<T> type)
,转换为特定类型的Obserbale。方法内使用了ofType
操作符,ofType
源码内就是filter + cast
的组合,只发送特定的类型。fliter
用来判断是否为指定的类型,cast
将一个Observable
转换为指定的特殊Observable
1.2 Activity中使用
最简单的使用,只是为了使用而使用,没有考虑任何场景
public class RxBusActivity extends AppCompatActivity {
private CompositeSubscription mCom;
private TextView tv;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rx_bus);
initView();
}
private void initView() {
tv = (TextView) findViewById(R.id.rxbus_activity_tv);
Button button = (Button) findViewById(R.id.rxbus_activity_send);
//点击发送事件
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBus.getInstance().post(new Event("RxBus Info!!!"));
}
});
//接收
mCom = new CompositeSubscription();
//Event 类型的 Observable 订阅
Subscription subscription = RxBus.getInstance().toObservable(Event.class).subscribe(new Action1<Event>() {
@Override
public void call(Event event) {
tv.setText(event.info);
}
});
//将 Subscription 添加进 CompositeSubscription中
mCom.add(subscription); }
class Event {
String info;
Event(String info) {
this.info = info;
}
}
// @Override
// protected void onDestroy() {
// super.onDestroy();
// if (null != mCom && !mCom.isUnsubscribed()) {
// mCom.unsubscribe();
// }
// }
}
Activity
中有一个Button
,点击Button
后发送一个携带字符串RxBus Info!!!
的Event
对象
CompositeSubscription
看作是用来收集Subscription
容器,内部有一个new HashSet<Subscription>(4)
1.2.1 未取消订阅发生内存泄漏
当成功接收到字符串信息后,点击返回键,关闭当前使用了RxBus
接收Event
的Activity
,过了5秒钟左右,使用的LeakCanary
,便给出了内存泄漏信息
原因:未取消订阅
@Override
protected void onDestroy() {
super.onDestroy();
if (null != mCom && !mCom.isUnsubscribed()) {
mCom.unsubscribe();
}
}
记得在onDestory()
取消订阅,CompositeSubscription
会将内部所有的Subscription
取消订阅
1.3 认识车零件
在RxBus
中涉及到3个重要零件
- Subject
桥梁或者代理;既是Observable
又是Observer
。作为一个Observer
,可以被一个或多个Observable
进行subscribe订阅
;作为一个Observable
,可以转发收到(Observe)的数据,也可以发射新的数据
- SerializedSubject
把 Subject
当作一个 Subscriber
使用时,注意不要从多个线程中调用它的onNext()
方法,也包括其它的onXX()
系列方法。这可能导致同时(非顺序)调用,这会违反Observable
协议,给Subject
的结果增加了不确定性。需要将Subscriber
转换为一个SerializedSubject
mySafeSubject = new SerializedSubject( myUnsafeSubject )
- PublishSubject
只会把在订阅发生的时间点之后来自原始Observable
的数据发送给Observer
PublishSubject
可能会一创建完成,就开始发送数据。这可能会导致在Subject
被创建到有观察者订阅它之前这个时间段内,有数据丢失。为了确保原始Observable
的所有数据都被分发,可以使用PublishSubject.create()
,以便手动给它引入冷Observable
的行为,也就是当所有观察者都已经订阅时才开始发射数据
2. RxJava2.0版本
2.0
版本的RxBus
有些改动,但代码实现思路应该还是一样的。
RxJava1.0
都只是了解一些基础,2.0
更不了解,需要再学习很长一段时间
2.1 RxBus2
代码:
public class RxBus2 {
private static RxBus2 defaultRxBus;
private Subject<Object> bus;
private RxBus2() {
bus = PublishSubject.create().toSerialized();
}
public static RxBus2 getInstance() {
if (null == defaultRxBus) {
synchronized (RxBus2.class) {
if (null == defaultRxBus) {
defaultRxBus = new RxBus2();
}
}
}
return defaultRxBus;
}
public void post(Object o){
bus.onNext(o);
}
public boolean hasObservable() {
return bus.hasObservers();
}
/*
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
}
SerializedSubject
不再通过直接new
的方式,而是使用PublishSubject.create().toSerialized()
的方式
2.2 Activity使用
代码:
public class RxBus2Activity extends AppCompatActivity {
private TextView textView;
private CompositeDisposable compositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rx_bus2);
initView();
}
private void initView() {
textView = (TextView) findViewById(R.id.rx_activity_tv);
Button button = (Button) findViewById(R.id.rx_activity_bt);
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBus2.getInstance().post(new Event("RxBus2 Info!!!"));
}
});
compositeDisposable = new CompositeDisposable();
RxBus2.getInstance().toObservable(Event.class).subscribe(new Observer<Event>() {
@Override
public void onSubscribe(Disposable d) {
compositeDisposable.add(d);
}
@Override
public void onNext(Event event) {
textView.setText(event.info);
}
@Override
public void onError(Throwable e) {
textView.setText(e.getMessage());
}
@Override
public void onComplete() {
}
});
}
class Event {
String info;
Event(String info) {
this.info = info;
}
}
@Override
protected void onDestroy() {
super.onDestroy();
if (null != compositeDisposable && !compositeDisposable.isDisposed()) {
compositeDisposable.clear();
}
}
}
subscribe()订阅
方法返回值为void
,不再是Subscription
。而取消订阅的控制权则交给了
onSubscribe(Disposable d)
方法中的Disposable
。使用CompositeDisposable
可以将所有的订阅关系清除
3. 最后
过完年,第一篇博客,哈哈。看到朋友圈都在晒2017第一个工作日,作为一个没有找到工作的人,表示各种羡慕
有错误,请指出
共勉 :)