前言
项目中,往往少不了事件总线,以前一般使用EventBus,但现在由于Rx系列的强大,也就投入了RxBus的怀抱。下面就来说说RxBus
背压策略
在RxJava1.x版本的时候,可能会遇到MissingBackpressureException和OOM这样的问题,这是很多事件不能被正常的背压处理的导致的,在RxJava2.x版本引入了Flowable来处理背压的问题,那么,什么是背压呢?
背压策略概念:
RxJava在异步事件处理并且被观察者处理事件的速度比观察者处理事件的速度快的情况下,通知被观察者降低发送速度的策略
主要的实现原理就是:
Observables分为两种主要类型:Hot Observables和Cold Observables,Cold Observables是有订阅事件的观察者才会开始发送事件,比如from,create,just,timer,interval,range,zip等操作符;而Hot Observables不管是否有订阅者,创建完后就直接发送事件。所以Hot Observable是不支持背压式,而Cold Observable是部分操作符支持背压式策略的。而不支持背压式的操作符可以通过sample( ) 、 throttleLast( )、 throttleFirst( )、 throttleWithTimeout( ) 、 debounce( )等等这些节流操作符去控制观察者的发送事件速度。关于这方面的知识,这篇文章关于RxJava最友好的文章——背压(Backpressure)就写的很不错。
发送事件
实现步骤
- 创建一个
FlowableProcessor(Any) -
post(o:Any)方法,发送事件函数, 同时传入需要发送的信息 -
toFlowable()返回FlowableProcessor<Any>对象
最后得到的代码如下:
object RxBus{
private val bus:FlowableProcessor<Any> = PublishProcessor.create<Any>().toSerialized()
fun post(t:Any){
bus.onNext(t)
}
fun toFlowable(): FlowableProcessor<Any> {
return bus
}
}
接收事件
实现步骤
- 通过
toFlowable()获取到FlowableProcessor<Any>对象, - 传入需要接收的信息类型,通过
ofType(classType)获取 -
subscribe订阅,获取到信息后进一步的操作。
最后得到的代码如下:
class Bus(val sub: CompositeDisposable){
fun <T : Event> subscribe(classType: Class<T>, onNext:((res: T) ->Unit)?=null,
onError:((e: Throwable) ->Unit)?=null,
onComplete:(() ->Unit)?=null) {
val subscribe = RxBus.toFlowable()
.ofType(classType)
.subscribe({
if(onNext!=null) onNext(it)
},{
if(onError!=null) onError(it)
},{
if(onComplete!=null) onComplete()
})
sub.add(subscribe)
}
}
fun BaseActivity.bus(init: Bus.() -> Unit): Bus {
val bus = Bus(sub)
bus.init()
return bus
}
fun BaseFragment.bus(init: Bus.() -> Unit): Bus {
val bus = Bus(sub)
bus.init()
return bus
}
使用
定义一个Event
open class Event
class LogoutEvent : Event()
然后发送信息
RxBus.post(LogoutEvent())
最后接收信息
bus {
subscribe(LogoutEvent::class.java,{
//进行退出登录操作
})
}
在获取信息的时候传入的CompositeDisposable是BaseActivity/BaseFragment的CompositeDisposable,所以需要在BaseActivity/BaseFragment的onDestroy()里面进行clear
override fun onDestroy() {
sub.clear()
super.onDestroy()
}
这样,一个简单的RxBus就封装好了。