由于网络上搜索RxBus的文章都只支持RxJava1,而RxJava2已经在16年底正式发布了,我们现在来支持一下RxJava2版本的RxBus
引入
dependencies {
// rxjava and rxandroid
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
RxBus类
package com.eggsy.framework.bus;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.operators.flowable.FlowableOnBackpressureError;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
/**
* Created by eggsy on 17-1-6.
*/
public class RxBus {
private static RxBus instance;
private Subject<Object> subjectBus;
private FlowableProcessor<Object> processorBus;
public static RxBus getDefault() {
if (instance == null) {
synchronized (RxBus.class) {
if (instance == null) {
RxBus tempInstance = new RxBus();
tempInstance.subjectBus = PublishSubject.create().toSerialized();
tempInstance.processorBus = PublishProcessor.create().toSerialized();
instance = tempInstance;
}
}
}
return instance;
}
public Disposable register(Class eventType, Consumer observer) {
return toObserverable(eventType).subscribe(observer);
}
public Disposable register(Class eventType, Consumer observer, Scheduler scheduler) {
return toObserverable(eventType).observeOn(scheduler).subscribe(observer);
}
public Disposable register(Class eventType, Consumer observer,Scheduler scheduler, BackpressureStrategy strategy){
Flowable o = toFlowable(eventType);
switch (strategy) {
case DROP:
o = o.onBackpressureDrop();
case LATEST:
o = o.onBackpressureLatest();
case MISSING:
o = o;
case ERROR:
o = RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(o));
default:
o = o.onBackpressureBuffer();
}
if(scheduler!=null){
o.observeOn(scheduler);
}
return o.subscribe(observer);
}
public Disposable register(Class eventType, Consumer observer,BackpressureStrategy strategy){
return register(eventType,observer,null,strategy);
}
public void unRegister(Disposable disposable) {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
public void unRegister(CompositeDisposable compositeDisposable) {
if (compositeDisposable != null) {
compositeDisposable.dispose();
}
}
public void post(final Object event) {
subjectBus.onNext(event);
processorBus.onNext(event);
}
private Observable toObserverable(Class cls) {
return subjectBus.ofType(cls);
}
private Flowable toFlowable(Class cls) {
return processorBus.ofType(cls);
}
public boolean hasObservers() {
return subjectBus.hasObservers();
}
public boolean hasSubscribers() {
return processorBus.hasSubscribers();
}
}
以上是对RxJava2的RxBus的简单封装,在此基础上可以使用apt来对类进行进一步的封装,请参考我的另外一篇文章基于APT的RxBus库,最后我将它放到github上并且补充相应的示例,欢迎fork和star写的不好或者不对的地方,欢迎留言评论交流~~~