JMCR 线程调度

简述

在上一篇文章JMCR 约束求解原理中,我们通过约束求解的方式得到了一个新的前缀。本文将探讨JMCR如何使程序按照规定的序列进行调度

提纲

  • 插桩前后比较
  • before 系列函数
  • 自定义调度的实现

系列文章:
1. JMCR 简介
2. JMCR 字节码插桩(一)
3. JMCR 字节码插桩(二)
4. JMCR 约束求解原理
5. JMCR 线程调度

一、插装前后对比

结合之前的插装和约束求解中定义的事件,JMCR 在每一个定义的事件发生之前都会通过字节码插装来调用一系列的函数... 就像这样:
插装前:

package edu.tamu.aser.tests;

import edu.tamu.aser.reex.JUnit4MCRRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnit4MCRRunner.class)
public class MyTest {
    static int a = 0;
    static int b = 1;

    @Test
    public void test() throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                a = 1;
                System.out.println("b : " + b);
            }
        }).start();
        Thread.sleep(10);
        if(a == 0){ //Read  a 0
            b = 2;   //Write b 2
        } else {
            System.err.println("Error! Read a != 0");
            System.exit(-1);
        }
        System.out.println(b); // Read b 2
    }
}

后:

package edu.tamu.aser.tests;

import edu.tamu.aser.reex.JUnit4MCRRunner;
import edu.tamu.aser.reex.Scheduler;
import edu.tamu.aser.runtime.RVRunTime;
import java.io.PrintStream;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JUnit4MCRRunner.class)
public class MyTest {
    static int a;
    static int b;

    public MyTest() {
    }

    @Test
    public void test() throws InterruptedException {
        Thread var6;
        Thread var10000 = var6 = new Thread(new Runnable() {
            public void run() {
                if (this instanceof Runnable) {
                    RVRunTime.logThreadBegin();
                }

                try {
                    RVRunTime.logSleep();
                    Thread.sleep(10L);
                } catch (InterruptedException var9) {
                    var9.printStackTrace();
                }

                byte var6 = 1;
                Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
                MyTest.a = 1;
                RVRunTime.logFieldAcc(30, (Object)null, 1, Integer.valueOf(var6), true);
                Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
                PrintStream var7;
                PrintStream var10000 = var7 = System.out;
                RVRunTime.logFieldAcc(31, (Object)null, 4, var7, false);
                StringBuilder var10001 = (new StringBuilder()).append("b : ");
                Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
                int var8;
                int var10002 = var8 = MyTest.b;
                RVRunTime.logFieldAcc(34, (Object)null, 2, var8, false);
                var10000.println(var10001.append(var10002).toString());
                if (this instanceof Runnable) {
                    RVRunTime.logThreadEnd();
                }

            }
        });
        RVRunTime.logBeforeStart(5, var6);
        var10000.start();
        RVRunTime.logSleep();
        Thread.sleep(10L);
        Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "a", "I");
        int var7;
        int var12 = var7 = a;
        RVRunTime.logFieldAcc(7, (Object)null, 1, var7, false);
        PrintStream var13;
        if (var12 == 0) {
            byte var8 = 2;
            Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
            b = 2;
            RVRunTime.logFieldAcc(10, (Object)null, 2, Integer.valueOf(var8), true);
        } else {
            Scheduler.beforeFieldAccess(true, "java/lang/System", "err", "Ljava/io/PrintStream;");
            PrintStream var9;
            var13 = var9 = System.err;
            RVRunTime.logFieldAcc(11, (Object)null, 3, var9, false);
            var13.println("Error! Read a != 0");
            System.exit(-1);
        }

        Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
        PrintStream var10;
        var13 = var10 = System.out;
        RVRunTime.logFieldAcc(15, (Object)null, 4, var10, false);
        Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
        int var11;
        int var10001 = var11 = b;
        RVRunTime.logFieldAcc(16, (Object)null, 2, var11, false);
        var13.println(var10001);
    }

    static {
        byte var6 = 0;
        Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
        a = 0;
        RVRunTime.logInitialWrite(20, (Object)null, 1, Integer.valueOf(var6));
        byte var7 = 1;
        Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
        b = 1;
        RVRunTime.logInitialWrite(22, (Object)null, 2, Integer.valueOf(var7));
    }
}

二、before 系列函数

在所有 beforeXX 系列的函数中,他们无一例外的调用了 beforeEvent 这个函数。

TIM截图20191124194204.png

事已至此,自然要去这个 beforeEvent 里一探究竟:

/**
     * Helper method called before a schedule relevant event.
     * 
     * @param eventDesc
     *            {@link EventDesc} describing the schedule relevant event.
     * @param pause
     *            whether to pause the current thread before the event.
     */
    private static void beforeEvent(EventDesc eventDesc, boolean pause) {
        ThreadInfo currentThreadInfo;
        schedulerStateLock.lock();       //保证beforeEvent操作的原子性
        try {
            //...
            currentThreadInfo = liveThreadInfos.get(Thread.currentThread());
            if(currentThreadInfo!=null)
            {
                //...
                if (pause) {
                    pausedThreadInfos.add(currentThreadInfo); // 1
                }
            }
        } finally {
            if (pause) {
                schedulerWakeupCondition.signal();
            }
            schedulerStateLock.unlock();
        }
        
        try {
            if (pause) {
                if(currentThreadInfo!=null){
                    currentThreadInfo.getPausingSemaphore().acquire(); // 2
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
  1. 代码里面那些schedulerStateXXX.XXX() 函数是为了保证测试代码中的原子性。
  2. 代码标注为1的那行代码将当前线程ThreadInfo加入了pausedThreadInfo中。ThreadInfo 是对于 Thread 类的拓展,包含了线程本身、线程当前执行行数、和一个初始值为 0 的信号量,其中信号量可以通过 getPausingSemaphore() 获得,且每一个 Thread 对应一个 ThreadInfo。
  3. 代码标注为 2 的地方,调用了 ThreadInfo.getPausingSemaphore().acquire() 使调用 beforeEvent() 的线程阻塞。
  4. pause 参数是用来判断是否需要在插装点运行之前阻塞线程,即我们并不需要调度所有的节点都暂停。

点到声明 PausedThreadInfos 的地方附近

    private static Map<Thread, ThreadInfo> liveThreadInfos;
    private static SortedSet<ThreadInfo> pausedThreadInfos;
    private static Set<ThreadInfo> blockedThreadInfos;

通过查看这些 Map 在代码中的引用情况,可以得知:

  • liveThreadInfo 是程序中所有的线程
  • pausedThreadInfo 是由于下一步即将运行到某种插装点之前而人为暂停的线程
  • blockedThreadInfo 是由于程序运行而 block 掉的线程。

随后可以开启后台进程,配合之前的操作,进行调度。

三、调度实现

观察在 Scheduler 中的这段代码:

    static {
        Thread schedulerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                boolean timeout = false;
                while (true) {  
                    schedulerStateLock.lock();
                    try {
                        if (!liveThreadInfos.isEmpty()) {
                            if (liveThreadInfos.size() == blockedThreadInfos.size()) {
                                deadlockOrFinishNotifier.release();//当前存活线程数等于阻塞线程数,表示死锁或结束(0 == 0)
                            }
                            if (!pausedThreadInfos.isEmpty() && 
                                    pausedThreadInfos.size() == liveThreadInfos.size() - blockedThreadInfos.size()) {
                                //选择一个要执行的线程
                                if (pausedThreadInfos.size() > 1){
                                    System.out.println(1);
                                }
                                ThreadInfo chosenPausedThreadInfo = (ThreadInfo) choose(pausedThreadInfos, ChoiceType.THREAD_TO_SCHEDULE);
                                if(chosenPausedThreadInfo!=null)
                                {
                                    pausedThreadInfos.remove(chosenPausedThreadInfo);//在pausingthread中去除选择的线程
                                    chosenPausedThreadInfo.getPausingSemaphore().release();//并令其通行
                                }
                                //something wrong
                                //just release the lock, wait for the thread to be added to the paused thread
                            }
                            else if(timeout && !pausedThreadInfos.isEmpty())//JEFF
                            {
                                //..处理超时  
                            }
                       timeout = !schedulerWakeupCondition.await(500, Reex_TimeUnit.MILLISECONDS);
                        
                    } catch (Throwable exp) {
                        System.out.flush();
                        System.err.println("Uncaught exception in scheduler thread:");
                        exp.printStackTrace(System.err);
                        System.exit(2);
                    } finally {
                        schedulerStateLock.unlock();
                    }
                } //end while
            }
        }, "Scheduler");
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

这个后台进程会一直循环,循环中做了这么几件事。

  1. 首先判断 liveThreadInfos的大小和 blockedThreadInfos 大小是否相等,如果相等,则有可能在程序中出现了死锁,或者程序运行结束,此时释放 deadlockOrFinishNotifier,运行死锁检测程序。
  2. 如果 pausedThreadInfos 不是一个空的Map,且 liveThreadInfos 的大小等于pausedThreadInfosblockedThreadInfos的大小之和,就调用 strategy .choose() 方法从 pausedThreadInfos 中选择出一个ThreadInfo,调用ThreadInfo.getSemaphore().release() 方法。换成人话来说,就是当前存在一些我们在调度点之前调用 acquire 而阻塞的线程,且现在程序里面所有进程不是被手动阻塞在了调度点之前,就是原来代码执行导致的阻塞,这时候我们就应该去从那些被阻塞到了调度点的线程中选择一个线程,使其放行。然后这个线程运行到下一个调度点之前,继续被阻塞,这里就会执行新的一轮放行。如此往复,便实现了对于线程的可控调度。 choose 使用了策略模式,具体实现方法不展开描述。默认使用的是MCRStrategy,这里也是使用了 IoC 控制反转。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,094评论 1 32
  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 5,962评论 3 28
  • Java多线程学习 [-] 一扩展javalangThread类 二实现javalangRunnable接口 三T...
    影驰阅读 2,952评论 1 18
  • 本文主要讲了java中多线程的使用方法、线程同步、线程数据传递、线程状态及相应的一些线程函数用法、概述等。 首先讲...
    李欣阳阅读 2,448评论 1 15
  • 线程 在传统操作系统中,每个进程有一个地址空间和一个控制线程。事实上这几乎就是进程的定义。不过经常存在在同一个地址...
    伊恩的道歉阅读 2,222评论 0 6