RxBus2.0 Rxjava2.0 使用

最近项目中把rxjava 切换到2.0 所以相对应的一些都要做出改变 新版本的 独立出来一个Flowable 来处理背压事件.
下面就是修改过的 适用于Rxjava2.0 版本的 rxbus

/**
 * Created by storm on 2017/9/20.
 * <p>
 * Rxbus
 */

public class RxBus {

    private static final String TAG = RxBus.class.getSimpleName();


    private static volatile RxBus mInstance;

    /**
     * 默认 bus ;
     */

    private Subject<Object> _mBus;

    /**
     * 背压
     */
    private FlowableProcessor<Object> _mBackPressureBus;


    private Map<Object, CompositeDisposable> mSubscription;


    private RxBus() {

        _mBus = PublishSubject.create().toSerialized();

        _mBackPressureBus = PublishProcessor.create().toSerialized();
    }


    public static RxBus getInstance() {

        if (mInstance == null) {

            synchronized (RxBus.class) {
                if (mInstance == null) {

                    mInstance = new RxBus();
                }
            }
        }

        return mInstance;
    }


    /**
     * 发送普通事件
     */
    public void send(Object event) {

        _mBus.onNext(event);

    }


    /**
     * 发送背压事件
     */
    public void sendByBackPressure(Object event) {
        _mBackPressureBus.onNext(event);

    }


    /**
     * 接收普通事件
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {

        return _mBus.ofType(eventType);
    }


    /**
     * 接受背压事件
     */
    public <T> Flowable<T> toFlowable(Class<T> eventType) {

        return _mBackPressureBus.ofType(eventType);
    }


    /**
     * 普通事件的处理
     */
    public <T> Disposable doSubscribe(Class<T> eventType, Consumer<T> next, Consumer<Throwable> error) {

        return toObservable(eventType)
                .compose(RxHelper.<T>IO_Main())
                .subscribe(next, error);
    }


    /**
     * 背压事件处理
     */
    public <T> Flowable doFlowable(Class<T> eventType, Subscriber<T> tSubscriber) {

        toFlowable(eventType)
                .onBackpressureLatest() //背压策略
                .compose(RxHelper.<T>IO_Main_Flowable())
                .subscribeWith(tSubscriber);

        return toFlowable(eventType);
    }


    /**
     * 是否有订阅者
     */
    public  boolean hasSubscribers(boolean isBackPressure) {

        if (!isBackPressure)
            return _mBus.hasObservers();
        else
            return _mBackPressureBus.hasSubscribers();
    }


    /**
     * 背压解除订阅
     */
    public void unSubscription(){

        _mBackPressureBus.onComplete();

    }


    /**
     * 添加订阅到集合(一般事件)
     */
    public void addSubscriptions(Object o, Disposable disposable) {

        if (mSubscription == null) {
            mSubscription = new HashMap<>();
        }

        String key = o.getClass().getName();

        if (mSubscription.get(key) != null) {
            mSubscription.get(key).add(disposable);

        } else {
            CompositeDisposable compositeDisposable = new CompositeDisposable();

            compositeDisposable.add(disposable);
            mSubscription.put(key, compositeDisposable);
        }


    }


    /**
     * 解除订阅
     * 一般事件的解除订阅
     *
     * @param o
     */
    public void clearSubscriptions(Object o) {
        if (mSubscription == null) {
            return;
        }


        String key = o.getClass().getName();

        if (!mSubscription.containsKey(key)) {
            return;
        }


        if (mSubscription.get(key) != null) {
            mSubscription.get(key).dispose();

        }

        mSubscription.remove(key);
    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,142评论 25 708
  • 前言 如果你对RxJava1.x还不是了解,可以参考下面文章。 1. RxJava使用介绍 【视频教程】2. Rx...
    jdsjlzx阅读 21,221评论 3 78
  • 文字版: 图片版:这次的输出是用keynote制作的,导出的是普清图片:
    Aria_zhang阅读 225评论 0 0
  • 笔尖在纸上飞快地滑动着,充满力量而又轻盈流畅。仿佛在写作中,她的生活挺起胸脯竖了起来,还发出“咯硌”的笑声。
    一些信仰的的声音阅读 257评论 0 0
  • 这是一颗有爱的心 不舍昼夜地培养 对每个我们所遇见的人 付出喜悦和爱 你看那热恋的人 都有一颗快乐的心 这是一颗感...
    七徽阅读 661评论 4 3