java多线程的常用锁

java多线程的常用锁

synchronized 关键字

若是对象锁,则每个对象都持有一把自己的独一无二的锁,且对象之间的锁互不影响 。若是类锁,所有该类的对象共用这把锁。
一个线程获取一把锁,没有得到锁的线程只能排队等待;
synchronized 是可重入锁,避免很多情况下的死锁发生。
synchronized 方法若发生异常,则JVM会自动释放锁。
锁对象不能为空,否则抛出NPE(NullPointerException)
同步本身是不具备继承性的:即父类的synchronized 方法,子类重写该方法,分情况讨论:没有synchonized修饰,则该子类方法不是线程同步的。(PS :涉及同步继承性的问题要分情况)
synchronized本身修饰的范围越小越好。毕竟是同步阻塞。跑不快还占着超车道…

synchronized 底层对应的 JVM 模型为 objectMonitor,使用了3个双向链表来存放被阻塞的线程:_cxq(Contention queue)、_EntryList(EntryList)、_WaitSet(WaitSet)。

当线程获取锁失败进入阻塞后,首先会被加入到 _cxq 链表,_cxq 链表的节点会在某个时刻被进一步转移到 _EntryList 链表。

当持有锁的线程释放锁后,_EntryList 链表头结点的线程会被唤醒,该线程称为 successor(假定继承者),然后该线程会尝试抢占锁。

当我们调用 wait() 时,线程会被放入 _WaitSet,直到调用了 notify()/notifyAll() 后,线程才被重新放入 _cxq 或 _EntryList,默认放入 _cxq 链表头部。

objectMonitor 的整体流程如下图:

20210605142829731.png

锁升级的流程如下图:
20210605173039809.png

注:图片转载出处(https://zhuanlan.zhihu.com/p/378429667)

ReentrantLock JDK锁

ReentrantLock先通过CAS尝试获取锁,如果获取了就将锁状态state设置为1
如果此时锁已经被占用,
被自己占用:判断当前的锁是否是自己占用了,如果是的话就锁计数器会state++(可重入性)
被其他线程占用:该线程加入AQS队列并wait()
当前驱线程的锁被释放,一直到state==0,挂在CLH队列为首的线程就会被notify(),然后继续CAS尝试获取锁,此时:
非公平锁,如果有其他线程尝试lock(),有可能被其他刚好申请锁的线程抢占
公平锁,只有在CLH队列头的线程才可以获取锁,新来的线程只能插入到队尾。

ReadAndWriteLock 读写锁

每一个ReentrantLock自身维护一个AQS队列记录申请锁的线程信息;

通过大量CAS保证多个线程竞争锁的时候的并发安全;

可重入的功能是通过维护state变量来记录重入次数实现的。

公平锁需要维护队列,通过AQS队列的先后顺序获取锁,缺点是会造成大量线程上下文切换;

非公平锁可以直接抢占,所以效率更高;

CountDownLatch 门栓

1.使用方法

CountDownLatch,每执行一次countDown() 就会将设置的值-1,减到0,.await()的方法即可往下执行

void test() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(3);
    for (int i = 0; i < 3; i++) {
        new Thread(()->{
            System.out.println("线程启动");
            try {
                Thread.sleep(1000);
                countDownLatch.countDown();
                System.out.println("线程停止");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
    countDownLatch.await();
    System.out.println("主线程停止");
}

2.底层实现

底层是内部维护了一个Sync并且继承了AQS(阻塞队列+CAS操作)

CAS维护状态位,并且利用CAS操作向AQS的阻塞队列的队尾添加元素

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    // 获取当前状态
    int getCount() {
        return getState();
    }

    // 当前状态是否往下继续运行
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    // 修改状态值
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

CyclicBarrier 线程栅栏

1.使用方法

CyclicBarrier,每执行一次await() 就会将设置的值+1,加到设置的值,.await()的方法即可往下执行

@Test
void test() throws InterruptedException {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    for (int i = 0; i < 10; i++) {
        Thread.sleep(1000);
        new Thread(()->{
            System.out.println("线程启动");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
            System.out.println("线程停止");
        }).start();
    }
    Thread.sleep(10000);
    System.out.println("主线程停止");
}

2.底层实现

CyclicBarrier中包含ReentrantLock和Condition

用ReentrantLock保证count值的原子性操作,Condition来唤醒等待的线程阻塞队列

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

LongAdder 线程安全的数值类

1.使用方法

使用LongAdder来替代超高并发下的AtomicLong,使用adder()和sum()进行使用

@Test
void test() throws InterruptedException {
    LongAdder adder = new LongAdder();
    for (int i = 0; i < 1000; i++) {
        new Thread(()->{
            System.out.println("线程启动");
            adder.increment();
            System.out.println("线程停止");
        }).start();
    }
    Thread.sleep(10000);
    System.out.println(adder.sum());
}

2.底层实现

LongAdder相对于有多个AtomicLong,将高并发降低为cell,每个cell内部又会用到CAS操作来实现原子性操作,最终使用sum()求和来获取最终数值

LongAdder在没有线程竞争的时候,只使用base值,此时的情况就类似与AtomicLong。但LongAdder的高明之处在于,发生线程竞争时,便会使用到Cell数组,所以该数组是惰性加载的。

abstract class Striped64 extends Number {

    @sun.misc.Contended static final class Cell {}
}
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

Phaser 阶段器

1.使用方法

与CountDownLanuch和CyclicBarrier不同的是 里面的parties是动态配置,Phaser可以动态注册需要协调的线程,相比CountDownLatch和CyclicBarrier就会变得更加灵活。

public class Phaser001 {
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        IntStream.rangeClosed(1,10).forEach(i->new MyTask(phaser).start());

        //等待注册的任务全部完成
        phaser.arriveAndAwaitAdvance();

        System.out.println("任务全部完成");
    }

}

class MyTask extends Thread{

    public Phaser phaser;

    public MyTask(Phaser phaser) {
        this.phaser = phaser;
        this.phaser.register();
        System.out.println("任务注册");
    }

    @Override
    public void run() {
        System.out.println("开始执行任务");
        System.out.println("第一阶段任务执行完成");
        //当前注册任务已经到达
        this.phaser.arrive();
    }
}

2.底层实现

cas操作

private int doRegister(int registrations) {
    // adjustment to state
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    for (;;) {
        long s = (parent == null) ? state : reconcileState();
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            throw new IllegalStateException(badRegister(s));
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            break;
        if (counts != EMPTY) {                  // not 1st registration
            if (parent == null || reconcileState() == s) {
                if (unarrived == 0)             // wait out advance
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    break;
            }
        }
        else if (parent == null) {              // 1st root registration
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                break;
        }
        else {
            synchronized (this) {               // 1st sub registration
                if (state == s) {               // recheck under lock
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        break;
                    // finish registration whenever parent registration
                    // succeeded, even when racing with termination,
                    // since these are part of the same "transaction".
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    break;
                }
            }
        }
    }
    return phase;
}

Semaphore 信号量

1.使用方法

有点像队列的感觉,定义一个总的信号量,若当前acquire线程达到信号量,则再进行acquire就会进入等待队列,可以用release释放,具体例子像数据库的链接池

public class Semaphore001 extends Thread{

    static Semaphore semaphore;

    @Override
    public void run() {
        try {
            //开启获取
            this.semaphore.acquire();
            System.out.println(Thread.currentThread().getName()+"开始执行");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName()+"结束执行");
            this.semaphore.release();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

    }

    public static void main(String[] args) {
        semaphore = new Semaphore(2);
        IntStream.rangeClosed(0,20).forEach(i ->{
            new Semaphore001().start();
        });
    }
}

2.底层实现

AQS实现

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Exchanger 交换者

1.使用方法

第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

public class Exchanger001 {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger();
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"线程启动");
                int a  = Integer.parseInt(exchanger.exchange(1).toString());
                System.out.println(Thread.currentThread().getName()+"获取值为"+a);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();

        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"线程启动");
                Thread.sleep(3000);
                int a  = Integer.parseInt(exchanger.exchange(2).toString());
                System.out.println(Thread.currentThread().getName()+"获取值为"+a);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

底层实现

利用ThreadLocal实现

LockSupport

1.使用方法

利用park()和unpark()实现对指定线程的阻塞和唤醒

public class LockSupport001{
    static Thread t1;
    static Thread t2;
    public static void main(String[] args) {
        t1 = new Thread(()->{
            while(true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                LockSupport.unpark(t2);
                LockSupport.park();
                System.out.println(Thread.currentThread().getName() + "线程0结束");
            }
        });

        t2 = new Thread(()->{
            while(true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                LockSupport.park();
                System.out.println(Thread.currentThread().getName() + "线程1结束");
                LockSupport.unpark(t1);
            }
        });
        t1.start();
        t2.start();
    }
}

2.底层实现

public static void park() {
    UNSAFE.park(false, 0L);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容