一、提高锁性能的几点建议
锁的竞争会导致程序整体性能的下降,如何降低锁竞争带来的副作用是我们必须考虑的。下面提出几点锁优化的建议:
1.1 减小锁持有时间
单个线程对锁的持有时间与系统的性能密切相关。如果线程持有锁的时间越长,那么锁的竞争程度就会越激烈。因此,应尽可能减少线程对某个锁的占有时间,进而减少线程间互斥的可能。看下面这段代码:
public synchronized void syncMethod() {
othercode1();
mutexMethod();
othercode2();
}
假设只有mutexMethod()
有同步需要,而othercode1()
和othercode2()
不需要做同步控制。如果othercode1()
和othercode2()
都是重量级的方法,那么就会花费较长的CPU时间。改进后的代码如下:
public void syncMethod() {
othercode1();
synchronized(this) {
mutexMethod();
}
othercode2();
}
只对需要同步的方法进行同步控制,这样锁的占用时间会大大减少,进而提高系统的并行性能。
1.2 减小锁粒度
对于HashMap来说,最重要的两个方法就是get()和put()。一种最自然的的想法就是对整个HashMap加锁,必然可以得到一个线程安全的对象。但是这样做,我们就认为加锁粒度太大。对于ConcurrentHashMap,它内部进一步细分了若干个小的HashMap,称之为段(SEGMENT)。默认情况下,一个ConcurrentHashMap被进一步细分为16个段。
如果需要在ConcurrentHashMap中增加一个新的表项,并不是将整个HashMap加锁,而是首先根据hashcode得到该表项应该被存放到哪个段中,然后对该段加锁,并完成put()操作。在多线程环境中,如果多个线程同时进行put()操作,只要被加入的表项不存放在同一个段中,则线程间便可以做到真正的并行。下面代码展示了put()
操作的过程:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
//获取段的序号
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//得到段
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
但是这样会存在一个问题:当系统需要取得全局锁时,其消耗的资源会比较多。仍然以ConcurrentHashMap类为例,虽然其put()方法很好地分离了锁,但是当试图访问ConcurrentHashMap全局信息时,就会需要同时取得所有段的锁方能顺利实施。比如ConcurrentHashMap的size()方法,它将返回ConcurrentHashMap的有效表项的数量,即ConcurrentHashMap的全部有效表项之和。要获得这个信息需要取得所有子段的锁。下面是size()
方法的部分代码:
sum = 0;
for(int i=0; i<segments.length; ++i)
segments[i].lock();
for(int i=0; i<segments.length; ++i)
sum += segments[i].count;
for(int i=0; i<segments.length; ++i)
segments[i].unlock();
可以看到在计算总数时,先要获得所有段的锁,然后再求和。但是,ConcurrentHashMap的size()方法并不总是这样执行,事实上,size()方法会先使用无锁的方式求和,如果失败才会尝试这种加锁的方法。
1.3 读写分离锁来替换独占锁
在读多写少的场合,读写锁对系统性能是很有好处的。因为如果系统在读写数据时均使用独占锁,那么读操作和写操作间、读操作和读操作间、写操作和写操作间均不能做到真正的并发,并且需要相互等待。而读操作本身不会影响数据的完整性和一致性。因此,理论上,在大部分情况下,应该可以允许多线程同时读,读写锁正是实现了这种功能。
1.4 锁分离
锁分离是读写锁的而进一步延伸。 一个典型的案例就是java.util.concurrent.LinkedBlockingQueue的实现。
在LinkedBlockingQueue的实现中,take()函数和put()函数分别实现了从队列中取得数据和往队列中增加数据的功能。虽然两个函数都对当前队列进行了修改操作,但由于LinkedBlockingQueue是基于链表的,因此,两个操作分别作用于队列的前端和尾端,从理论上说,并不冲突。
如果使用独占锁,则要求在两个操作进行时获取当前队列的独占锁,那么take()和put()操作就不可能真正的并发,在运行时,它们会彼此等待对方释放锁资源。在这种情况下,锁竞争会相对比较激烈,从而影响程序在高并发时的性能。因此,在JDK的实现中,并没有采用这样的方式,取而代之的是两把不同的锁,分离了take()和put()操作。
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
take()
与put()
函数相互独立,不存在锁的竞争关系。只需要在take()和take()间、put()和put()间分别对takeLock和putLock进行竞争。从而,削弱了锁竞争的可能性。
函数take()的实现如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); //不能有两个线程同时取数据
try {
while (count.get() == 0) { //如果当前没有可用数据,一直等待
notEmpty.await(); //等待,put()操作的通知
}
x = dequeue(); //取得第一个数据
c = count.getAndDecrement(); //数量减1,原子操作
if (c > 1)
notEmpty.signal(); //通知其他take()操作
} finally {
takeLock.unlock(); //释放锁
}
if (c == capacity)
signalNotFull(); //通知put()操作,已有空余空间
return x;
}
函数put()的实现如下:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); //不能有两个线程同时进行put()
try {
while (count.get() == capacity) { //如果队列已经满了
notFull.await(); //等待
}
enqueue(node); //插入数据
c = count.getAndIncrement(); //更新总数,变量c是count加1前的值
if (c + 1 < capacity)
notFull.signal(); //有足够的空间,通知其他线程
} finally {
putLock.unlock(); //释放锁
}
if (c == 0)
signalNotEmpty(); //插入成功后,通知take()操作取数据
}
1.4 锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,即在使用完公共资源后,应该立即释放锁。只有这样,等待在这个锁上的其他线程才能尽早地获得资源执行任务。但是,如果对同一个锁不停地进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而不利于性能的优化。
为此,虚拟机在遇到一连串连读地对同一锁不断进行请求和释放的操作时,便会把所有的锁作整合成对锁的一次请求,从而减少对锁的请求同步次数,这个操作叫做锁是粗化。比如代码段:
public void demoMethod() {
synchronized(lock) {
//do sth
}
//做其他不需要的同步的工作,但能很快执行完毕
synchronized(lock) {
//do sth
}
}
按照锁粗化的思想,整合后代码如下:
public void demoMethod() {
synchronized(lock) {
//do sth
//做其他不需要的同步的工作,但能很快执行完毕
}
}
二、Java虚拟机对锁优化所做的努力
2.1 偏向锁
偏向锁的核心思想是:如果一个线程获得了锁,那么锁就进入了偏向模式。当这个线程再次请求锁的时候无需再去做任何同步操作,节省了锁的申请操作,提高程序的性能。偏向锁不适合锁竞争激烈的情况。使用Java虚拟机参数-XX:UseBiasedLocking
可以开启偏向锁。
2.2 轻量级锁
如果偏向锁失败,虚拟机并不会立即挂起线程。它还会使用一种称为轻量级锁的优化手段。轻量级锁的操作也很轻便,它只是简单地将对象头部作为指针,指向持有锁的线程堆栈的内部,来判断一个线程是否持有对象锁。如果线程获得轻量级锁成功,则可以顺利进入临界区。如果轻量级锁加锁失败,则表示其他线程抢先争夺到了锁,那么当前线程的锁请求就会膨胀为重量级锁。
2.3 自旋锁
锁膨胀后,虚拟机为了避免线程真实地在操作系统层面挂起,虚拟机还会在做最后的努力--自旋锁。由于当前线程暂时无法获得锁,但是什么时候可以获得锁是一个未知数。也许在几个CPU时钟周期后,就可以得到锁。如果这样,简单粗暴地挂起线程可能是一种得不偿失的操作。因此,系统会进行一次赌注:它会假设在不久的将来,线程可以得到这把锁。因此,虚拟机会让当前线程做几个空循环,在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区。如果还不能得到锁,才会真实地将线程在操作系统层面挂起。
2.4 锁清除
锁消除是一种更彻底的锁优化。Java虚拟机在JIT编译时,通过对上下文的扫描,去除不可能存在共享资源竞争的锁。通过锁消除,可以节省毫无意义的请求锁时间。
例如,在一个不可能存在并发竞争的场合使用Vector,而Vector内部使用了Synchronized请求锁。比如下面的代码:
public String[] createStrings() {
Vector<String> v = new Vector<String>();
for(int i=0; i<100; i++) {
v.add(Integer.toString(i));
}
return v.toArray(new String[]{});
}
v属于线程私有数据,不可能被其它线程访问。这种情况下,Vector内部所有加锁同步都是没有必要的,虚拟机检测到这种情况就会将这些无用的锁清除掉。
三、ThreadLocal
ThreadLocal是一个本地线程副本变量工具类。主要用于将私有线程和该线程存放的副本对象做一个映射,各个线程之间的变量互不干扰,在高并发场景下,可以实现无状态的调用,特别适用于各个线程依赖不通的变量值完成操作的场景。
3.1 ThreadLocal实现原理
- 每个Thread线程内部都有一个Map;
- Map里面存储线程本地对象(key)和线程的变量副本(value)
- Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。
所以对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成了副本的隔离,互不干扰。
Thread线程内部的Map在类中描述如下:
public class Thread implements Runnable {
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
}
ThreadLocal如何保证这些对象只被当前线程所访问,我们需要关注的是ThreadLocal的set()方法和get()方法。
- get()方法
get()方法用于获取当前线程的副本变量值。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null)
return (T)e.value;
}
return setInitialValue();
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
protected T initialValue() {
return null;
}
get()方法是先取得当前线程的ThreadLocalMap对象。然后,通过将自己作为key取得内部的实际数据。
- set()方法
set()方法用于保存当前线程的副本变量值。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
在set时,首先获得当前线程对象,然后通过getMap()拿到线程的ThreadLocalMap,并将值设入ThreadLocalMap中。
当线程退出时,Thread类会进行一些清理工作,其中就包括清理ThreadLocalMap:
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/* Aggressively null out all reference fields: see bug 4006245 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
如果使用线程池,意味着当前线程未必会退出(比如固定大小的线程池,线程总是存在)如果这样,将一些大大的对象设置到ThreadLocal中(它实际保存在线程持有的threadLocals Map内),可能会使系统出现内存泄漏的可能。此时,如果希望及时回收对象,最好使用ThreadLocal.remove()方法将整个变量移除。
既然Key是弱引用,那么我们要做的事,就是在调用ThreadLocal的get()、set()方法时完成后再调用remove方法,将Entry节点和Map的引用关系移除,这样整个Entry对象在GC Roots分析后就变成不可达了,下次GC的时候就可以被回收。如果使用ThreadLocal的set方法之后,没有显示的调用remove方法,就有可能发生内存泄露,所以养成良好的编程习惯十分重要,使用完ThreadLocal之后,记得调用remove方法。