3. xxl-job原理-- 执行器注册

xxl-job: v2.0.2 原理 目录学习

环境:

-  idea:2018.3
-  win10
-  maven: 3.5.3
-  jdk:1.8
-  spring cloud:Finchley.RELEASE
-  spring boot: 2.0.8
-  quartz-2.2.3
-  xxl-job:2.0.2

1. 执行器注册

需要有一个调度中心,其他注册器将自己注册到调度中心,然后就可以接任务了。
缺点, 调度中心不能自我注册

2. 注册方式

  • 自动注册
  • 手动录入
主动注册中就是在配置中心,设置配置信息 xxl.job.admin.address=**,   xxl.job.executor.appName=**, 注意appName不可以重复
手动录入,其中的Appname是不能重复的

自动注册流程

在执行器启动时会读取配置,当存在任务调度中心地址会依次向任务调度中心注册其地址.

  1. 填写配置信息
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
##任务调度中心地址,  多个地址有逗号隔开
xxl.job.admin.addresses=http://127.0.0.1:7995/xxl-job-admin/
 
### xxl-job executor address
##任务调度器名称和机器信息
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.ip=127.0.0.1
xxl.job.executor.port=9999

  1. 查找流程

xxl-job-executor-service

--------------------------------------
@Configuration 注解的XxlJobConfig  ---->    @Bean 修饰的xxlJobExecutor方法
-------------------------------------
@Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
       //  xxl-job  配置信息初始化, 初始化 执行器的基本信息, appname, ip, port, accessToken, logpath, logretentionDays
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
-------------------------------------

注意@Bean的 initMethod 与 destroyMethod
xxl-job-core:2.0.2

---------------------
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
    @Override
    public void start() throws Exception {
        // init JobHandler Repository
        initJobHandlerRepository(applicationContext);
        // refresh GlueFactory
        GlueFactory.refreshInstance(1);
        // super start
        super.start();
    }
    private void initJobHandlerRepository(ApplicationContext applicationContext){
       ...
        // init job handler action,  根据应用上下文获取ServiceBeanMap, 
// ServiceBeanMap的value包含IJobHandler的子类
// class DemoJobHandler extends IJobHandler , 该DemoJobHandler  就是
        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                if (serviceBean instanceof IJobHandler){
// 获取@JobHandler(value="demoJobHandler") 定义的value值
                    String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                    IJobHandler handler = (IJobHandler) serviceBean;
// 从jobHandlerRepository中查找demoJobHandler值, 如果有,抛出异常
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                    }
//   没有就将该demoJobHandler 注册到 jobHandlerRepository中,   xxl-job register jobhandler  success
//  jobHandlerRepository 是有ConcurrentHashMap 类, key: String, value : IJobHandler以及子类
                    registJobHandler(name, handler);
                }
            }
        }
    }
......

}

XxlJobExecutor

  public static IJobHandler loadJobHandler(String name){
        return jobHandlerRepository.get(name);
    }

GlueFactory

// 创建glueFactory实例, 用于根据name生成实例
public static void refreshInstance(int type){
        if (type == 0) {
            glueFactory = new GlueFactory();
        } else if (type == 1) {
// 主要使用SpringGlueFactory
            glueFactory = new SpringGlueFactory();
        }
    }

XxlJobExecutor.start()

// ---------------------- start + stop ----------------------
    public void start() throws Exception {

        // init logpath  
//  初始化log日志文件, 
//  默认的日志路径为/data/applogs/xxl-job/jobhandler
//  如果日志不存在就设置为/data/applogs/xxl-job/jobhandler, 创建响应的目录,
//  如果/data/applogs/xxl-job/jobhandler/gluesource不存在,就创建
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken);


        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }

XxlJobExecutor.initAdminBizList

// ---------------------- admin-client (rpc invoker) ----------------------
    private static List<AdminBiz> adminBizList;
    private static Serializer serializer;
//当存在多个任务调度中心时,创建代理类并注册,在NetComClientProxy
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
// 获取Serializer(抽象类) 子类实例
        serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
// 当存在多个任务调度中心
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
// address 连接上/api
                    String addressUrl = address.concat(AdminBiz.MAPPING);
//  构建 xxlRpcRefrenceBean 实例 , 注意这些属性信息  
                    AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
                          // 使用 netty_http 实现RPC,   在v2.0.1 使用的是 Jetty_http , v2.0.2 使用的是Netty_http
                            NetEnum.NETTY_HTTP,
                          // Serializer  实例
                            serializer, 
                            // CallType 调用方式, SYNC 同步
                            CallType.SYNC,
                         // LoadBalance 方式,  路由策略为 轮询
                            LoadBalance.ROUND,
                          //  AdminBiz 类
                            AdminBiz.class,
                          // version  版本
                            null,
                              // timeout  超时时间
                            10000,
                          // 注册中心地址
                            addressUrl,
                          //  权限控制
                            accessToken,
                            // invokeCallback  回调
                            null,
                            // invokerFactory   调用工厂
                            null
                    ).getObject();  //  使用动态代理生成AdminBiz  实现类
//  存储  adminBiz实例
                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

getObject
XxlRpcReferenceBean

public Object getObject() {
//  使用动态代理,生成代理类
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{this.iface}, new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                String className = method.getDeclaringClass().getName();
                String varsion_ = XxlRpcReferenceBean.this.version;
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                Object[] parameters = args;
                if (className.equals(XxlRpcGenericService.class.getName()) && methodName.equals("invoke")) {
                    Class<?>[] paramTypes = null;
                    if (args[3] != null) {
                        String[] paramTypes_str = (String[])((String[])args[3]);
                        if (paramTypes_str.length > 0) {
                            paramTypes = new Class[paramTypes_str.length];

                            for(int i = 0; i < paramTypes_str.length; ++i) {
                                paramTypes[i] = ClassUtil.resolveClass(paramTypes_str[i]);
                            }
                        }
                    }

                    className = (String)args[0];
                    varsion_ = (String)args[1];
                    methodName = (String)args[2];
                    parameterTypes = paramTypes;
                    parameters = (Object[])((Object[])args[4]);
                }

                if (className.equals(Object.class.getName())) {
                    XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}#{}]", className, methodName);
                    throw new XxlRpcException("xxl-rpc proxy class-method not support");
                } else {
                    String finalAddress = XxlRpcReferenceBean.this.address;
                    if ((finalAddress == null || finalAddress.trim().length() == 0) && XxlRpcReferenceBean.this.invokerFactory != null && XxlRpcReferenceBean.this.invokerFactory.getServiceRegistry() != null) {
                        String serviceKey = XxlRpcProviderFactory.makeServiceKey(className, varsion_);
                        TreeSet<String> addressSet = XxlRpcReferenceBean.this.invokerFactory.getServiceRegistry().discovery(serviceKey);
                        if (addressSet != null && addressSet.size() != 0) {
                            if (addressSet.size() == 1) {
                                finalAddress = (String)addressSet.first();
                            } else {
                                finalAddress = XxlRpcReferenceBean.this.loadBalance.xxlRpcInvokerRouter.route(serviceKey, addressSet);
                            }
                        }
                    }

                    if (finalAddress != null && finalAddress.trim().length() != 0) {
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(XxlRpcReferenceBean.this.accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(methodName);
                        xxlRpcRequest.setParameterTypes(parameterTypes);
                        xxlRpcRequest.setParameters(parameters);
                        XxlRpcFutureResponse futureResponse;
//  使用同步调用方式,底层使用netty_http,执行RPC框架, 调用调度中心方法
                        if (CallType.SYNC == XxlRpcReferenceBean.this.callType) {
// 构建xxlRpc 回调的结果对象
                            futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);

                            Object var31;
                            try {
                                XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
// 得到返回结果对象
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(XxlRpcReferenceBean.this.timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
// 返回结果
                                var31 = xxlRpcResponse.getResult();
                            } catch (Exception var21) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                throw (Throwable)(var21 instanceof XxlRpcException ? var21 : new XxlRpcException(var21));
                            } finally {
                                futureResponse.removeInvokerFuture();
                            }

                            return var31;
                        } else if (CallType.FUTURE == XxlRpcReferenceBean.this.callType) {
                            futureResponse = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, (XxlRpcInvokeCallback)null);

                            try {
                                XxlRpcInvokeFuture invokeFuture = new XxlRpcInvokeFuture(futureResponse);
                                XxlRpcInvokeFuture.setFuture(invokeFuture);
                                XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                                return null;
                            } catch (Exception var20) {
                                XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                futureResponse.removeInvokerFuture();
                                throw (Throwable)(var20 instanceof XxlRpcException ? var20 : new XxlRpcException(var20));
                            }
                        } else if (CallType.CALLBACK == XxlRpcReferenceBean.this.callType) {
                            XxlRpcInvokeCallback finalInvokeCallback = XxlRpcReferenceBean.this.invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }

                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType=" + CallType.CALLBACK.name() + ") cannot be null.");
                            } else {
                                XxlRpcFutureResponse futureResponsex = new XxlRpcFutureResponse(XxlRpcReferenceBean.this.invokerFactory, xxlRpcRequest, finalInvokeCallback);

                                try {
                                    XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                                    return null;
                                } catch (Exception var19) {
                                    XxlRpcReferenceBean.logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
                                    futureResponsex.removeInvokerFuture();
                                    throw (Throwable)(var19 instanceof XxlRpcException ? var19 : new XxlRpcException(var19));
                                }
                            }
                        } else if (CallType.ONEWAY == XxlRpcReferenceBean.this.callType) {
                            XxlRpcReferenceBean.this.client.asyncSend(finalAddress, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType[" + XxlRpcReferenceBean.this.callType + "] invalid");
                        }
                    } else {
                        throw new XxlRpcException("xxl-rpc reference bean[" + className + "] address empty");
                    }
                }
            }
        });
    }

通过测试,查看注册的代码走向

1

实际还是调用 AdminBizImpl .registry方法。

如何实现自我注册

最简单的方式:

设置RegistryParam对象属性,
将admin所在机器的地址与配置文件中端口号,配置文件中xxl.job.executor.appname的属性注入其中。
然后调用AdminBizImpl.registry(RegistryParam)方法即可

总结

执行器注册流程如下:

1. init JobHandler Repository 
2. refresh GlueFactory
3. super start
    init logpath
    init invoker, admin-client   利用反射,向注册中心注册自己,并将adminBiz缓存起来
    init JobLogFileCleanThread   处理jobLog日志
    init TriggerCallbackThread   处理 触发周期性 回调,  callback 和 retry 
    init executor-server         initRpcProvider  启动serviceRegister注册  
            init, provider factory       (ExecutorServiceRegistry)      
            add services        将该执行器以服务的形式加入
            start               会利用netty 开启一个server, port: 9999,
                                还会开启有一个ExecutorRegistryThread线程,不断地注册自己     

问题: 自我注册成功, 但是注册之前的执行器丢失, 而且新注册的执行器无法显示

PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”一下,就此谢过!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容

  • 概述 XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源...
    Tian_Peng阅读 410,327评论 2 142
  • 1.简介 1.1概述 XXL-JOB是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易...
    会飞的谭猪猪阅读 8,799评论 1 8
  • 在半个月之前,有幸看了xxl-job源码,原本打算写一篇源码分析文章。结果由于琐碎的事情干扰了,搁浅了。本篇文章先...
    cmazxiaoma阅读 15,794评论 3 95
  • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AM...
    大佛爱读书阅读 2,827评论 0 20
  • 今年5月份的心理咨询师考试明天截止报名,我搭上了最后一班车,报名成功。今天在教室里听了一整天的课,上午9点到12点...
    9c80a2f86c0f阅读 232评论 0 0