Quartz 源码解析(五) —— QuartzSchedulerThread

大概内容

QuartzSchedulerThread

  • 线程的创建和启动
  • 线程run()方法逻辑
  • 线程的协作

线程的创建和启动

这里主要回顾一下QuartzSchedulerThread是在什么时候创建的,又是在什么时候start的。

大概流程如下图

QuartzSchedulerThread的创建和启动
  1. StdSchedulerFactory.instantiate():生产StdScheduler过程中会new一个QuartzScheduler实例
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
  1. 在QuartzScheduler的构造器方法里面可以看到创建QuartzSchedulerThread的代码逻辑,并通过QuartzSchedulerResources对象获取ThreadExecutor对象,最后execute新建的QuartzSchedulerThread。
// QuartzSchedulerThread创建和启动的逻辑
this.schedThread = new QuartzSchedulerThread(this, resources);
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
schedThreadExecutor.execute(this.schedThread);
  1. DefaultThreadExecutor是ThreadExecutor接口的唯一实现类,传入指定的Thread对象,便启动该线程。到这里,QuartzSchedulerThread启动了。
public class DefaultThreadExecutor implements ThreadExecutor {

    public void initialize() {
    }

    public void execute(Thread thread) {
        thread.start();
    }

}

线程run()方法逻辑

上一篇章里面有解析到scheduler.start()方法,最终调用了QuartzSchedulerThread.togglePause()方法,发出了唤醒线程的信号。

线程的协作通过Object sigLock来实现,关于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock唤醒的是只有线程QuartzSchedulerThread。

唤醒线程QuartzSchedulerThread,执行的就是run方法,run()方法的大致流程:

QuartzSchedulerThread.run逻辑

接下来到具体的代码看看具体的逻辑。

public void run() {
    int acquiresFailed = 0;
    // 只有调用了halt()方法,才会退出这个死循环
    while (!halted.get()) {
        try {
            // Part A:如果是暂停状态,那么循环超时等待1000毫秒

            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..

            // blockForAvailableThreads的语义:阻塞直到有空闲的线程可用,然后返回其数量
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) {

                // Part B:获取acquire状态的Trigger列表

                if (triggers != null && !triggers.isEmpty()) {

                    // Part C:获取List第一个Trigger的下次触发时刻

                    // Part D:设置Triggers为'executing'状态

                    // Part E:执行Job

                }
            } else {
                // should never happen, if threadPool.blockForAvailableThreads() follows contract
                continue;
            }

            // Part F:

        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    }

    qs = null;
    qsRsrcs = null;
}

创建JobRunShell

JobRunShell实例在initialize()方法就会把包含业务逻辑类的JobDetailImpl设置为它的成员属性,为后面执行业务逻辑代码做准备。
执行业务逻辑代码在runInThread(shell)方法里面。

JobRunShell shell = null;
try {
    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    shell.initialize(qs);
} catch (SchedulerException se) {
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
    continue;
}

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    // this case should never happen, as it is indicative of the
    // scheduler being shutdown or a bug in the thread pool or
    // a thread pool being used concurrently - which the docs
    // say not to do...
    getLog().error("ThreadPool.runInThread() return false!");
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}

ThreadPool.runInThread()

ThreadPool的具体实例是SimpleThreadPool,这个类维护了3个列表:

// 保存所有WorkerThread的集合
private List<WorkerThread> workers;
// 空闲的WorkerThread集合
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
// 忙碌的WorkerThread集合
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

/**
 * 维护workers、availWorkers和busyWorkers三个列表数据
 * 有任务需要一个线程出来执行:availWorkers.removeFirst();busyWorkers.add()
 * 然后调用WorkThread.run(runnable)方法
 */
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }

    synchronized (nextRunnableLock) {

        handoffPending = true;

        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
            busyWorkers.add(wt);
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the pool).
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}

WorkerThread.run(runnable)

WorkerThread是在SimpleThreadPool的内部类。
WorkerThread.run(runnable)主要是赋值并唤醒lock对象的等待线程队列。

public void run(Runnable newRunnable) {
    synchronized(lock) {
        if(runnable != null) {
            throw new IllegalStateException("Already running a Runnable!");
        }

        runnable = newRunnable;
        lock.notifyAll();
    }
}

WorkerThread.run()

上面方法执行lock.notifyAll()后,对应的WorkerThread就会来到run()方法了。
在这里,我们终于来到了执行业务的execute()方法的倒数第二步。
runnable对象是一个JobRunShell对象,下面在看JobRunShell.run()方法。


public void run() {
    boolean ran = false;

    while (run.get()) {
        try {
            synchronized(lock) {
                while (runnable == null && run.get()) {
                    lock.wait(500);
                }

                if (runnable != null) {
                    ran = true;
                    // todo: 这里不需要创建一个Thread对象来执行业务逻辑,使用Thread另起一个执行流,不方便知道执行的结果
                    // 既然不需要启动一个执行流,为什么需要一个Runnable对象?
                    runnable.run();
                }
            }
        } catch (InterruptedException unblock) {
            // do nothing (loop will terminate if shutdown() was called
            try {
                getLog().error("Worker thread was interrupt()'ed.", unblock);
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        } catch (Throwable exceptionInRunnable) {
            try {
                getLog().error("Error while executing the Runnable: ",
                    exceptionInRunnable);
            } catch(Exception e) {
                // ignore to help with a tomcat glitch
            }
        } finally {
            synchronized(lock) {
                runnable = null;
            }
            // repair the thread in case the runnable mucked it up...
            if(getPriority() != tp.getThreadPriority()) {
                setPriority(tp.getThreadPriority());
            }

            if (runOnce) {
                   run.set(false);
                clearFromBusyWorkersList(this);
            } else if(ran) {
                ran = false;
                makeAvailable(this);
            }

        }
    }

    //if (log.isDebugEnabled())
    try {
        getLog().debug("WorkerThread is shut down.");
    } catch(Exception e) {
        // ignore to help with a tomcat glitch
    }
}

JobRunShell.run()

这个方法里面有很多通知Listener的代码逻辑,这不是我们目前的关注重点,下面的代码省略这些代码。
经过前面这么多跋山涉水,我们终于看到了调用业务的execute()方法的代码逻辑,就是那一行"job.execute(jec)"。

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {
            // 其他代码

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                job.execute(jec);
                endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // 其他代码
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

WorkerThread是什么时候被初始化的

回过头来看下WorkerThread的初始化代码逻辑。

  • StdSchedulerFactory.instantiate()创建了ThreadPool tp
  • tp.initialize()里面有初始化WorkerThread的逻辑
// create the worker threads and start them
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
    WorkerThread wt = workerThreads.next();
    wt.start();
    availWorkers.add(wt);
}

系列文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,604评论 18 399
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 11,247评论 4 56
  • 云盘资源:mongodb驱动安装MongoDB数据库 输入mongo进行测试,出现版本号即成功。 笔者推荐,建议安...
    王宝花阅读 5,747评论 0 0
  • 叶落后 枝头上长出新芽 泛黄的纹理 葱郁地代替 一路繁华走来 一如既往地离开 樟树下 滑板车在小孩的脚下 溜远了 ...
    木水爰阅读 399评论 4 2
  • 来到安顺学院后就听说周围有一个湖,名叫娄湖,也叫娄家坡水库,听说在哪里曾经住着娄姓的一族人,才因此而得名。 当时打...
    郝若歆阅读 754评论 0 1