Rxbus 的封装及其使用,支持背压

Rxbus的使用:

1、发送消息:

RxBus.getDefault().post(type, object);

2、接收消息:

Disposable  disposable = RxBus.getDefault().register(type, object.class)
.subscribe(o -> {
      //todo
});

3、注销消息:

RxBus.getDefault().unregister(disposable);

4、封装Rxbus

private static volatile RxBus  mInstance;

private final FlowableProcessorbus = PublishProcessor.create().toSerialized();

public static RxBus getDefault() {
        if (mInstance ==null) {
              synchronized (RxBus.class) {
                 if (mInstance ==null) {
                      mInstance =new RxBus();
                    }
             }
      }
       return mInstance;
 }

   /**
   * @param tag    tag必须保持一致
   * @param object 发送的数据类型和接受的数据类型必须要一致,要不然接受不到消息
   */
public void post(@NonNull String tag, @NonNull Object object) {
    RxBusEntity entity =new RxBusEntity();
    entity.setTag(tag);
    entity.setObject(object);
    bus.onNext(entity);
}
/**
* 注册 rxbus
*
* @param tag
* @param tClass
* @param <T>
* @return
*/
public Flowableregister(@NonNull final String tag, @NonNull final Class tClass) {
      return register(tag, tClass, AndroidSchedulers.mainThread());
}
public Flowableregister(@NonNull final String tag, final Class tClass, Scheduler scheduler) {
     return bus.filter(o -> {
            if (oinstanceof RxBusEntity) {
                 RxBusEntity entity = (RxBusEntity) o;
                 if (TextUtils.isEmpty(entity.getTag())) {
                        return false;  
                 }
                if (entity.getTag().equals(tag)) {
                       return tClass.isInstance(entity.getObject());
                 }
          }
           return false;
    }).map(o -> ((RxBusEntity) o).getObject()).cast(tClass).observeOn(scheduler);
}

public void unregister(Disposable... disposables) {
      if (disposables ==null){
            return;
        }
      for (Disposable disposable : disposables) {
           if (disposable ==null || disposable.isDisposed()) {
                 continue;
            }
          disposable.dispose();
    }
}

5、RxBusEntity:

private StringmTag;
private ObjectmObject;
public StringgetTag() {
  return mTag;
}
public void setTag(String tag) {
    this.mTag = tag;
}
public ObjectgetObject() {
    return mObject;
}
public void setObject(Object object) {
    this.mObject = object;
}

6、以上就是支持背压的rxbus封装,及其好用,没有太多的花里花哨,消息稳定、灵活使用、线程切换,无所不能。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,674评论 1 32
  • 原文地址:http://gank.io/post/560e15be2dca930e00da1083 前言 我从去年...
    AFinalStone阅读 2,328评论 5 23
  • 5月23日 想起了,就更新加入标签的快递100查询网页,捕捉到上午10点多快递员正在派发的信息,暗自欣喜这似乎离我...
    屋檐下听着雨阅读 368评论 0 0
  • “某天我们来交换看一天的日记吧。等到那一天,我会随便和你说个日期,你也可以和我说个日期,我们就要把那一天的日记拿给...
    秋薁阅读 3,972评论 2 4
  • <转载> 作者:彭明輝,台湾清华大学 在知識分工越來越精細,且職場競爭越來越激烈的21世紀,「通識課該教什麼」這個...
    北冥有啥阅读 566评论 0 0

友情链接更多精彩内容