java并发基础

创建线程的方式

创建线程主要有三种方法 : 继承Thread类,实现Runnable接口,实现Callable接口。

方法一,直接使用Thread

 //创建线程
        Thread t = new Thread(){
            
            @Override
            public  void  run(){
                System.out.println("======");
            }
        };
         t.setName("yi");
        //启动线程
        t.start();

t.start()调用后才能和操作系统关联起来

方法二,使用 Runnable 配合Thread

Runnable runnable = () -> {
            System.out.println("---------------");
            System.out.println("12");
            System.out.println("12");
            System.out.println("12");
            System.out.println("12");
        };

        Thread t1 = new Thread(runnable);
        t1.start();

方法三,实现Callable接口

这种方式创建线程是可以获得子线程返回结果的,是Runnable的加强版。

public class ThreadThree implements Callable<Long> {
    @Override
    public Long call() throws Exception {
        System.out.println("thread three");
        return 23L;
    }
}

常用方法

方法名 static 功能说明 注意
start() 启动一个新线程,在新的线程运行run方法中的代码 start方法只是让线程进入就绪,里面的代码不一定立刻运行(cpu时间片还没有分给它)。每个线程对象的start方法只能调用一次
run() 新线程启动后会调用的方法 如果在否早Thread对象时传递了Runnable参数,则线程启动后会调用Runnable中的run方法,否则不执行任何线程操作。
join() 等待线程运行结束
join(long n) 等待线程运行结束,最多等待n毫秒
interrupt() 打断线程 如果打断线程整在sleep,wait,join会导致被打断的线程抛出InterruptedException,并清除打断标记;如果打断正在运行的线程,则会设置打断标记;park的线程被打断,也会设置打断标记
isInterrupted() 线程是否被打断过,不会清除打断标记 不会清除打断标记,正常线程被打断后,不会停止;需要使用打断标记来判断
interrupted() static 判断当前线程是否被打断 会清除 打断标记

两阶段终止

两阶段终止

线程状态

(操作系统)层面

  • 初始状态:仅仅实在语言层面创建了线程对象,还未与操作系统线程相关联
  • 就绪状态:该线程已经被创建(与操作系统线程关联),可由cpu调度执行
  • 运行状态:获取cpu时间片运行中的状态
  • 阻塞状态:缺少资源的状态
  • 终止状态:线程已经执行完毕,生命周期已经结束,不会再转换为其他状态

(java api)层面

  • new:线程刚被创建,但是还没有调用start()方法
  • runnable:当调用了start()后的状态,对应着操纵系统中的就绪状态、运行状态、和阻塞状态
  • blocked:
  • waiting:
  • timed_waiting:
  • terminated:终止状态
线程六种状态

状态转换

状态转换

情况一:NEW –> RUNNABLE

当调用了 t.start() 方法时,由 NEW –> RUNNABLE

情况二: RUNNABLE <–> WAITING

  • 当调用了t 线程用 synchronized(obj) 获取了对象锁后,调用 obj.wait() 方法时,t 线程从 RUNNABLE –> WAITING

  • 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时,会在 WaitSet 等待队列中出现锁竞争,非公平竞争

    • 竞争锁成功,t 线程从 WAITING –> RUNNABLE

    • 竞争锁失败,t 线程从 WAITING –> BLOCKED

情况三:RUNNABLE <–> WAITING

当前线程调用 t.join() 方法时,当前线程从 RUNNABLE –> WAITING 注意是当前线程在 t 线程对象的监视器上等待 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING –> RUNNABLE

情况四: RUNNABLE <–> WAITING

当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE –> WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING –> RUNNABLE

情况五: RUNNABLE <–> TIMED_WAITING

t 线程用 synchronized(obj) 获取了对象锁后 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE –> TIMED_WAITING t 线程等待时间超过了 n 毫秒,或调用 obj.notify() , obj.notifyAll() , t.interrupt() 时 竞争锁成功,t 线程从 TIMED_WAITING –> RUNNABLE 竞争锁失败,t 线程从 TIMED_WAITING –> BLOCKED

情况六:RUNNABLE <–> TIMED_WAITING

当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE –> TIMED_WAITING 注意是当前线程在 t 线程对象的监视器上等待 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING –> RUNNABLE

情况七:RUNNABLE <–> TIMED_WAITING

当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE –> TIMED_WAITING 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING –> RUNNABLE

情况八:RUNNABLE <–> TIMED_WAITING

当前线程调用 LockSupport.parkNanos(long nanos) 或 LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE –> TIMED_WAITING 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING–> RUNNABLE

情况九:RUNNABLE <–> BLOCKED

t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE –> BLOCKED 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED –> RUNNABLE ,其它失败的线程仍然 BLOCKED

情况十: RUNNABLE <–> TERMINATED

当前线程所有代码运行完毕,进入 TERMINATED

共享模型之管程(Monitor)

i++实际的jvm字节码指令

getstatic   i //获取静态变量i的值
iconst_1        //准备常量1
iadd            //自增
putstatic   i    //将修改后的值存入静态变量i

方法上的synchronized

 class  test{  //锁成员方法   等价于锁当前对象
        public synchronized void test(){}
    }

等价于
 class  test{
        public  void test(){
            synchronized(this){
            }
        }
}



class  Test{   //锁静态方法  等价于锁当前类对象
    public synchronized static void test(){}
}

等价于
    
class  Test{
        public  static void test(){
            synchronized(Test.class){
                
            }
        }
    }


偏向锁的撤销

将对象从可偏向变成不可偏向状态

  1. 调用对象的hashCode
  2. 其他线程使用对象
  3. 调用wait/notify

sleep和wait的区别

wait sleep
同步 只能在同步上下文中调用wait方法,。否则或抛出iiegalMonitorStateException异常 不与需要在同步方法或者同步块中调用
作用对象 wait方法定义在Object类中,作用于对象本身 sleep方法定义在Java.lang.Thread中,作用于当前线程
释放锁资源
唤醒条件 其他线程调用对象的notify()或者notifyAll()方法 超时或者调用interrupt方法体
方法属性 wait是实例方法 sleep是静态方法

park和unpark 与Object的wait & notify区别

  • wait,notify和notifyAll必须配合Object Monitor一起使用,而unpark不必
  • park & unpark 是以线程为单位来阻塞唤醒线程,而notify只能随机唤醒一个等待线程,notiifyAll是唤醒所有的等待进程,就不那么精确
  • park & unpark 可以先unpark,而wait & notify 不能先notify

死锁和活锁

死锁:竞争对方已拥有的资源 可使用顺序加锁的方式解决

活锁:任务或者执行者没有被阻塞,由于某些条件没有被满足,导致一直重复尝试,失败,尝试,失败。 可以使用先来先服务方式解决

ReentrantLock

相对于synchronized 有如下特点:

  • 可中断
  • 可以设置超时时间
  • 可以设置公平锁 (先进先出) 默认为非公平锁
  • 支持多个条件变量 (等价于多个waitSet)
  • 对象的级别保护临界区

与synchronized一样,都支持可重入

可重入

同一个线程吐过首次获得了这把锁,那么因为她是这把锁的拥有者,因此有权利再次获取这把锁

如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住

可打断

如果某个线程处于阻塞状态,可以调用其interrup他方法让其停止阻塞,获得锁失败

简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行

调用 lock.lockInterruptibly();方法 如果使用lock.lock()方法不可打断

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            try {
                // 加锁,可打断锁
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 被打断,返回,不再向下执行
                return;
            }finally {
                // 释放锁
                lock.unlock();
            }

        });

        lock.lock();
        try {
            t1.start();
            Thread.sleep(1000);
            // 打断
            t1.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

锁超时

使用 lock.tryLock 方法会返回获取锁是否成功。如果成功则返回 true ,反之则返回 false 。
并且 tryLock 方法可以指定等待时间,参数为:tryLock(long timeout, TimeUnit unit), 其中 timeout 为最长等待时间,TimeUnit 为时间单位
简而言之就是:获取锁失败了、获取超时了或者被打断了,不再阻塞,直接停止运行。
不设置等待时间

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            // 未设置等待时间,一旦获取失败,直接返回false
            if(!lock.tryLock()) {
                System.out.println("获取失败");
                // 获取失败,不再向下执行,返回
                return;
            }
            System.out.println("得到了锁");
            lock.unlock();
        });


        lock.lock();
        try{
            t1.start();
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

设置等待时间

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(() -> {
            try {
                // 判断获取锁是否成功,最多等待1秒
                if(!lock.tryLock(1, TimeUnit.SECONDS)) {
                    System.out.println("获取失败");
                    // 获取失败,不再向下执行,直接返回
                    return;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 被打断,不再向下执行,直接返回
                return;
            }
            System.out.println("得到了锁");
            // 释放锁
            lock.unlock();
        });


        lock.lock();
        try{
            t1.start();
            // 打断等待
            t1.interrupt();
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

公平锁

在线程获取锁失败,进入阻塞队列时,先进入的会在锁被释放后先获得锁。这样的获取方式就是公平的。

// 默认是不公平锁,需要在创建时指定为公平锁
ReentrantLock lock = new ReentrantLock(true);

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒

使用要点:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执
  • 使用await后要使用signal来唤醒

同步模式之顺序控制

wait/notify实现

public class Code_32_Test {

    public static void main(String[] args) {
        WaitAndNotify waitAndNotify = new WaitAndNotify(1, 5);

        new Thread(()->{
            waitAndNotify.run("a", 1, 2);
        }).start();
        new Thread(()->{
            waitAndNotify.run("b", 2, 3);
        }).start();
        new Thread(()->{
            waitAndNotify.run("c", 3, 1);
        }).start();
    }
}

class WaitAndNotify {
    public void run(String str, int flag, int nextFlag) {
        for(int i = 0; i < loopNumber; i++) {
            synchronized(this) {
                while (flag != this.flag) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.print(str);
                // 设置下一个运行的线程标记
                this.flag = nextFlag;
                // 唤醒所有线程
                this.notifyAll();
            }
        }
    }

    private int flag;
    private int loopNumber;

    public WaitAndNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
    }
}

park和unpark实现

public class Code_33_Test {

    public static Thread t1, t2, t3;
    public static void main(String[] args) {
        ParkAndUnPark obj = new ParkAndUnPark(5);
        t1 = new Thread(() -> {
            obj.run("a", t2);
        });

        t2 = new Thread(() -> {
            obj.run("b", t3);
        });

        t3 = new Thread(() -> {
            obj.run("c", t1);
        });
        t1.start();
        t2.start();
        t3.start();

        LockSupport.unpark(t1);
    }
}

class ParkAndUnPark {
    public void run(String str, Thread nextThread) {
        for(int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(nextThread);
        }
    }

    private int loopNumber;

    public ParkAndUnPark(int loopNumber) {
        this.loopNumber = loopNumber;
    }
}

await/signal实现

public class Code_34_Test {

    public static void main(String[] args) {
        AwaitAndSignal lock = new AwaitAndSignal(5);
        Condition a = lock.newCondition();
        Condition b = lock.newCondition();
        Condition c = lock.newCondition();
        new Thread(() -> {
            lock.run("a", a, b);
        }).start();

        new Thread(() -> {
            lock.run("b", b, c);
        }).start();

        new Thread(() -> {
            lock.run("c", c, a);
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        lock.lock();
        try {
            a.signal();
        }finally {
            lock.unlock();
        }
    }
}

class AwaitAndSignal extends ReentrantLock {
    public void run(String str, Condition current, Condition nextCondition) {
        for(int i = 0; i < loopNumber; i++) {
            lock();
            try {
                current.await();
                System.out.print(str);
                nextCondition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                unlock();
            }
        }
    }

    private int loopNumber;

    public AwaitAndSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }
}

共享模型之内存

java内存模型(JMM)

JMM 即 Java Memory Model,它定义了主存(共享内存)、工作内存(线程私有)抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

volatile原理

volatile的底层实现原理是内存屏障,Memory Barrier

  • 对volatile变量的写指令后会加入写屏障
  • 对volatile变量的读指令前会加入读屏障

保证可见性

  • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
public void actor2(I_Result r) {
     num = 2;
     ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
     // 写屏障
}
  • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
public void actor1(I_Result r) {
 // 读屏障
 // ready是被 volatile 修饰的,读取值带读屏障
 if(ready) {
    r.r1 = num + num;
 } else {
    r.r1 = 1;
 }
}

分析如图:


屏障

保证有序性

  • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
public void actor2(I_Result r) {
 num = 2;
 ready = true; // ready 是被 volatile 修饰的,赋值带写屏障
 // 写屏障
}
  • 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
public void actor1(I_Result r) {
 // 读屏障
 // ready 是被 volatile 修饰的,读取值带读屏障
 if(ready) {
 r.r1 = num + num;
 } else {
 r.r1 = 1;
 }
}

注意

volatile只能保证有序性和可见性,不能解决指令交错

写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去。 而有序性的保证也只是保证了本线程内相关代码不被重排序

image-20210425200550672.png

synchronized可以保证原子性、有序性、可见性是对其可以完全管理的资源来说的,如使用double-checked时的INSTANCE未被完全管理,则不能完全实现有序性、可见性

// 最开始的单例模式是这样的
    public final class Singleton {
        private Singleton() { }
        private static Singleton INSTANCE = null;
        public static Singleton getInstance() {
        // 首次访问会同步,而之后的使用不用进入synchronized
        synchronized(Singleton.class) {
            if (INSTANCE == null) { // t1
                INSTANCE = new Singleton();
            }
        }
            return INSTANCE;
        }
    }

// 但是上面的代码块的效率是有问题的,因为即使已经产生了单实例之后,之后调用了getInstance()方法之后还是会加锁,这会严重影响性能!因此就有了模式如下double-checked lockin:
    public final class Singleton {
        private Singleton() { }
        private static Singleton INSTANCE = null;
        public static Singleton getInstance() {
            if(INSTANCE == null) { // t2
                // 首次访问会同步,而之后的使用没有 synchronized
                synchronized(Singleton.class) {
                    if (INSTANCE == null) { // t1
                        INSTANCE = new Singleton();
                    }
                }
            }
            return INSTANCE;
        }
    }
//但是上面的if(INSTANCE == null)判断代码没有在同步代码块synchronized中,不能享有synchronized保证的原子性,可见性。所以

以上的实现特点是:

  • 懒惰实例化
  • 首次使用 getInstance() 才使用 synchronized 加锁,后续使用时无需加锁
  • 有隐含的,但很关键的一点:第一个 if 使用了 INSTANCE 变量,是在同步块之外

但在多线程环境下,上面的代码是有问题的,getInstance 方法对应的字节码为:

0: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
3: ifnonnull 37
// ldc是获得类对象
6: ldc #3 // class cn/itcast/n5/Singleton
// 复制操作数栈栈顶的值放入栈顶, 将类对象的引用地址复制了一份
8: dup
// 操作数栈栈顶的值弹出,即将对象的引用地址存到局部变量表中
// 将类对象的引用地址存储了一份,是为了将来解锁用
9: astore_0
10: monitorenter
11: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
14: ifnonnull 27
// 新建一个实例
17: new #3 // class cn/itcast/n5/Singleton
// 复制了一个实例的引用
20: dup
// 通过这个复制的引用调用它的构造方法
21: invokespecial #4 // Method "<init>":()V
// 最开始的这个引用用来进行赋值操作
24: putstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
27: aload_0
28: monitorexit
29: goto 37
32: astore_1
33: aload_0
34: monitorexit
35: aload_1
36: athrow
37: getstatic #2 // Field INSTANCE:Lcn/itcast/n5/Singleton;
40: areturn

其中:

  • 17 表示创建对象,将对象引用入栈 // new Singleton
  • 20 表示复制一份对象引用 // 复制了引用地址
  • 21 表示利用一个对象引用,调用构造方法 // 根据复制的引用地址调用构造方法
  • 24 表示利用一个对象引用,赋值给 static INSTANCE

可能出现的问题


线程t1 还未对对象初始化完成,但时赋值操作已经完成,此时线程t2就会使用未初始化完全的对象。

简单来说,就是new 对象这个操作不是原子操作。会分成创建引用且复制一份引用,其中一份引用赋值给变量,另外一份执行<init>函数。所以,如果在初始化没有完成前,就赋值了则会出现并发问题。

double-checked locking 解决

加volatile就行了。

public final class Singleton {
        private Singleton() { }
        private static volatile Singleton INSTANCE = null;
        public static Singleton getInstance() {
            // 实例没创建,才会进入内部的 synchronized代码块
            if (INSTANCE == null) {
                synchronized (Singleton.class) { // t2
                    // 也许有其它线程已经创建实例,所以再判断一次
                    if (INSTANCE == null) { // t1
                        INSTANCE = new Singleton();
                    }
                }
            }
            return INSTANCE;
        }
    }

共享模型之无锁

管程即 monitor 是阻塞式的悲观锁实现并发控制,这章我们将通过非阻塞式的乐观锁的来实现并发控制

CAS方式

class AccountSafe implements Account{

    AtomicInteger atomicInteger ;
    
    public AccountSafe(Integer balance){
        this.atomicInteger =  new AtomicInteger(balance);
    }
    
    @Override
    public Integer getBalance() {
        return atomicInteger.get();
    }

    @Override
    public void withdraw(Integer amount) {
        // 核心代码
        while (true){
            int pre = getBalance();
            int next = pre - amount;
            if (atomicInteger.compareAndSet(pre,next)){
                break;
            }
        }
    }
}

原子整数

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
使用原子的方式更新基本类型

  • AtomicInteger:整型原子类
  • AtomicLong:长整型原子类
  • AtomicBoolean :布尔型原子类

以 AtomicInteger 为例

public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger(0);
    
        // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
        System.out.println(i.getAndIncrement());
    
        // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
        System.out.println(i.incrementAndGet());
    
        // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
        System.out.println(i.decrementAndGet());
    
        // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
        System.out.println(i.getAndDecrement());
    
        // 获取并加值(i = 0, 结果 i = 5, 返回 0)
        System.out.println(i.getAndAdd(5));
    
        // 加值并获取(i = 5, 结果 i = 0, 返回 0)
        System.out.println(i.addAndGet(-5));
    
        // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.getAndUpdate(p -> p - 2));
    
        // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.updateAndGet(p -> p + 2));
    
        // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
        // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
        System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
    
        // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
    }

原子引用

为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

  • AtomicReference:引用类型原子类
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
  • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起。只关心是否被更改过

原子数组

改变的只是对象本身里面的值,而不需要创建一个新的数组对象

使用原子的方式更新数组里的某个元素

  • AtomicIntegerArray:整形数组原子类
  • AtomicLongArray:长整形数组原子类
  • AtomicReferenceArray :引用类型数组原子类
public class Code_10_AtomicArrayTest {

    public static void main(String[] args) throws InterruptedException {
        /**
        参数1,提供数组、可以是线程不安全数组或线程安全数组
        参数2,获取数组长度的方法
        参数3,自增方法,回传array,index
        参数4,打印数组的方法
        
        
        函数式编程:
        //supplier 提供者  无中生有 ()->结果
        //function  函数      一个参数一个结果 (参数)->结果, BiFunction(参数1,参数2)-> 结果
        //consumer  消费者     一个参数没有结果  ()->void,   BiComsumer(参数1,参数2)->
        
         * 结果如下:
         * [9934, 9938, 9940, 9931, 9935, 9933, 9944, 9942, 9939, 9940]
         * [10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
         */
        demo(
                () -> new int[10],
                (array) -> array.length,
                (array, index) -> array[index]++,
                (array) -> System.out.println(Arrays.toString(array))
        );
        TimeUnit.SECONDS.sleep(1);
        demo(
                () -> new AtomicIntegerArray(10),
                (array) -> array.length(),
                (array, index) -> array.getAndIncrement(index),
                (array) -> System.out.println(array)
        );
    }

    private static <T> void demo(
            Supplier<T> arraySupplier,
            Function<T, Integer> lengthFun,
            BiConsumer<T, Integer> putConsumer,
            Consumer<T> printConsumer) {
        ArrayList<Thread> ts = new ArrayList<>(); // 创建集合
        T array = arraySupplier.get(); // 获取数组
        int length = lengthFun.apply(array); // 获取数组的长度
        for(int i = 0; i < length; i++) {
            ts.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putConsumer.accept(array, j % length);
                }
            }));
        }
        ts.forEach(Thread::start);
        ts.forEach((thread) -> {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        printConsumer.accept(array);
    }

}

字段更新器

原子累加器

AtomicLong Vs LongAdder

    public static void main(String[] args) {
        for(int i = 0; i < 5; i++) {
            demo(() -> new AtomicLong(0), (ref) -> ref.getAndIncrement());
        }
        for(int i = 0; i < 5; i++) {
            demo(() -> new LongAdder(), (ref) -> ref.increment());
        }
    }

    private static <T> void demo(Supplier<T> supplier, Consumer<T> consumer) {
        ArrayList<Thread> list = new ArrayList<>();

        T adder = supplier.get();
        // 4 个线程,每人累加 50 万
        for (int i = 0; i < 4; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) {
                    consumer.accept(adder);
                }
            }));
        }
        long start = System.nanoTime();
        list.forEach(t -> t.start());
        list.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.nanoTime();
        System.out.println(adder + " cost:" + (end - start)/1000_000);
    }


执行代码后,发现使用 LongAdder 比 AtomicLong 快2,3倍,使用 LongAdder 性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

LongAdder 原理

和ConcurrentHashMap中的countcell数组使用了相同的思想

LongAdder 类有几个关键域
public class LongAdder extends Striped64 implements Serializable {}
下面的变量属于 Striped64 被 LongAdder 继承。

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy; 

使用cas实现一个自旋锁

 public void lock() {
        while (true) {
            if(state.compareAndSet(0, 1)) {
                break;
            }
        }
    }

    public void unlock() {
        log.debug("unlock...");
        state.set(0);
    }

原理之伪内存

// 防止缓存行伪共享
@sun.misc.Contended   //防止一个缓存行容纳多个cell对象
static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    }
    // 省略不重要代码
}

每个cpu拥有着多级缓存(一般为3级缓存),缓存以缓存行为单位i,每个缓存行对应着一块内存,一般是64字节

cpu要保证数据的一致性,如果某个cpu核心更改了数据,其它cpu核心对应的整个缓存行必须失效

Unsafe

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。LockSupport 的 park 方法,cas 相关的方法底层都是通过Unsafe类来实现的。

public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
        // Unsafe 使用了单例模式,unsafe 对象是类中的一个私有的变量 
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        Unsafe unsafe = (Unsafe)theUnsafe.get(null);
        
    }

final原理

和volatile一样,使用了读写屏障

线程池

具体请见线程池简易实现和线程池源码

image-20210708194709608.png

自定义线程池

package Thread;


import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 自定义线程池
 */

public class demo06 {

    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,
                (queue, task) -> {
                    // 1. 阻塞等待。
//                    queue.put(task);
                    // 2. 带超时的等待
//                    queue.offer(task, 500, TimeUnit.MILLISECONDS);
                    // 3. 调用者放弃
//                    log.info("放弃 {}", task);
                    // 4. 调用者抛出异常
//                    throw new RuntimeException("任务执行失败" + task);
                    // 5. 调用者自己执行任务
                    task.run();
                });
        for(int i = 0; i < 4; i++) {
            int j = i;
            threadPool.executor(() ->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(j);
            });
        }
    }

}

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

// 实现线程池
class ThreadPool {
    // 线程集合
    private Set<Worker> works = new HashSet<Worker>();
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程池的核心数
    private int coreSize;
    // 获取任务的超时时间
    private long timeout;
    private TimeUnit unit;
    // 使用策略模式。
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCapacity,
                      RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.unit = unit;
        taskQueue = new BlockingQueue<Runnable>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    // 执行任务
    public void executor(Runnable task) {
        // 如果线程池满了. 就将任务加入到任务队列, 否则执行任务
        synchronized (works) {
            if(works.size() < coreSize) {
                Worker worker = new Worker(task);
//                log.info("新增 worker {} ,任务 {}", worker, task);
                works.add(worker);
                worker.start();
            } else {
//                taskQueue.put(task);
                // 1)死等
                // 2)带超时等待
                // 3)让调用者放弃任务执行
                // 4)让调用者抛出异常
                // 5)让调用者自己执行任务

                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        // 执行任务
        // 1)当 task 不为空,执行任务
        // 2)当 task 执行完毕,再接着从任务队列获取任务并执行
        @Override
        public void run() {
//            while (task != null || (task = taskQueue.take()) != null) {
            while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
                try {
//                    log.info("正在执行 {}", task);
                    task.run();
                }catch (Exception e) {

                } finally {
                    task = null;
                }
            }
            synchronized (works) {
//                log.info("worker 被移除 {}", this);
                works.remove(this);
            }
        }
    }
}

// 实现阻塞队列
class BlockingQueue<T> {

    // 阻塞队列的容量
    private int capacity;
    // 双端链表, 从头取, 从尾加
    private Deque<T> queue;
    // 定义锁
    private ReentrantLock lock;
    // 当阻塞队列满了时候, 去 fullWaitSet 休息, 生产者条件变量
    private Condition fullWaitSet;
    // 当阻塞队列空了时候,去 emptyWaitSet 休息, 消费者小件变量
    private Condition emptyWaitSet;

    public BlockingQueue(int capacity) {
        queue = new ArrayDeque<>(capacity);
        lock = new ReentrantLock();
        fullWaitSet = lock.newCondition();
        emptyWaitSet = lock.newCondition();
        this.capacity = capacity;
    }

    // 带有超时时间的获取
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            // 同一时间单位
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    if(nanos <= 0) {
                        return null;
                    }
                    // 防止虚假唤醒, 返回的是所剩时间
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 获取
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            T t = queue.removeFirst();
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }

    // 添加
    public void put(T task) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
//                    log.info("等待加入任务队列 {}", task);
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
//            log.info("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
        }finally {
            lock.unlock();
        }
    }
    // 带有超时时间的添加
    public boolean offer(T task, long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if(nanos <= 0) {
                        return false;
                    }
//                    log.info("等待加入任务队列 {}", task);
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
//            log.info("加入任务队列 {}", task);
            queue.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 判断判断是否满
            if(queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else { // 有空闲
//                log.info("加入任务队列 {}", task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }

        }finally {
            lock.unlock();
        }
    }

    public int getSize() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

AQS

具体请见AQS源码浅析

全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。

特点:

  • 用state属性来表示资源的状态(分为独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和设防所

    • getState-获取state状态

    • setState=设置state状态

    • compareAndSetState - 乐观锁设置state状态

    • 独占模式是只有以一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  • 提供了基于FIFO的等待队列,类似于Monitor的EntryList

  • 条件变量来实现等待,唤醒机制,支持多个条件变量,类似于Monitor的WaitSet

ReentrantLock

image-20210517145119092.png

非公平锁实现原理

从构造器开始看,默认为非公平锁实现

public ReentrantLock() {
 sync = new NonfairSync();
 }

lock()

总结:

掌握知识点:

Synchronized原理 LockSupport原理 ReenTrantLock原理

  • 分析多线程访问共享资源时,哪些代码片段属于临界区
  • 使用 synchronized 互斥解决临界区的线程安全问题
    • 掌握 synchronized 锁对象语法
    • 掌握 synchronzied 加载成员方法和静态方法语法
    • 掌握 wait/notify 同步方法
  • 使用 lock 互斥解决临界区的线程安全问题 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
  • 学会分析变量的线程安全性、掌握常见线程安全类的使用
  • 了解线程活跃性问题:死锁、活锁、饥饿
  • 应用方面
    • 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果,实现原子性效果,保证线程安全。
    • 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果。
  • 原理方面
    • monitor、synchronized 、wait/notify 原理
    • synchronized 进阶原理
    • park & unpark 原理
  • 模式方面
    • 同步模式之保护性暂停
    • 异步模式之生产者消费者
    • 同步模式之顺序控制

JMM CAS原理 Volatile原理

  • 可见性 - 由 JVM 缓存优化引起
  • 有序性 - 由 JVM 指令重排序优化引起
  • happens-before 规则
  • 原理方面
    • volatile
  • 模式方面
    • 两阶段终止模式的 volatile 改进
    • 同步模式之 balking
  1. CAS 与 volatile
  2. juc 包下 API
    1. 原子整数
    2. 原子引用
    3. 原子数组
    4. 字段更新器
    5. 原子累加器
  3. Unsafe
  4. 原理方面
    1. LongAdder 源码
    2. 伪共享
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容