java多线程的常用锁
synchronized 关键字
若是对象锁,则每个对象都持有一把自己的独一无二的锁,且对象之间的锁互不影响 。若是类锁,所有该类的对象共用这把锁。
一个线程获取一把锁,没有得到锁的线程只能排队等待;
synchronized 是可重入锁,避免很多情况下的死锁发生。
synchronized 方法若发生异常,则JVM会自动释放锁。
锁对象不能为空,否则抛出NPE(NullPointerException)
同步本身是不具备继承性的:即父类的synchronized 方法,子类重写该方法,分情况讨论:没有synchonized修饰,则该子类方法不是线程同步的。(PS :涉及同步继承性的问题要分情况)
synchronized本身修饰的范围越小越好。毕竟是同步阻塞。跑不快还占着超车道…
synchronized 底层对应的 JVM 模型为 objectMonitor,使用了3个双向链表来存放被阻塞的线程:_cxq(Contention queue)、_EntryList(EntryList)、_WaitSet(WaitSet)。
当线程获取锁失败进入阻塞后,首先会被加入到 _cxq 链表,_cxq 链表的节点会在某个时刻被进一步转移到 _EntryList 链表。
当持有锁的线程释放锁后,_EntryList 链表头结点的线程会被唤醒,该线程称为 successor(假定继承者),然后该线程会尝试抢占锁。
当我们调用 wait() 时,线程会被放入 _WaitSet,直到调用了 notify()/notifyAll() 后,线程才被重新放入 _cxq 或 _EntryList,默认放入 _cxq 链表头部。
objectMonitor 的整体流程如下图:
锁升级的流程如下图:
注:图片转载出处(https://zhuanlan.zhihu.com/p/378429667)
ReentrantLock JDK锁
ReentrantLock先通过CAS尝试获取锁,如果获取了就将锁状态state设置为1
如果此时锁已经被占用,
被自己占用:判断当前的锁是否是自己占用了,如果是的话就锁计数器会state++(可重入性)
被其他线程占用:该线程加入AQS队列并wait()
当前驱线程的锁被释放,一直到state==0,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,此时:
非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占
公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。
ReadAndWriteLock 读写锁
每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;
通过大量CAS保证多个线程竞争锁的时候的并发安全;
可重入的功能是通过维护state变量来记录重入次数实现的。
公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;
非公平锁可以直接抢占,所以效率更高;
CountDownLatch 门栓
1.使用方法
CountDownLatch,每执行一次countDown() 就会将设置的值-1,减到0,.await()的方法即可往下执行
void test() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
System.out.println("线程启动");
try {
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("线程停止");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
countDownLatch.await();
System.out.println("主线程停止");
}
2.底层实现
底层是内部维护了一个Sync并且继承了AQS(阻塞队列+CAS操作)
CAS维护状态位,并且利用CAS操作向AQS的阻塞队列的队尾添加元素
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
// 获取当前状态
int getCount() {
return getState();
}
// 当前状态是否往下继续运行
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 修改状态值
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CyclicBarrier 线程栅栏
1.使用方法
CyclicBarrier,每执行一次await() 就会将设置的值+1,加到设置的值,.await()的方法即可往下执行
@Test
void test() throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
new Thread(()->{
System.out.println("线程启动");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
System.out.println("线程停止");
}).start();
}
Thread.sleep(10000);
System.out.println("主线程停止");
}
2.底层实现
CyclicBarrier中包含ReentrantLock和Condition
用ReentrantLock保证count值的原子性操作,Condition来唤醒等待的线程阻塞队列
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
LongAdder 线程安全的数值类
1.使用方法
使用LongAdder来替代超高并发下的AtomicLong,使用adder()和sum()进行使用
@Test
void test() throws InterruptedException {
LongAdder adder = new LongAdder();
for (int i = 0; i < 1000; i++) {
new Thread(()->{
System.out.println("线程启动");
adder.increment();
System.out.println("线程停止");
}).start();
}
Thread.sleep(10000);
System.out.println(adder.sum());
}
2.底层实现
LongAdder相对于有多个AtomicLong,将高并发降低为cell,每个cell内部又会用到CAS操作来实现原子性操作,最终使用sum()求和来获取最终数值
LongAdder在没有线程竞争的时候,只使用base值,此时的情况就类似与AtomicLong。但LongAdder的高明之处在于,发生线程竞争时,便会使用到Cell数组,所以该数组是惰性加载的。
abstract class Striped64 extends Number {
@sun.misc.Contended static final class Cell {}
}
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
Phaser 阶段器
1.使用方法
与CountDownLanuch和CyclicBarrier不同的是 里面的parties是动态配置,Phaser可以动态注册需要协调的线程,相比CountDownLatch和CyclicBarrier就会变得更加灵活。
public class Phaser001 {
public static void main(String[] args) {
Phaser phaser = new Phaser();
IntStream.rangeClosed(1,10).forEach(i->new MyTask(phaser).start());
//等待注册的任务全部完成
phaser.arriveAndAwaitAdvance();
System.out.println("任务全部完成");
}
}
class MyTask extends Thread{
public Phaser phaser;
public MyTask(Phaser phaser) {
this.phaser = phaser;
this.phaser.register();
System.out.println("任务注册");
}
@Override
public void run() {
System.out.println("开始执行任务");
System.out.println("第一阶段任务执行完成");
//当前注册任务已经到达
this.phaser.arrive();
}
}
2.底层实现
cas操作
private int doRegister(int registrations) {
// adjustment to state
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
Semaphore 信号量
1.使用方法
有点像队列的感觉,定义一个总的信号量,若当前acquire线程达到信号量,则再进行acquire就会进入等待队列,可以用release释放,具体例子像数据库的链接池
public class Semaphore001 extends Thread{
static Semaphore semaphore;
@Override
public void run() {
try {
//开启获取
this.semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"开始执行");
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName()+"结束执行");
this.semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
semaphore = new Semaphore(2);
IntStream.rangeClosed(0,20).forEach(i ->{
new Semaphore001().start();
});
}
}
2.底层实现
AQS实现
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
Exchanger 交换者
1.使用方法
第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
public class Exchanger001 {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"线程启动");
int a = Integer.parseInt(exchanger.exchange(1).toString());
System.out.println(Thread.currentThread().getName()+"获取值为"+a);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+"线程启动");
Thread.sleep(3000);
int a = Integer.parseInt(exchanger.exchange(2).toString());
System.out.println(Thread.currentThread().getName()+"获取值为"+a);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
底层实现
利用ThreadLocal实现
LockSupport
1.使用方法
利用park()和unpark()实现对指定线程的阻塞和唤醒
public class LockSupport001{
static Thread t1;
static Thread t2;
public static void main(String[] args) {
t1 = new Thread(()->{
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.unpark(t2);
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "线程0结束");
}
});
t2 = new Thread(()->{
while(true) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.park();
System.out.println(Thread.currentThread().getName() + "线程1结束");
LockSupport.unpark(t1);
}
});
t1.start();
t2.start();
}
}
2.底层实现
public static void park() {
UNSAFE.park(false, 0L);
}