我以为,学习java编程,关键在于三点,OOP,并发和JVM。最后两点其实是联系比较紧密的,并且是属于java语言特有的属性。之前在学习java的时候对并发的理解比较粗浅,利用这一段时间,进一步的学习了java并发的定义,原理和例子,写篇文章总结一下。
之前看过一篇关于Java多线程编程的文章,里面有一句话特别好,说的是多线程编程看上去很深奥,在学习概念,理解和设计并发程序上很容易出错。但是
从根本上来看,所谓的多线程编程,不过是JVM或者说当前的计算机体系结构无法处理好多线程下资源竞争的情况而人为加上的一些处理方法。这样的方法导致在实现相同功能时候会产生很多复杂的,让开发者难以理解或者设计缺陷,仅此而已。
有了这样的前提,我们可以认为,多线程编程无非是为了更好的压榨CPU的性能,人为设计出来的补偿机制。不过在宏观上,我们可以藐视这样的机制,但是在工作里,还是不能避免要用到它,而且还要用好它。
先说说一些基本定义,这些定义在无数的博客和书籍上都有解释,假定读者已经有所了解,这里只是枚举出最简洁的几点。
进程与线程的区别(面试题常见题,但是一般问出这个问题的面试官要么是真没实际开发多线程程序的经验,要么是对面试者比较失望,问个简单的理论问题凑个数。。。)
-
线程的状态
-
Java中多线程的实现
- Interface: Runnable, Callable, Future, ExecutorService
- Class: Thread, FutureTask
-
说明
- Thread,实现类,start()方法将线程变为可运行状态,在运行态的时候调用定义的run()方法。不过一般不会有人用定义子类的方式定义一个线程
- Runnable, 接口,通常实现这个接口,然后作为构造参数新建一个线程实例
- Callable, Java 1.5, java.util.concurrent, 与runnable类似,call()方法可以返回线程运行的状态,并且可以抛出异常
- FutureTask,包装器,处于thread和callable的中间,它通过接受Callable来创建,同时实现了Future和Runnable接口,可以检查线程的状态
Java语法中的多线程机制
synchronized 关键字
synchronized关键字是Java 1.0就有的语法元素。在Java中,所有的object实例(class也是一种object)都可以作为多线程环境下得竞争资源,所以每个oject上都有一个锁的标记,在执行关键代码的时候,对非null的object加上synchronize关键字,标记一个代码块,可以自动对某个对象加解锁。
举个小栗子:
public class SynchronizedTest {
private int count = 0;
public synchronized void increaseCount() {
count++;
}
public int getCount() {
return count;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedTest synchronizedTest = new SynchronizedTest();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 5000; i++) {
synchronizedTest.increaseCount();
}
}
}, "Thread1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 5000; i++) {
synchronizedTest.increaseCount();
}
}
}, "Thread2");
thread1.start();
thread2.start();
Thread.currentThread().join(2000);//main thread waiting for sub-threads perform
System.out.println("count is: " + synchronizedTest.getCount());
}
}
启两个线程,并发的对一个变量做自增操作,这个操作在synchronized标识下变为串行的过程,最后输出10000,如果不加synchronized,结果会小于10000。
synchronized可能产生死锁
public class DeadLock {
private Object a = new Object();
private Object b = new Object();
public static void main(String[] args) throws InterruptedException {
DeadLock deadLock = new DeadLock();
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (deadLock.a) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (deadLock.b) {
System.out.println("Thread 1 enter");
}
}
}
}, "Thread1");
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (deadLock.b) {
synchronized (deadLock.a) {
System.out.println("Thread 2 enter");
}
}
}
}, "Thread2");
thread1.start();
thread2.start();
}
}
以上,thread1在请求到a的锁之后会带着锁睡一会儿,然后再请求b的锁,但是这是b的锁已经在thread2手里了,同时thread2还在请求a的锁,变成了循环等待并且是无限等待,于是产生了死锁。运行结果是两个线程都无限的等待下去。要想解这样的死锁,可以在竞争资源上加上是否被锁的标记位,然后引入等待超时的机制,使得有一方在请求资源超时之后做出让步,把手上已有的锁也释放了,改变循环等待的状态。但是,即使有了超时机制,也需要注意有过度退让的情况存在,形象的说,好比在一个只能容纳一个人通过的窄巷里,你和另一个人迎面走来,然后你们发现这样谁也过不去,于是都高风亮节的往后退出巷子,然后等待一会儿,又很默契的一起走了进去,结果是悲剧的又发生了死锁的情况,而且会持续下去。这就需要两个线程之间需要知道对方的情况而不是盲目的退让。
Synchronize的可重入性
所以可重入性,是指在某个线程得到某个对象的锁之后,不需要额外申请该对象的锁也可以进入关键代码块。
Synchronized的JVM层实现
Synchronized在设计之初被实现为一种重量锁,每次做互斥系统开销很大。在Java 1.6之后做了优化调整,加入锁升级的机制去减小每次锁的开销。在JVM中,每个object都有一个header,保存object的一些信息,普通对象头的长度为两个字,数组对象头的长度为三个字(JVM内存字长等于虚拟机位数,32位虚拟机即32位一字,64位亦然),其中有两个bit位记录了对象的锁类型:
偏向锁
锁对象第一次被线程获取的时候,虚拟机把对象头的status设置为"01",偏向锁状态,当发生锁重入时,只需要检查MarkValue中的ThreadID是否与当前线程ID相同即可,相同即可直接重入。偏向锁的释放不需要做任何事情,这也就意味着加过偏向锁的MarkValue会一直保留偏向锁的状态,因此即便同一个线程持续不断地加锁解锁,也是没有开销的。
一般偏向锁是在有不同线程申请锁时升级为轻量锁,这也就意味着假如一个对象先被线程1加锁解锁,再被线程2加锁解锁,这过程中没有锁冲突,也一样会发生偏向锁失效,不同的是这回要先退化为无锁的状态,再加轻量锁。
引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的CAS原子指令的性能消耗)。
偏向锁获取过程:
(1) 访问Mark Word中偏向锁的标识是否设置成1,锁标志位是否为01——确认为可偏向状态。
(2) 如果为可偏向状态,则测试线程ID是否指向当前线程,如果是,进入步骤(5),否则进入步骤(3)。
(3) 如果线程ID并未指向当前线程,则通过CAS操作竞争锁。如果竞争成功,则将Mark Word中线程ID设置为当前线程ID,然后执行(5);如果竞争失败,执行(4)。
(4)如果CAS获取偏向锁失败,则表示有竞争。当到达全局安全点(safepoint)时获得偏向锁的线程被挂起,偏向锁升级为轻量级锁,然后被阻塞在安全点的线程继续往下执行同步代码。
(5) 执行同步代码。偏向锁的释放:
偏向锁的撤销在上述第四步骤中有提到。偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
轻量级锁
“轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的。但是,首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。在解释轻量级锁的执行过程之前,先明白一点,轻量级锁所适应的场景是线程交替执行同步块的情况,如果存在同一时间访问同一锁的情况,就会导致轻量级锁膨胀为重量级锁。
-
轻量级锁的加锁过程
(1)在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,官方称之为 Displaced Mark Word。这时候线程堆栈与对象头的状态如图2.1所示。
(2)拷贝对象头中的Mark Word复制到锁记录中。
(3)拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向object mark word。如果更新成功,则执行步骤(3),否则执行步骤(4)。
(4)如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态,这时候线程堆栈与对象头的状态如图2.2所示。
(5)如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。
- 轻量级锁的解锁过程:
(1)通过CAS操作尝试把线程中复制的Displaced Mark Word对象替换当前的Mark Word。
(2)如果替换成功,整个同步过程就完成了。
(3)如果替换失败,说明有其他线程尝试过获取该锁(此时锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。
重量级锁
TBD
重量级锁、轻量级锁和偏向锁之间转换
volatile 关键字
又是面试中经常会被问到的一个Java关键字,如果用volatile声明一个变量为共享的,一个线程修改了某个变量的值,这个更新的值会立即写入内存中,从而对其他线程来说是立即可见的。
然而比较坑的结果,volatile只能在很小的范围内保证互斥性,如果对volatile变量本身的操作不是线程安全的,比如++,那么同样是有问题的。
volatile仅仅用来保证该变量对所有线程的可见性,但不保证原子性
具体的使用场景参照Java 理论与实践: 正确使用 Volatile 变量。不过我肤浅的总结起来就是尽量不要用volatile来实现互斥。。。。
还有个典型的使用volatile的场景实在多线程环境下lazy load的单例模式,参考Java 单例真的写对了么?
volatile底层实现
虽然volatile不建议使用,但是还是有必要探究一下它在底层是如何实现的,因为有助于更好的理解JVM的编译机制。
为了优化性能,编译器和CPU可能对某些指令进行重排。大家都知道java代码最终会被编译成汇编指令,而一条java语句可能对应多条汇编指令。为了优化性能,CPU和编译器会对这些指令重排,volatile的变量在进行操作只会在尾部添加一个内存屏障(Memory Barrier),lock addl $0x0,(%rsp)。它可以:a) 确保一些特定操作执行的顺序; b) 影响一些数据的可见性(可能是某些指令执行后的结果)。编译器和CPU可以在保证输出结果一样的情况下对指令重排序,使性能得到优化。插入一个内存屏障,相当于告诉CPU和编译器先于这个命令的必须先执行,后于这个命令的必须后执行。内存屏障另一个作用是强制更新一次不同CPU的缓存。例如,一个写屏障会把这个屏障前写入的数据刷新到缓存,这样任何试图读取该数据的线程将得到最新值,而不用考虑到底是被哪个cpu核心或者哪颗CPU执行的。所以一旦你完成写入,任何访问这个变量的线程将会得到最新的值。而且在你写入前,会保证所有之前发生的事已经发生,并且任何更新过的数据值也是可见的,因为内存屏障会把之前的写入值都刷新到缓存。
更多volatile的细节,请看深入理解Java内存模型(四)——volatile
concurrent包
java.util.concurrent在java 1.5以后提供了另一种多线程编程的方式,主要提供了各种锁,线程池和基本类型的atomic版本。
大致的读了一下代码,以下通过一些源码探索底层的实现。
ReentrentLock
ReentrentLock和synchronized关键字类似,也是实现一种可重入的锁。
ReentrentLock实现了Lock接口,内部定义了两个Sync类,FairSync和NonFairSync,分别实现公平锁和非公平锁。 该类又继承自AbstractQueuedSynchronizer类。类图如下:
AbstractQueuedSynchronizer实际上在内部维护了一个列表形式的等待队列,每个node都记录了一个线程和等待的状态。
关键代码:
ReentrantLock
public void lock() {
sync.lock();
}
Sync
abstract void lock();
抽象方法,在FairSync和NonFairSync里都有相应的实现,先看FairSync
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
关键的地方来了,tryAcquire里有个compareAndState方法。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
它会调用unsafe的compareAndSwapInt,查看一下Unsafe类,这是一系列基于JNI的API定义类,其中有一些compareAndSwap方法,缩写为CAS。这个方法会在CPU级别来支持原子性。
C++代码
// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
// alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。
更多CAS的原理,可以参考 JAVA CAS原理深度分析
回到FairSync中,tryAcquire方法会检查:
- 如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程
- 如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1(可重入)。如果不是, 则将该线程封装在一个Node内,并加入到等待队列中去, 等待被其前一个线程节点唤醒。
如果第一时间没有获取到锁,没关系,接着
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这种死循环的方式申请锁有个好听的名字,叫“自旋”。
lock看完了再看看unlock,原理类似:
Reentrant
public void unlock() {
sync.release(1);
}
Sync
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
LockSupport.park()、LockSupport.unpark()底层还是调用Unsafe累的park/unpark方法,作用分别是阻塞线程和解除阻塞线程,且park()和unpark()不会遇到“Thread.suspend ()和 Thread.resume所可能引发的死锁”问题。