Oozie-Service-CoordMaterializeTriggerService

功能:
定期去扫描元数据表CoordinatorJobBean,将满足条件的Job实体化;

// default is 300sec (5min)
int schedulingInterval = Services.get().getConf().getInt(CONF_SCHEDULING_INTERVAL, lookupInterval);
Runnable lookupTriggerJobsRunnable = new CoordMaterializeTriggerRunnable(materializationWindow, lookupInterval);
services.get(SchedulerService.class).schedule(lookupTriggerJobsRunnable, 10, schedulingInterval,SchedulerService.Unit.SEC);

CoordMaterializeTriggerRunnable:

oozie里面的触发机制:

 1、启动一个定时执行器,每隔 schedulingInterval 时间运行一次 CoordMaterializeTriggerRunnable;
 2、查询CoordinatorJobBean根据时间和条数限制获取本次需要实例化的CoordinatorJobBean;
    技巧: 在CoordinatorJobBean中有个字段 nextMaterializedTimestamp 表明,在这个时间点之前的实例已经存在,为了尽可能的让任务的实例尽可能在触发之前
    实例就产生,而不是事后产生,在查询CoordinatorJobBean使用的时候不是使用当前时间,而是使用未来的一个时间: new Date().getTime() + lookupInterval * 1000
 3、异步去实例化 CoordinatorJobBean 下的 action;
 4、在实例化action的时候,生成的是窗口期内的一批实例,在类 CoordMaterializeTransitionXCommand中实现;
 5、loadState():加载 CoordinatorJobBean 信息,计算这次要实例化的时间窗口: startMatdTime 和 endMatdTime;
 6、materialize():根据不同的计算方式(cron、自定义)来在这个时间窗口内循环的产生action的实例;
 7、更新  CoordinatorJobBean 状态,记录 endMatdTime、lastActionNumber(时间累加)、状态置为running状态、如果jobEndTime< endMatdTime
    说明这个 CoordinatorJobBean 实例化工作已经结束,标记 job.setDoneMaterialization();
 8、performWrites():更新数据库
 9、notifyParent():通知上层结构 bundle;
/** * This runnable class will run in every "interval" to queue CoordMaterializeTransitionXCommand. 
*/
static class CoordMaterializeTriggerRunnable implements Runnable {
    private int materializationWindow;
    private int lookupInterval;
    private long delay = 0;
    private List<XCallable<Void>> callables;
    private List<XCallable<Void>> delayedCallables;
    private XLog LOG = XLog.getLog(getClass());
    public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) {
        this.materializationWindow = materializationWindow;
        this.lookupInterval = lookupInterval;
    }
    @Override
    public void run() {
        LockToken lock = null;
        // first check if there is some other running instance from the same service;
        try {
            lock = Services.get().get(MemoryLocksService.class)
                    .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
            if (lock != null) {
                runCoordJobMatLookup();
                if (null != callables) {
                    boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
                    if (ret == false) {
                        XLog.getLog(getClass()).warn(
                                "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                        + "Most possibly command queue is full. Queue size is :"
                                        + Services.get().get(CallableQueueService.class).queueSize());
                    }
                    callables = null;
                }
                if (null != delayedCallables) {
                    boolean ret = Services.get().get(CallableQueueService.class)
                            .queueSerial(delayedCallables, this.delay);
                    if (ret == false) {
                        XLog.getLog(getClass()).warn(
                                "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. "
                                        + "Most possibly Callable queue is full. Queue size is :"
                                        + Services.get().get(CallableQueueService.class).queueSize());
                    }
                    delayedCallables = null;
                    this.delay = 0;
                }
            }
            else {
                LOG.debug("Can't obtain lock, skipping");
            }
        }
        catch (Exception e) {
            LOG.error("Exception", e);
        }
        finally {
            if (lock != null) {
                lock.release();
                LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName());
            }
        }
    }
    /**
     * Recover coordinator jobs that should be materialized
     * @throws JPAExecutorException
     */
    private void runCoordJobMatLookup() throws JPAExecutorException {
        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();        XLog.Info.get().clear();
        XLog LOG = XLog.getLog(getClass());
        try {
            // get current date
            Date currDate = new Date(new Date().getTime() + lookupInterval * 1000);
            // get list of all jobs that have actions that should be materialized.
            int materializationLimit = ConfigurationService.getInt(CONF_MATERIALIZATION_SYSTEM_LIMIT);
            materializeCoordJobs(currDate, materializationLimit, LOG, updateList);
        }
        catch (Exception ex) {
            LOG.error("Exception while attempting to materialize coordinator jobs, {0}", ex.getMessage(), ex);
        }
        finally {
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
        }
    }
    private void materializeCoordJobs(Date currDate, int limit, XLog LOG, List<UpdateEntry> updateList)
            throws JPAExecutorException {
        try {
            List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList(
                    CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit);
            LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate)
                    + ", Num jobs to materialize = " + materializeJobs.size());
            for (CoordinatorJobBean coordJob : materializeJobs) {
                Services.get().get(InstrumentationService.class).get()
                        .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1);
                queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
                coordJob.setLastModifiedTime(new Date());
                updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
                        coordJob));
            }
        }
        catch (JPAExecutorException jex) {
            LOG.warn("JPAExecutorException while attempting to materialize coordinator jobs", jex);
        }
    }
    /**
     * Adds callables to a list. If the number of callables in the list reaches {@link
     * CoordMaterializeTriggerService#CONF_CALLABLE_BATCH_SIZE}, the entire batch is queued and the callables list
     * is reset.
     *
     * @param callable the callable to queue. 
    */
    private void queueCallable(XCallable<Void> callable) {
        if (callables == null) {
            callables = new ArrayList<XCallable<Void>>();
        }
        callables.add(callable);
        if (callables.size() == ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables);
            if (ret == false) {
                XLog.getLog(getClass()).warn(
                        "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. "
                                + "Most possibly command queue is full. Queue size is :"
                                + Services.get().get(CallableQueueService.class).queueSize());
            }
            callables = new ArrayList<XCallable<Void>>();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 飘飘落落进入尘土 谁能知道它的苦 失去庇护了的孩子 如同宇宙一粒珠 在那无限大的疆土 只有靠自己去征途 无论无何 ...
    希雨啾阅读 122评论 0 1
  • 起因 由于强迫症的缘故,所以我电脑dns都是用dnsspeeder来缓存dnsdnsspeeder大概放了10个d...
    庄msia阅读 7,454评论 0 0
  • 早安。 早起吃完早饭,室友还在睡,窗帘拉着,宿舍漆黑安静。 在空间看到她的说说,一张合照,上面的男生却不是你。我...
    薄暮凉夏阅读 135评论 0 0
  • 空空法师,年有几何,没人知道。他反正没有头发,也就没有了白发。偏偏他又长着一个没有年龄的脸。真是让人生气。 他最喜...
    草小孟阅读 848评论 0 0