XXL定时任务-注册过程

一、启用

  1. 配置XxlJobConfig

  2. 示例代码:

    @Configuration
    public class XxlJobConfig {
        /**配置参数 - 实例化XxlJobSpringExecutor所需要的参数 **/
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
        ......
        /** 启动xxl后续过程,需要实例化XxlJobSpringExecutor **/    
        @Bean(initMethod = "start", destroyMethod = "destroy")
        public XxlJobSpringExecutor xxlJobExecutor() {
            ......
        }
    }
    
  3. 这里关注@Bean initMethod destroyMethod

    需要注意的是:

    ​ 单实例bean:容器启动时创建对象

    ​ 多实例bean:每次获取时创建对象

    初始化:

    ​ 对象创建完成,赋值完成,调用初始化方法

    销毁:

    ​ 单实例:容器关闭时调用

    ​ 多实例:容器不会销毁,只能手动调用销毁方法

  4. 这里我们进入类 : XxlJobSpringExecutor

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware {
        @Override
        public void start() throws Exception {
         /** 重点看这个方法 **/
            // init JobHandler Repository
            initJobHandlerRepository(applicationContext);
            // refresh GlueFactory
            GlueFactory.refreshInstance(1);
            /** 之后跟踪父类方法 **/
            // super start
            super.start();
        }
        /** 初始化本地JobHandler存储 **/
        private void initJobHandlerRepository(ApplicationContext applicationContext){
            if (applicationContext == null) {
                return;
            }
         /** 拿到spring上下文,获取注解了JobHandler的实例 **/
            Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
    
            if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    if (serviceBean instanceof IJobHandler){
                        /** 这里拿到我们自定义的JobHandler名称**/
                        String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
                        IJobHandler handler = (IJobHandler) serviceBean;
                        /** 判断本地缓存是否已经存在将要注册的名称 **/
                        if (loadJobHandler(name) != null) {
                            throw new RuntimeException("xxl-job jobhandler naming conflicts.");
                        }
                        /** 调用父类(XxlJobExecutor)的方法,缓存servie到本地 **/
                        registJobHandler(name, handler);
                    }
                }
            }
        }
        
        
        /** 获取spring上下文 **/
        private static ApplicationContext applicationContext;
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
    
  1. 父类解析 - XxlJobExecutor (由于这个类过长,我们逐个分析)

    /** 用于缓存JobHandler的map **/
    private static ConcurrentHashMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
     /** 缓存JobHandler信息到本地缓存 **/
        public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
            logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
            return jobHandlerRepository.put(name, jobHandler);
        }
     /** 根据自定义的JobHandler名称获取JobHandler实例**/
        public static IJobHandler loadJobHandler(String name){
            return jobHandlerRepository.get(name);
        }
    

    上面过程结束,本地的缓存就结束了,也就结束了启用的过程,下一步我们将继续跟踪后面的过程,注册;

二、注册

​ 首先看 XxlJobExecutor 中的start()方法和destroy() :

// ---------------------- start + stop ----------------------
    public void start() throws Exception {
        // init logpath - 日志路径初始化,由于logPath可以默认为空,且不是我们讨论主要流程,这里先不看他
        XxlJobFileAppender.initLogPath(logPath);
        // init admin-client 
        initAdminBizList(adminAddresses, accessToken);
        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(logRetentionDays);
        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();
        // init executor-server
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
        initRpcProvider(ip, port, appName, accessToken);
    }

针对start()方法中的 initAdminBizList 调用:

/** **/
private static List<AdminBiz> adminBizList;
/** adminAddresses:http://192.168.1.33:8099/xxl-job-admin **/
/** accessToken: 没有使用**/
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                String addressUrl = address.concat(AdminBiz.MAPPING);
                /** 组装adminBiz实例 - 这里用了动态代理 **/
                AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), CallType.SYNC,AdminBiz.class, null, 10000, addressUrl, accessToken, null).getObject();
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}

AdminBiz:

private NetEnum netType;    - JETTY
private Serializer serializer; - HESSIAN
private CallType callType; - SYNC
private Class<?> iface; - AdminBiz
private String version;
private long timeout;
private String address; - http://192.168.1.33:8099/xxl-job-admin/api
private String accessToken; - 
private XxlRpcInvokeCallback invokeCallback; - null
client : Client client 

重点关注:initRpcProvider()方法

private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
        // init invoker factory -- 此处暂时不用
        xxlRpcInvokerFactory = new XxlRpcInvokerFactory();
        // init, provider factory
        String address = IpUtil.getIpPort(ip, port);
        Map<String, String> serviceRegistryParam = new HashMap<String, String>();
        serviceRegistryParam.put("appName", appName);
        serviceRegistryParam.put("address", address);

        xxlRpcProviderFactory = new XxlRpcProviderFactory();
        xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);

        // add services
        xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());

        // start - 此处调用 
       xxlRpcProviderFactory.start();

    }

类:XxlRpcProviderFactory

public void start() throws Exception {
        // start server
        server = netType.serverClass.newInstance();
        server.setStartedCallback(new BaseCallback() {      // serviceRegistry started
            @Override
            public void run() throws Exception {
                // start registry
                if (serviceRegistryClass != null) {
                    serviceRegistry = serviceRegistryClass.newInstance();
                      /** 此处调用执行具体的 ExecutorServiceRegistry - start() 
                      ExecutorRegistryThread - start() 方法
                      **/
                    serviceRegistry.start(serviceRegistryParam);

                    if (serviceData.size() > 0) {
                        String ipPort = IpUtil.getIpPort(ip, port);
                        for (String serviceKey :serviceData.keySet()) {
                            serviceRegistry.registry(serviceKey, ipPort);
                        }
                    }
                }
            }
        });
        server.setStopedCallback(new BaseCallback() {       // serviceRegistry stoped
            @Override
            public void run() {
                // stop registry
                if (serviceRegistry != null) {
                    if (serviceData.size() > 0) {
                        String ipPort = IpUtil.getIpPort(ip, port);
                        for (String serviceKey :serviceData.keySet()) {
                            serviceRegistry.remove(serviceKey, ipPort);
                        }
                    }
                    serviceRegistry.stop();
                    serviceRegistry = null;
                }
            }
        });
        server.start(this);
    }

再看: ExecutorRegistryThread - start() - 每隔30s发一次的注册信息,就在这个方法里面完成

public void start(final String appName, final String address){
        // valid
        if (appName==null || appName.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
            return;
        }
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }

        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                /** 重点关注这个调用 在调用adminBiz.registry的时候,会触发 initAdminBizList 中 XxlRpcReferenceBean 的 getObject()方法的调用 ,下面给出这个方法**/
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }

                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }

                // registry remove
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

            }
        });
        registryThread.setDaemon(true);
        registryThread.start();
    }
public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String className = method.getDeclaringClass().getName();

                        // filter method like "Object.toString()"
                        if (Object.class.getName().equals(className)) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String address = routeAddress();
                        if (address==null || address.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(method.getName());
                        xxlRpcRequest.setParameterTypes(method.getParameterTypes());
                        xxlRpcRequest.setParameters(args);
                        /** 由于默认调用这个逻辑,就给出了这个逻辑 **/
                        // send
                        if (CallType.SYNC == callType) {
                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);
                                /** 重点看这个逻辑,进行远程调用的 接着看 通信和序列化**/
                                // do invoke 
                                client.asyncSend(address, xxlRpcRequest);
                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // remove-InvokerFuture
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                            }
                        }
                });
    }

三、通信&序列化

通篇都是再说注册的过程,这里通信和序列化,通信框架的部分,由于默认使用的Jetty,我们就把Jetty的部分拿出来看下:JettyClient - postRequestAsync()

private void postRequestAsync(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
        // reqURL
        String reqURL = address;
        if (!address.toLowerCase().startsWith("http")) {
            reqURL = "http://" + address;   // IP:PORT, need parse to url
        }
        /** 框架默认的使用Hession进行的序列化 **/
        // serialize request
        byte[] requestBytes = xxlRpcReferenceBean.getSerializer().serialize(xxlRpcRequest);

        // httpclient
        HttpClient httpClient = getJettyHttpClient();

        // request
        Request request = httpClient.newRequest(reqURL);
        request.method(HttpMethod.POST);
        request.timeout(xxlRpcReferenceBean.getTimeout() + 500, TimeUnit.MILLISECONDS);     // async, not need timeout
        request.content(new BytesContentProvider(requestBytes));
       /** 调用send方法,发送消息到远程服务端注册,到此结束 **/
        // invoke
        request.send(new BufferingResponseListener() {
            /** 这里处理 调用完成的结果 **/
            @Override
            public void onComplete(Result result) {
                } catch (Exception e){

                  
        });
    }

结束语:

  1. 注解的定义和解析
  2. RPC框架支持
  3. 动态代理的使用
  4. 工厂模式的使用
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容

  • 一. Java基础部分.................................................
    wy_sure阅读 3,805评论 0 11
  • IoC 容器 Bean 的作用域 自定义作用域实现 org.springframework.beans.facto...
    Hsinwong阅读 2,461评论 0 7
  • 小编费力收集:给你想要的面试集合 1.C++或Java中的异常处理机制的简单原理和应用。 当JAVA程序违反了JA...
    八爷君阅读 4,580评论 1 114
  • 1、面向对象的特征有哪些方面 1.抽象:抽象就是忽略一个主题中与当前目标无关的那些方面,以便更充分地注意与当前目标...
    michaelgong阅读 815评论 0 1
  • 25个经典的Spring面试问答 本人收集了一些在大家在面试时被经常问及的关于Spring的主要问题,这些问题有可...
    杀小贼阅读 692评论 0 2