- 基于RxJava 2.0 实现。
- 代码为Kotlin。
package core.zs.rxbus
import io.reactivex.Flowable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.processors.FlowableProcessor
import io.reactivex.processors.PublishProcessor
/**
* @function RxBus,使用RxJava实现,用于代替常用的EventBus.
* @author ZhangShuai.
* @created 2018/4/18.
*/
object RxBus {
// 发布处理器。
private val mBus: FlowableProcessor<Any> = PublishProcessor.create<Any>().toSerialized()
private val mDisposeMap = hashMapOf<String, CompositeDisposable>()
/**
* 发送事件。
* @param event 任意的事件对象
*/
fun <T> post(event: T) =
mBus.onNext(event)
/**
* 注册接收事件的类型。
* @param clazz 接收事件的类型
* @return Flowable对象
*/
fun <T> register(clazz: Class<T>): Flowable<T> {
return mBus.ofType(clazz)
}
/**
* 取消所有的订阅关系。
*/
fun unSubsrcibeAll() =
mBus.onComplete()
/**
* 是否存在订阅者
* @return true:存在订阅者 false:不存在订阅者
*/
fun hasSubscribers() =
mBus.hasSubscribers()
/**
* 添加Disposable。
* @param obj 订阅者
* @param dispose Disposable对象
*
*/
fun add(obj: Any, dispose: Disposable) {
val tagName = obj::class.java.simpleName
var compositeDisposable = mDisposeMap[tagName]
if (compositeDisposable == null) {
compositeDisposable = CompositeDisposable()
}
compositeDisposable.add(dispose)
}
/**
* 取消某个订阅者下的所有观察者
* @param obj:观察者实例
*/
fun unRegister(obj: Any) {
val tagName = obj::class.java.simpleName
var compositeDisposable = mDisposeMap[tagName]
compositeDisposable?.dispose()
}
}