XX-JOB阅读笔记(一):执行器注册到管理器实现方式

这里不再对开源框架XX-JOB做介绍,单纯介绍部分功能实现原理。本篇记录执行器Executor如何注册到任务管理器Admin。
本系列文章基于V2.1.0版本介绍,附github上架构图。

image.png

Executor将本执行器的ip地址及端口号注册到Admin,实际是保存在数据库表xxl_job_registry中,保存后地址如:
image.png

然后Admin在根据不同策略获取这些地址。

整体大概流程

image.png

工程目录

image.png

以springboot为例

初始化配置文件XxlJobConfig.java -->创建XxlJobSpringExecutor.java(set 执行器和管理器各种信息) bean -->指定init和destory方法 --> XxlJobSpringExecutor执行start()

//从application.properties文件中读取admin和executor信息,并初始化到XxlJobSpringExecutor类中,指定init和destory方法
@Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        //设置admin地址,eg:http://127.0.0.1:8080/xxl-job-admin
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        //设置执行器名称,eg:xxl-job-executor-sample
        xxlJobSpringExecutor.setAppName(appName);
        //设置执行器ip和port
        xxlJobSpringExecutor.setIp(ip);       
        xxlJobSpringExecutor.setPort(port);
         //设置执行器访问口令
        xxlJobSpringExecutor.setAccessToken(accessToken);
       //设置日志保存路径
        xxlJobSpringExecutor.setLogPath(logPath);
         //设置日志保存天数
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

接着XxlJobSpringExecutor执行start方法

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
    @Override
    public void start() throws Exception {
        // 初始化执行器上面的任务
        initJobHandlerRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        // super start
        super.start();
    }

接着父类XxlJobExecutor 执行start方法

// ---------------------- start + stop ----------------------
    public void start() throws Exception {
        //设置日志路径
        // init logpath
        XxlJobFileAppender.initLogPath(logPath);
        //设置admin地址及执行器访问口令
        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);
        //设置日志清理线程参数
        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        //任务执行结果回调线程(包含回调失败后重试机制)
        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();
        // init executor-server
        //设置执行器ip和port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        //注册执行器
        initRpcProvider(ip, port, appName, accessToken);
    }

接着看上面的initAdminBizList(adminAddresses, accessToken)方法,这一步是初始化Admin的值,以及初始化执行器访问口令,下面看具体执行逻辑

//初始化各种rpc的各种协议
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
                    String addressUrl = address.concat(AdminBiz.MAPPING);
                    // 这里的getObject() 返回的是一个动态代理对象,代理对象在使用方法时,并不是真实的自己调用,而是委托尤其关联到的hander对象的invoke方法来调用
                    AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                            NetEnum.NETTY_HTTP,
                            serializer,
                            CallType.SYNC,
                            LoadBalance.ROUND,
                            AdminBiz.class,
                            null,
                            3000,
                            addressUrl,
                            accessToken,
                            null,
                            null
                    ).getObject();//getObject方法比较重要
                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
public Object getObject() {
                //使用动态代理,通过此方法发送请求到Admin的/api接口,api接口收到请求后,解析出具体的方法和参数,获取到对应的Bean,通过反射执行具体的方法,最终实现调用AdminBizImpl.registry()
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

接着看initRpcProvider(ip, port, appName, accessToken)方法:

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // init, provider factory
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);
        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        //指定执行器注册类为ExecutorServiceRegistry
        xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
        // add services
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
        //启动执行器注册工厂
        // start
        xxlRpcProviderFactory.start();

    }

接着是 xxlRpcProviderFactory.start()方法

public void start() throws Exception {
        // start server
        serviceAddress = IpUtil.getIpPort(this.ip, port);
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                                        //执行器注册类启动
                    serviceRegistry.start(serviceRegistryParam);
                    if (serviceData.size() > 0) {
                        serviceRegistry.registry(serviceData.keySet(), serviceAddress);
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        serviceRegistry.remove(serviceData.keySet(), serviceAddress);
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }

回到ExecutorServiceRegistry的start方法,

public static class ExecutorServiceRegistry extends ServiceRegistry {
        @Override
        public void start(Map<String, String> param) {
            //此处进行注册
            // start registry
            ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
        }
        @Override
        public void stop() {
            // stop registry
            ExecutorRegistryThread.getInstance().toStop();
        }
        @Override
        public boolean registry(Set<String> keys, String value) {
            return false;
        }
        @Override
        public boolean remove(Set<String> keys, String value) {
            return false;
        }
        @Override
        public Map<String, TreeSet<String>> discovery(Set<String> keys) {
            return null;
        }
        @Override
        public TreeSet<String> discovery(String key) {
            return null;
        }
    }

进入ExecutorRegistryThread.start()方法,

 registryThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {//此处注册
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

Admin中接受Executor请求的入口

/**
 * Executor调用Admin入口,接受到请求后,在进行反思操作,实现调用具体方法
 * Created by xuxueli on 17/5/10.
 */
@Controller
public class JobApiController implements InitializingBean {
    @Override
    public void afterPropertiesSet() throws Exception {
    }
    //执行器调用管理器方法入口
    @RequestMapping(AdminBiz.MAPPING)
    @PermissionLimit(limit=false)
    public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {        
        XxlJobScheduler.invokeAdminService(request, response);
    }
}

XxlJobScheduler.invokeAdminService(request, response) ->servletServerHandler.handle(null, request, response)->xxlRpcProviderFactory.invokeService(xxlRpcRequest)

public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {
        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean 获取匹配的Bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        try {
            // invoke
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            //反射调用具体方法
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            // catch error
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

Admin中实现类

@Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        //注册信息入库
        int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
        if (ret < 1) {
            xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());

            // fresh
            freshGroupRegistryInfo(registryParam);
        }
        return ReturnT.SUCCESS;
    }

其中Executor动态代理AdminBiz接口和Admin的/api动态反射执行具体方法属于作者自研RPC框架部分,本篇只做了注册部分和解析部分的介绍,后续会单独介绍自研RPC框架部分。
到此,执行器的地址就已经完全注册到管理器中。

阅读原文关注公众号,更多文章持续更新中,原文地址:
https://mp.weixin.qq.com/s/sJarz6_zBWzKtIr-qFIbsw

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容