Rxjava的粘性Subject,防止消息被遗漏

...

之前测试反馈了一个bug,接口请求到了需要更新但是dialog却没有弹出,简单的看了一下,更新的逻辑是依赖于一个类似Eventbus的内容实现,在activity进入的时候绑定,在退出的时候取消绑定,而首页会跳转InitActivity进行app的初始化,导致绑定暂时失效,所以没有接受到更新的message,之前的send实现是这样

private static final Subject<Object> messenger =    PublishSubject.create();

public static void send(Object message) {
    messenger.onNext(message);
}

那么确认bug所在之后提出的需求也很简单,需要对被遗漏的message做缓存,
最简单的方法自然是直接维护一个cache[],在send的时候将之前的内容贴上去


    private static List<Object> msgSticky = new ArrayList<>();

    public static void send(Object message) {
        messenger.onNext(message);
    }
 
    private static final int ADD = 1;
    
    private static final int REMOVE = 2;

    public static void sendSticky(Object message) {
        //sendmessage
        messenger.onNext(message);
        //add to cache
        operate(message, ADD);
    }
    
    
        private static void subscribeInternal(final Receiver receiver) {
        Disposable disposable = changeScheduler(receiver.scheduler)
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        if (receiver.origin == null || receiver.origin.get() == null) {
                            return;
                        }
                        CompositeDisposable s = subscriverDisposableMap
                                .get(getKey(receiver.origin.get(), receiver.originClass));
                        if (s == null || s.isDisposed()) {
                        //delete from cache
                            operate(o, REMOVE);
                            return;
                        }
                        if (o.getClass().equals(receiver.messageType)) {
                        //delete from cache
                            operate(o, REMOVE);
                            // 消息, 原订阅者对象
                            receiver.method.invoke(receiver.proxy, o, receiver.origin.get());
                        }
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) {
                    }
                });
        addDisposable(receiver, disposable);
        
        
        
        
private static void addDisposable(Receiver receiver, Disposable disposable) {
        CompositeDisposable compositeDisposable = subscriverDisposableMap
                .get(getKey(receiver.origin.get(), receiver.originClass));
        if (compositeDisposable == null) {
            compositeDisposable = new CompositeDisposable();
            subscriverDisposableMap.put(getKey(receiver.origin.get(), receiver.originClass)
                    , compositeDisposable);
        }
        compositeDisposable.add(disposable);
        for (Object msg : msgSticky) {
            messenger.onNext(msg);
        }
    }

    }

发送粘性消息时添加一份备份到cacheList中,并在sendMessage中将粘性消息取出遍历发送,需求自然是实现了,但是很明显不太美观,所以最好的方法自然是修改Subject的实现,Rxjava初始有以下几个实现


image.png

具体每个Subject的职责也能从名字看出来,Replay自然就是所有消息都会被备份,但是无法被消费释放,所以不能满足我们的要求,所以需要自己实现一个Subject,思路也很简单,总而言之就是维护一个线程安全的Subject
首先实现一个Sticky的Disposable

    static final class StickyDisposable<T extends Message> extends AtomicBoolean implements Disposable {

        final Observer<? super T> actual;

        final StickySubject<T> parent;

        StickyDisposable(Observer<? super T> actual, StickySubject<T> parent) {
            super(true);
            this.actual = actual;
            this.parent = parent;
        }

        void onNext(T t) {
            if (get()) {
                this.actual.onNext(t);
            }
        }

        void onNext(T[] t) {
            if (get()) {
                for (T s : t) {
                    this.actual.onNext(s);
                }
            }
        }

        void onComplete() {
            if (get()) {
                this.actual.onComplete();
            }
        }

        void onError(Throwable t) {
            if (get()) {
                this.actual.onError(t);
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            if (compareAndSet(true, false)) {
                parent.remove(this);
            }
        }

        @Override
        public boolean isDisposed() {
            return !get();
        }

同常规的disposable一样,继承原子boolean类保证线程安全,且当dispose时从parentSubject中移除。
接着是消息的消费者抽象类MessageConsumer,这点没什么特殊,唯一要实现的就是accept消息时需要保证消息还未被消费,逻辑由subject实现
最后就是具体的subject:

public final class StickySubject<T extends Message> extends Subject<T> {

    private final AtomicReference<Message[]> stickyMsg;

    private final AtomicReference<StickyDisposable<T>[]> subscribers;

    private static final StickyDisposable[] EMPTY = new StickyDisposable[0];

    private static final StickyDisposable[] TERMINATED = new StickyDisposable[0];

    public static <T extends Message> StickySubject<T> create() {
        return new StickySubject<T>();
    }

    private StickySubject() {
        this.stickyMsg = new AtomicReference<>(new Message[0]);
        this.subscribers = new AtomicReference<StickyDisposable<T>[]>(EMPTY);
    }

    private boolean add(StickyDisposable<T> disposable) {
        for (;;) {
            StickyDisposable[] subscribers = this.subscribers.get();
            if (subscribers == TERMINATED) {
                return false;
            }
            int length = subscribers.length;
            StickyDisposable[] tmp = new StickyDisposable[length + 1];
            System.arraycopy(subscribers, 0, tmp, 0, length);
            tmp[length] = disposable;
            if (this.subscribers.compareAndSet(subscribers, tmp)) {
                offerMsg(tmp);
                return true;
            }
        }
    }

    private void offerMsg(StickyDisposable[] tmp) {
        Message[] msg = stickyMsg.get();
        for (StickyDisposable disposable : tmp) {
            disposable.onNext(msg);
        }
    }

    private synchronized boolean consume(Message msg) {
        for (;;) {
            StickyDisposable[] disposables = this.subscribers.get();
            if (disposables == TERMINATED) {
                return false;
            }
            Message[] msgCache = this.stickyMsg.get();
            int position = 0;
            for (;;) {
                if (position >= msgCache.length) {
                    return false;
                }
                if (msgCache[position].equals(msg)) {
                    break;
                }
                position ++;
            }
            Message[] after = new Message[msgCache.length - 1];
            if (msgCache.length > 1) {
                System.arraycopy(msgCache, 0, after, 0, position);
                System.arraycopy(msgCache, position + 1, after, position, msgCache.length - position - 1);
            }
            if (this.stickyMsg.compareAndSet(msgCache, after)) {
                return true;
            }
        }
    }

    private synchronized boolean hasConsume(Message msg) {
        Message[] msgs = this.stickyMsg.get();
        for (Message sMsg : msgs) {
            if (sMsg.equals(msg)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean hasObservers() {
        return this.subscribers.get().length > 0;
    }

    @Override
    public boolean hasThrowable() {
        return false;
    }

    @Override
    public boolean hasComplete() {
        return this.subscribers.get() == TERMINATED;
    }

    @Override
    public Throwable getThrowable() {
        return null;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        StickyDisposable<T> disposable = new StickyDisposable<T>(observer, this);
        observer.onSubscribe(disposable);
        if (add(disposable)) {
            if (disposable.isDisposed()) {
                remove(disposable);
            }
        } else {
            observer.onComplete();
        }
    }

    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public synchronized void onNext(T t) {
        StickyDisposable[] stickyDisposables = this.subscribers.get();
        if (t.isSticky()) {
            for (;;) {
                Message[] msgCache = this.stickyMsg.get();
                Message[] tempCache = new Message[msgCache.length + 1];
                System.arraycopy(msgCache, 0, tempCache, 0, msgCache.length);
                tempCache[msgCache.length] = t;
                if (stickyMsg.compareAndSet(msgCache, tempCache)) {
                    break;
                }
            }
        }
        for (StickyDisposable disposable : stickyDisposables) {
            disposable.onNext(t);
        }
    }

    @Override
    public void onError(Throwable e) {
        RxJavaPlugins.onError(e);
    }

    @Override
    public void onComplete() {
        StickyDisposable[] disposables = this.subscribers.get();
        for (StickyDisposable disposable : disposables) {
            disposable.onComplete();
        }
        this.subscribers.set(TERMINATED);
    }

    public void remove(StickyDisposable<T> disposable) {
        for (;;) {
            StickyDisposable[] disposables = this.subscribers.get();
            if (disposables == EMPTY || disposables == TERMINATED) {
                return;
            }
            int position = 0;
            while (position <= disposables.length - 1 && disposable != disposables[position]) {
                position++;
            }
            if (position > disposables.length - 1){
                return;
            }
            StickyDisposable[] temp = new StickyDisposable[disposables.length - 1];
            if (disposables.length == 1) {
                temp = EMPTY;
            } else {
                System.arraycopy(disposables, 0, temp, 0, position);
                System.arraycopy(disposables, position + 1, temp, position, disposables.length - 1 - position);
            }
            if (this.subscribers.compareAndSet(disposables, temp)) {
                return;
            }
        }
    }

因为代码写的有点久了,也没有什么复杂的逻辑,主要是做个记录,有什么问题或者漏洞欢迎指出,最后贴个链接~:

github-MessageSample

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 16,023评论 2 11
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,148评论 1 32
  • 为了更好的理解 Looper 的工作原理,我们需要对 ThreadLocal 进行了解,如果对 ThreadLoc...
    墨染书阅读 1,519评论 0 3
  • 2017.02.22 可以练习,每当这个时候,脑袋就犯困,我这脑袋真是神奇呀,一说让你做事情,你就犯困,你可不要太...
    Carden阅读 1,383评论 0 1
  • 日常里的亲子互动中,保护于孩子相处而建立在孩子心中的信任很重要。在很多的亲子关系里,孩子已经不愿对父...
    荀晓英阅读 371评论 0 6