简述
在上一篇文章JMCR 约束求解原理中,我们通过约束求解的方式得到了一个新的前缀。本文将探讨JMCR如何使程序按照规定的序列进行调度
提纲
- 插桩前后比较
- before 系列函数
- 自定义调度的实现
系列文章:
1. JMCR 简介
2. JMCR 字节码插桩(一)
3. JMCR 字节码插桩(二)
4. JMCR 约束求解原理
5. JMCR 线程调度
一、插装前后对比
结合之前的插装和约束求解中定义的事件,JMCR 在每一个定义的事件发生之前都会通过字节码插装来调用一系列的函数... 就像这样:
插装前:
package edu.tamu.aser.tests;
import edu.tamu.aser.reex.JUnit4MCRRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(JUnit4MCRRunner.class)
public class MyTest {
static int a = 0;
static int b = 1;
@Test
public void test() throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
a = 1;
System.out.println("b : " + b);
}
}).start();
Thread.sleep(10);
if(a == 0){ //Read a 0
b = 2; //Write b 2
} else {
System.err.println("Error! Read a != 0");
System.exit(-1);
}
System.out.println(b); // Read b 2
}
}
后:
package edu.tamu.aser.tests;
import edu.tamu.aser.reex.JUnit4MCRRunner;
import edu.tamu.aser.reex.Scheduler;
import edu.tamu.aser.runtime.RVRunTime;
import java.io.PrintStream;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(JUnit4MCRRunner.class)
public class MyTest {
static int a;
static int b;
public MyTest() {
}
@Test
public void test() throws InterruptedException {
Thread var6;
Thread var10000 = var6 = new Thread(new Runnable() {
public void run() {
if (this instanceof Runnable) {
RVRunTime.logThreadBegin();
}
try {
RVRunTime.logSleep();
Thread.sleep(10L);
} catch (InterruptedException var9) {
var9.printStackTrace();
}
byte var6 = 1;
Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
MyTest.a = 1;
RVRunTime.logFieldAcc(30, (Object)null, 1, Integer.valueOf(var6), true);
Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
PrintStream var7;
PrintStream var10000 = var7 = System.out;
RVRunTime.logFieldAcc(31, (Object)null, 4, var7, false);
StringBuilder var10001 = (new StringBuilder()).append("b : ");
Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
int var8;
int var10002 = var8 = MyTest.b;
RVRunTime.logFieldAcc(34, (Object)null, 2, var8, false);
var10000.println(var10001.append(var10002).toString());
if (this instanceof Runnable) {
RVRunTime.logThreadEnd();
}
}
});
RVRunTime.logBeforeStart(5, var6);
var10000.start();
RVRunTime.logSleep();
Thread.sleep(10L);
Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "a", "I");
int var7;
int var12 = var7 = a;
RVRunTime.logFieldAcc(7, (Object)null, 1, var7, false);
PrintStream var13;
if (var12 == 0) {
byte var8 = 2;
Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
b = 2;
RVRunTime.logFieldAcc(10, (Object)null, 2, Integer.valueOf(var8), true);
} else {
Scheduler.beforeFieldAccess(true, "java/lang/System", "err", "Ljava/io/PrintStream;");
PrintStream var9;
var13 = var9 = System.err;
RVRunTime.logFieldAcc(11, (Object)null, 3, var9, false);
var13.println("Error! Read a != 0");
System.exit(-1);
}
Scheduler.beforeFieldAccess(true, "java/lang/System", "out", "Ljava/io/PrintStream;");
PrintStream var10;
var13 = var10 = System.out;
RVRunTime.logFieldAcc(15, (Object)null, 4, var10, false);
Scheduler.beforeFieldAccess(true, "edu/tamu/aser/tests/MyTest", "b", "I");
int var11;
int var10001 = var11 = b;
RVRunTime.logFieldAcc(16, (Object)null, 2, var11, false);
var13.println(var10001);
}
static {
byte var6 = 0;
Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "a", "I");
a = 0;
RVRunTime.logInitialWrite(20, (Object)null, 1, Integer.valueOf(var6));
byte var7 = 1;
Scheduler.beforeFieldAccess(false, "edu/tamu/aser/tests/MyTest", "b", "I");
b = 1;
RVRunTime.logInitialWrite(22, (Object)null, 2, Integer.valueOf(var7));
}
}
二、before 系列函数
在所有 beforeXX
系列的函数中,他们无一例外的调用了 beforeEvent
这个函数。
事已至此,自然要去这个 beforeEvent 里一探究竟:
/**
* Helper method called before a schedule relevant event.
*
* @param eventDesc
* {@link EventDesc} describing the schedule relevant event.
* @param pause
* whether to pause the current thread before the event.
*/
private static void beforeEvent(EventDesc eventDesc, boolean pause) {
ThreadInfo currentThreadInfo;
schedulerStateLock.lock(); //保证beforeEvent操作的原子性
try {
//...
currentThreadInfo = liveThreadInfos.get(Thread.currentThread());
if(currentThreadInfo!=null)
{
//...
if (pause) {
pausedThreadInfos.add(currentThreadInfo); // 1
}
}
} finally {
if (pause) {
schedulerWakeupCondition.signal();
}
schedulerStateLock.unlock();
}
try {
if (pause) {
if(currentThreadInfo!=null){
currentThreadInfo.getPausingSemaphore().acquire(); // 2
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 代码里面那些
schedulerStateXXX.XXX()
函数是为了保证测试代码中的原子性。 - 代码标注为1的那行代码将当前线程ThreadInfo加入了pausedThreadInfo中。ThreadInfo 是对于 Thread 类的拓展,包含了线程本身、线程当前执行行数、和一个初始值为 0 的信号量,其中信号量可以通过
getPausingSemaphore()
获得,且每一个 Thread 对应一个 ThreadInfo。 - 代码标注为 2 的地方,调用了
ThreadInfo.getPausingSemaphore().acquire()
使调用beforeEvent()
的线程阻塞。 - pause 参数是用来判断是否需要在插装点运行之前阻塞线程,即我们并不需要调度所有的节点都暂停。
点到声明 PausedThreadInfos 的地方附近
private static Map<Thread, ThreadInfo> liveThreadInfos;
private static SortedSet<ThreadInfo> pausedThreadInfos;
private static Set<ThreadInfo> blockedThreadInfos;
通过查看这些 Map 在代码中的引用情况,可以得知:
- liveThreadInfo 是程序中所有的线程
- pausedThreadInfo 是由于下一步即将运行到某种插装点之前而人为暂停的线程
- blockedThreadInfo 是由于程序运行而 block 掉的线程。
随后可以开启后台进程,配合之前的操作,进行调度。
三、调度实现
观察在 Scheduler 中的这段代码:
static {
Thread schedulerThread = new Thread(new Runnable() {
@Override
public void run() {
boolean timeout = false;
while (true) {
schedulerStateLock.lock();
try {
if (!liveThreadInfos.isEmpty()) {
if (liveThreadInfos.size() == blockedThreadInfos.size()) {
deadlockOrFinishNotifier.release();//当前存活线程数等于阻塞线程数,表示死锁或结束(0 == 0)
}
if (!pausedThreadInfos.isEmpty() &&
pausedThreadInfos.size() == liveThreadInfos.size() - blockedThreadInfos.size()) {
//选择一个要执行的线程
if (pausedThreadInfos.size() > 1){
System.out.println(1);
}
ThreadInfo chosenPausedThreadInfo = (ThreadInfo) choose(pausedThreadInfos, ChoiceType.THREAD_TO_SCHEDULE);
if(chosenPausedThreadInfo!=null)
{
pausedThreadInfos.remove(chosenPausedThreadInfo);//在pausingthread中去除选择的线程
chosenPausedThreadInfo.getPausingSemaphore().release();//并令其通行
}
//something wrong
//just release the lock, wait for the thread to be added to the paused thread
}
else if(timeout && !pausedThreadInfos.isEmpty())//JEFF
{
//..处理超时
}
timeout = !schedulerWakeupCondition.await(500, Reex_TimeUnit.MILLISECONDS);
} catch (Throwable exp) {
System.out.flush();
System.err.println("Uncaught exception in scheduler thread:");
exp.printStackTrace(System.err);
System.exit(2);
} finally {
schedulerStateLock.unlock();
}
} //end while
}
}, "Scheduler");
schedulerThread.setDaemon(true);
schedulerThread.start();
}
这个后台进程会一直循环,循环中做了这么几件事。
- 首先判断
liveThreadInfos
的大小和blockedThreadInfos
大小是否相等,如果相等,则有可能在程序中出现了死锁,或者程序运行结束,此时释放deadlockOrFinishNotifier
,运行死锁检测程序。 - 如果
pausedThreadInfos
不是一个空的Map,且liveThreadInfos
的大小等于pausedThreadInfos
和blockedThreadInfos
的大小之和,就调用strategy .choose()
方法从pausedThreadInfos
中选择出一个ThreadInfo,调用ThreadInfo.getSemaphore().release()
方法。换成人话来说,就是当前存在一些我们在调度点之前调用 acquire 而阻塞的线程,且现在程序里面所有进程不是被手动阻塞在了调度点之前,就是原来代码执行导致的阻塞,这时候我们就应该去从那些被阻塞到了调度点的线程中选择一个线程,使其放行。然后这个线程运行到下一个调度点之前,继续被阻塞,这里就会执行新的一轮放行。如此往复,便实现了对于线程的可控调度。 choose 使用了策略模式,具体实现方法不展开描述。默认使用的是MCRStrategy,这里也是使用了 IoC 控制反转。