Harmony鸿蒙 专属RxHarmony

前言

鸿蒙也支持Java语言开发,所以也可以使用RxJava,RxJava除了有众多操作符外,还有便捷的线程切换,例如子线程执行完耗时操作,在主线程中更新UI

Android中,与RxJava配套的有RxAndroid,提供了主线程Scheduler调度器,而鸿蒙也有主线程的概念,API也比较类似,按道理是可以按葫芦画瓢,实现一个RxHarmony

代码原理

要写一个RxRxHarmony库,必须先了解RxAndroid,了解后,才能理解怎么实现。原理可以参考这篇文章,RxAndroid 源码分析

大概原理就是,RxAndroid使用Handler把异步任务发消息到主线程处理,实现线程切换。而RxHarmony则使用EventHandler发消息到主线程,原理基本一致

类结构

  • RxHarmonyPlugins,工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
  • HarmonySchedulers,线程调度器工厂
  • EventHandlerScheduler,Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
  • MainThreadDisposable,主线程Disposable实现类,一般配合RxBinding使用

RxHarmonyPlugins

/**
 * 工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
 */
public final class RxHarmonyPlugins {
    private static volatile Function<Callable<Scheduler>, Scheduler> onInitMainThreadHandler;
    private static volatile Function<Scheduler, Scheduler> onMainThreadHandler;

    /**
     * 工具类,隐藏构造方法,如果被反射,抛出异常
     */
    private RxHarmonyPlugins() {
        throw new AssertionError("No instances.");
    }

    /**
     * 初始化主线程调度器
     *
     * @param scheduler 默认的调度器,被Callable回调包裹
     * @return 要被应用的主线程调度器
     */
    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
        //判空
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        //如果有通过setInitMainThreadSchedulerHandler()方法设置了,hook回调函数,则通过hook回调函数进行处理,再返回调度器
        Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
        if (f == null) {
            //没有设置,判空后,再返回
            return callRequireNonNull(scheduler);
        }
        //设置了,调用设置的回调函数,进行处理
        return applyRequireNonNull(f, scheduler);
    }

    /**
     * 设置主线程调度器hook回调函数
     */
    public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
        onMainThreadHandler = handler;
    }

    /**
     * 处理传入的Scheduler调度器
     */
    public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
        if (scheduler == null) {
            throw new NullPointerException("scheduler == null");
        }
        Function<Scheduler, Scheduler> f = onMainThreadHandler;
        if (f == null) {
            return scheduler;
        }
        return apply(f, scheduler);
    }

    /**
     * 获取hook回调函数,可能为null
     */
    public static Function<Callable<Scheduler>, Scheduler> getInitMainThreadSchedulerHandler() {
        return onInitMainThreadHandler;
    }

    /**
     * 设置hook回调函数,可以对设置的调度器进行处理,再返回
     */
    public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
        onInitMainThreadHandler = handler;
    }

    /**
     * 返回配置hook回调函数
     *
     * @return 返回hook回调函数,可能为null
     */
    public static Function<Scheduler, Scheduler> getOnMainThreadSchedulerHandler() {
        return onMainThreadHandler;
    }

    /**
     * 重置所有配置
     */
    public static void reset() {
        setInitMainThreadSchedulerHandler(null);
        setMainThreadSchedulerHandler(null);
    }

    /**
     * 判空获取的Scheduler调度器,非null,则返回,为null则抛异常
     */
    static Scheduler callRequireNonNull(Callable<Scheduler> s) {
        try {
            Scheduler scheduler = s.call();
            if (scheduler == null) {
                throw new NullPointerException("Scheduler Callable returned null");
            }
            return scheduler;
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }

    /**
     * 调用传入的Function回调函数,对Scheduler进行处理
     *
     * @param f 回调函数
     * @param s Scheduler调度器
     * @return 要应用的调度器
     */
    static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
        Scheduler scheduler = apply(f, s);
        if (scheduler == null) {
            throw new NullPointerException("Scheduler Callable returned null");
        }
        return scheduler;
    }

    /**
     * 调用回调函数对目标对象进行处理
     */
    static <T, R> R apply(Function<T, R> f, T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }
}

HarmonySchedulers

/**
 * HarmonyOS,线程调度器工厂
 */
public final class HarmonySchedulers {
    /**
     * 主线程调度器
     */
    private static final Scheduler MAIN_THREAD =
            RxHarmonyPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);

    /**
     * 单例,保存主线程调度器
     */
    private static final class MainHolder {
        static final Scheduler DEFAULT = new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(EventRunner.current()));
    }

    /**
     * 工具类,隐藏构造方法,如果被反射,抛出异常
     */
    private HarmonySchedulers() {
        throw new AssertionError("No instances.");
    }

    /**
     * 获取主线程调度器
     */
    public static Scheduler mainThread() {
        return RxHarmonyPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /**
     * 可以指定EventRunner,类似Android中的Looper轮训器,所以调度器可以绑定在非主线程中
     */
    public static Scheduler from(EventRunner eventRunner) {
        //判空
        if (eventRunner == null) throw new NullPointerException("looper == null");
        //根据配置,创建对应的主线程调度器
        return new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(eventRunner));
    }
}
  • EventHandlerScheduler
/**
 * Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
 */
public class EventHandlerScheduler extends Scheduler {
    private final WithIdEventHandler eventHandler;

    /**
     * 构造方法,保存传入的EventHandler实例
     */
    EventHandlerScheduler(WithIdEventHandler eventHandler) {
        this.eventHandler = eventHandler;
    }

    /**
     * 调度方法
     *
     * @param run   要执行的异步任务
     * @param delay 延时时间
     * @param unit  时间单位
     * @return 返回Disposable实例,被对任务进行取消
     */
    @NonNull
    @Override
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //判空
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        //执行hook回调函数,如果没有设置,会返回传入的Runnable实例
        run = RxJavaPlugins.onSchedule(run);

        //对异步任务进行包装,ScheduledRunnable实现了Runnable和Disposable接口,可以对异步任务进行取消
        ScheduledRunnable scheduled = new ScheduledRunnable(eventHandler, run);

        //获取一个InnerEvent消息
        InnerEvent innerEvent = InnerEvent.get(scheduled);
        //发送消息到主线程
        eventHandler.sendEvent(innerEvent, unit.toMillis(delay));
        //返回任务,外部可以对该任务进行取消
        return scheduled;
    }

    /**
     * 创建Worker实例,会创建HandlerWorker实例并返回
     */
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventHandlerWorker(eventHandler);
    }

    /**
     * Worker子类
     */
    private static final class EventHandlerWorker extends Worker {
        private final WithIdEventHandler handler;

        /**
         * 是否被切断标志位
         */
        private volatile boolean disposed;

        /**
         * 参数生成器,用于在 dispose() 方法中,移除该Worker调度的任务
         */
        private static final AtomicLong paramsCreator = new AtomicLong();
        /**
         * 参数
         */
        private final long params;

        EventHandlerWorker(WithIdEventHandler eventHandler) {
            this.handler = eventHandler;
            params = paramsCreator.incrementAndGet();
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            //判空
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            //如果被切断,直接返回
            if (disposed) {
                return Disposables.disposed();
            }

            //执行hook函数,如果没有设置,则会返回传入的Runnable对象
            run = RxJavaPlugins.onSchedule(run);

            //对异步任务进行包装
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            //获取一个InnerEvent消息
            InnerEvent innerEvent = InnerEvent.get(scheduled);
            //保存异步任务到事件中,在EventHandler的processEvent()回调时,进行处理
            innerEvent.object = this;
            //保存Worker的参数到消息,在dispose()中,移除消息
            innerEvent.param = params;

            //发送消息到主线程进行执行
            handler.sendEvent(innerEvent, unit.toMillis(delay));

            //再次检查是否被切断,如果被切断,则取消任务,直接返回
            if (disposed) {
                handler.removeTask(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }

        @Override
        public void dispose() {
            //被切断了,记录标志位,移除任务
            disposed = true;
            //通过 param 参数移除该 Worker 调度,单未执行的 InnerEvent
            handler.removeEvent(handler.id, params);
        }

        @Override
        public boolean isDisposed() {
            //返回切断标志位
            return disposed;
        }
    }

    /**
     * 对异步任务进行包装,实现Disposable接口,提供可取消任务的功能
     */
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final EventHandler handler;
        private final Runnable delegate;

        /**
         * 切断标志
         */
        private volatile boolean disposed;

        ScheduledRunnable(EventHandler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                //执行异步任务
                delegate.run();
            } catch (Throwable t) {
                //如果出现异常,交给hook回调函数处理
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            //被切断,移除任务
            handler.removeTask(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            //返回是否被切断
            return disposed;
        }
    }

    /**
     * EventHandler 带一个消息Id
     */
    static class WithIdEventHandler extends EventHandler {
        private static final AtomicInteger idCreator = new AtomicInteger();
        /**
         * 每个EventHandler都配置一个id,通过这个EventHandler发送的事件,它的eventId都统一为这个id
         */
        private final int id;

        public WithIdEventHandler(EventRunner runner) throws IllegalArgumentException {
            super(runner);
            //生成Id
            id = idCreator.incrementAndGet();
        }

        @Override
        protected void processEvent(InnerEvent event) {
            super.processEvent(event);
            //非当前Handler的事件不处理
            if (event.eventId != id) {
                return;
            }
            Object obj = event.object;
            if (obj instanceof Runnable) {
                ((Runnable) obj).run();
            }
        }
    }
}

MainThreadDisposable

public abstract class MainThreadDisposable implements Disposable {
    //工具方法,可以检查是否是主线程,非主线程会抛出一个IllegalStateException异常
    public static void verifyMainThread() {
        if (EventRunner.current() != EventRunner.getMainEventRunner()) {
            throw new IllegalStateException(
                "Expected to be called on the main thread but was " + Thread.currentThread().getName());
        }
    }

    //原子变量,保证多线程安全
    private final AtomicBoolean unsubscribed = new AtomicBoolean();

    @Override
    public final boolean isDisposed() {
        //返回是否切换
        return unsubscribed.get();
    }

    @Override
    public final void dispose() {
        //被切断,确保只能切断一次,原子标志只能设置一次,从false变true
        if (unsubscribed.compareAndSet(false, true)) {
            //主线程,直接切断,非主线程则通过主线程调度器发送任务进行切断
            if (EventRunner.current() == EventRunner.getMainEventRunner()) {
                onDispose();
            } else {
                //非主线程,通过调度器,在主线程中切断
                HarmonySchedulers.mainThread().scheduleDirect(this::onDispose);
            }
        }
    }

    protected abstract void onDispose();
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容