Java 多线程 & 线程模型

多线程

概述

Thread 线程 => 是操作系统能够进行运算调度的最小单位。大部分的情况下,它被包含在进程中,是进程中的实际运作单位

  • Java 的执行模型是同步 | 阻塞(block)的
  • JVM 默认情况下只有一个线程 => 具有严重的性能问题
  • Java 中只有 Thread 代表线程
  • Thread 类的每一个实例代表一个 JVM 中的线程
  • Thread.start() 之后,JVM 中就增加一个线程,增加一个执行流,增加了一套方法栈 => 开启新线程 => new Thread(Runnable target).start() + run
  • start 方法调用 => 开始并发执行
  • 主线程的入口一定是 Main 方法,其余新开的线程的最底部一定是 Thread.run 方法
  • 方法栈(局部变量)是线程私有的
  • 静态变量 | 类变量是被所有线程共享的
  • 多线程中的线程会被线程调度机制打断
  • 不同的执行流的同步执行是一切线程问题的根源

生命周期

java.lang.Thread.State Thread Life Cycle
  • NEW => Thread state for a thread which has not yet started => 初始态 => 新创建一个线程对象,但还没有调用 start() 方法
  • RUNNABLE => Thread state for a runnable thread. A thread in the runnable state is executing in the Java virtual
    machine but it may be waiting for other resources from the operating system such as the processor. => 运行
  • BLOCKED => Thread state for a thread blocked waiting for a monitor lock. A thread in the blocked state is waiting for a monitor lock to enter a synchronized block/method or reenter a synchronized block/method after calling Object.wait. => synchronized => 阻塞 => 表示线程阻塞于锁
  • WAITING => Thread state for a waiting thread. A thread is waiting due to calling one of the following methods: Object.wait with no timeout, Thread.join with no timeout & LockSupport.park. A thread in the waiting state is waiting for another thread to perform a particular action. => 等待
  • TIMED_WAITING => Thread state for a waiting thread with a specified waiting time. A thread is in the timed waiting state due to calling one of the following methods with a specified positive waiting time: Thread.sleep, Object.wait with a timeout, Thread.join with a timeout, LockSupport.parkNanos & LockSupport.parkUntil => 超时等待 => 调用锁 | monitor |
    对象的 wait() 方法之后,就进入了 WAITING 状态
  • TERMINATED => Thread state for a terminated thread. The thread has completed execution => 终止 => 表示该线程已经执行完毕

多线程

  • CPU 的速度远远大于内存的读取、硬盘的读取以及网络IO的速度,在读取相关数据的时候,CPU 可以做其他有意义的事情,故产生了多线程 => 尽可能的运用 CPU 的效率
  • 现代 CPU 都是多核的 => Q = I^2 *RT => CPU 大于 4GHz 之后发热量承受不住 => 推出多核 => 发热量并没有显著变化 => 多线程可以更大限度发挥多核 CPU 的好处
  • 原子操作 -> atomic operation => 不会被线程调度机制打断的操作。

线程难的原因:需要看着同一份代码,想象不同的人在疯狂的以乱序执行它 => 多个线程同时访问同一个共享变量时,由于变量不是原子的,以致于过程是乱序的,不知道什么时候会发生不正常的,有可能是正常的,有可能是不正常的

// 最终结果不是 1000
public class Test {
    private static int j = 0;

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println(++j);
            }).start();
        }
    }
}
result

Java 为多线程提供了语言级的支持

  • sychoronized 关键字
  • Thread
  • Object.wait | Object.notify | Object.notifyAll

多线程适用场景

  • 对于 IO 密集型应用极其有用 => 网络 IO (通常包括数据库) | 文件 IO
  • 对于 CPU 密集型应用稍有折扣 => 多线程带来的提升有限 => CPU 密集型:计算机完成一项任务的时间是取决于 CPU 的速度,其 CPU 占用率高,也许在某段时间内保持 100% 占用率
  • 性能提升的上线 => 单核CPU 100% | 多核CPU N * 100%

多线程问题

  1. 正确性
    • 安全 => 竞争条件 & 死锁
    • 协同 => 同时、随机执行的线程,如何让他们协同工作?
  2. 效率 & 易用性
    • 执行的越快越好
    • 用起来越不容易出错越好

java.lang.Object

  • Java 从一开始就把线程作为语言特性,提供语言级的支持

为什么 Java 中所有对象都可以成为锁?

  • Object.wait() | Object.wait(long) | Object.wait(long, int) => Causes the current thread to wait until
    another thread invokes the java.lang.Object#notify() method or the java.lang.Object#notifyAll() method for this
    object. The current thread must own this object's monitor. => Error java.lang.IllegalMonitorStateException
  • Object.notify => Wakes up a single thread that is waiting on this object's monitor. If any threads are waiting on
    this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the
    implementation.
  • Object.notifyAll() => Wakes up all threads that are waiting on this object's monitor. The awakened threads will
    not be able to proceed until the current thread relinquishes the lock on this object. The awakened threads will
    compete in the usual manner with any other threads that might be actively competing to synchronize on this object; for
    example, the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this
    object.

合理的使用 wait + notify 就可以达到调度不同线程的目的 => wait() 之后 notify 进程才会一直走下去,如果先 notifywait
那么进程将会一直等待 notify

synchronized(obj) {
    while(condition does no hold) {
        obj.wait();
    }
}

synchronized(obj) {
    obj.notify();
}

线程不安全

线程不安全的表现

  • 数据错误
  • 死锁

数据错误

数据错误 => 不是原子操作

  • i++
  • if-then-do
    public class Test {
        private static final Map<Integer, Integer> map = new HashMap<>();
    
        public static void main(String[] args) {
            for (int i = 0; i < 1000; i++) {
                new Thread(() -> {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    int random = new Random().nextInt(10);
                    if (!map.containsKey(random)) {
                        map.put(random, random);
                        System.out.println("put: " + random);
                    }
                }).start();
            }
        }
    }
    
    result

死锁

synchronized => 同步 => 锁 => 同一个时刻只能有一个线程拿到同一把锁 => 在 Java 中任何一个对象都可以当成锁🔐

public class Test {
    // Thread1 和 Thread2 使用了不同的顺序来获得资源锁
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static void main(String[] args) {
        new Thread1().start();
        new Thread2().start();
    }

    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (lock1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (lock2) {
                    System.out.println("");
                }
            }
        }
    }

    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (lock2) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (lock1) {
                    System.out.println("");
                }
            }
        }
    }
}
死锁宏观表现

排查死锁问题

  1. jps => 查看当前所有的 Java 进程
  2. 找到死锁所运行的进程 ID
  3. jstack <java process id> > <file> => 将死锁的进程栈信息保存到指定文件
  4. 分析文件 | 排查调用栈,看每个 Thread 在哪个方法处停滞了

预防死锁产生的原则

所有的线程都按照相同的顺序获得资源的锁🔐

线程安全

实现线程安全的基本手段

  • 不可变类 => 数据错误「多个线程同时修改同一个数据」=> Integer | String | ...
  • synchronized 同步块
  • JUC 包

synchronized

  • Java 语音级的支持,1.6 之后性能极大的提升
  • 字节码层面的实现 => MONITORENTER + MONITOREXIT
  • 锁住的是什么?
    // 锁住对象 LOCK
    public static void Main() {
        // 在语句外加 synchronized
        synchronized(LOCK) {
        }
    }
    
    // 锁住 Class 对象
    public synchronized static void foo() {
    }
    
    // 锁住 this (当前对象)
    // 在方法上声明 synchronized 关键字
    public synchronized void bar() {
    }
    
    // 等价于 => 效果等价,字节码不等价
    public void bar() {
        synchronized(this) {
            // do something
        }
    }
    
    // 将 ` this ` 当成锁 => 将该实例当成锁
    Main object = new Main();
    new Thread(object::modifySharedVariable).start();
    
    private synchronized void modifySharedVariable() {
        // do something
    }
    
    // 等价于
    private void modifySharedVariable() {
        synchronized(this) {
        }
    }
    
  • 非公平锁 => 谁获得锁取决于操作系统的调度
  • 可重入锁

缺点

  • synchronized 不能知道当前锁是否能被拿到
  • 只有悲观锁、排他锁,没有乐观锁、共享锁
  • 性能相对稍差

JUC 包

Java 的协同机制一定要和锁(monitor | 监视器 | 管程)一起来工作

  • JUC => java.util.concurrent => Java 并发工具包
  • java.util.concurrent.ConcurrentHashMap => 任何使用 HashMap 有线程安全问题的地方,都无脑使用 ConcurrentHashMap 替换即可
  • java.util.concurrent.atomic.AtomicBoolean | java.util.concurrent.atomic.AtomicInteger | java.util.concurrent.atomic.AtomicLong => a += 1 不是原子操作 => AtomicInteger 中的 addAndGet() 方法是原子操作
  • java.util.concurrent.locks.ReentrantLock => 可重入锁 => 与 synchronized 不同点:ReentrantLock 可以在一个地方加锁,在另一个地方解锁
  • java.util.concurrent.CountDownLatch

Lock & Condition

  • 同一个锁可以有多个条件
  • 读写分离 => ReadLock | WriteLock => 读写锁 => 可以实现多个线程读,只有一个线程写
  • Lock.tryLock() => Acquires the lock only if it is free at the time of invocation. => boolean tryLock()
  • 可以方便的实现更加灵活的优先级 & 公平性 => 公平锁 & 非公平锁
  • ReentrantLock => sync + NonfairSync + FairSync => 默认的 ReentrantLock 是非公平锁
    ReentrantLock
  • 锁的重入问题 => 当一个线程重复的去获取锁,是否可以获取到 => Lock 支持可重入锁
private static Lock LOCK = new ReentrantLock();

public static void Main() {
    LOCK.lock();
    LOCK.lock();
    LOCK.lock();
    
    # lock 了几次就要 unlock 几次,不然其他线程获取不到锁
    LOCK.unlock();
    LOCK.unlock();
    LOCK.unlock();
}

CountDownLatch

  • 倒数闭锁
  • 用于协调一组线程的工作
  • countDown + await
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class Main {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(10);
    for (int i = 0; i < 10; i++) {
      int finalI = i;
      new Thread(() -> {
        int second = new Random().nextInt(10);
        try {
          Thread.sleep(second * 1000);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        System.out.println("线程" + finalI + "活干完了");
        latch.countDown();
      }).start();
    }

    latch.await();
    System.out.println("我是老板,所有工人的活都干完了");
  }
}

CyclicBarrier

  • CyclicBarrier => 循环屏障 => 等待所有线程到达一个屏障之后所有线程再一起出发
  • await
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Main {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10);
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            new Thread(() -> {
                int second = new Random().nextInt(10);
                try {
                    Thread.sleep(second * 1000);
                    System.out.println("线程" + finalI + "活干完了");
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
                System.out.println("等其他人到了才能继续");
            }).start();
        }
    }
}

Semaphore

  • 信号量 => 当信号量为1时,就是锁,排它锁就是一个信号量为1的特殊的锁
  • acquire + release

BlockingQueue & BlockingDeque

  • 传统的集合框架的操作要么正常返回,要么丢出异常,BlockingQueue | BlockingDeque 提供了一种等待的可能
  • queue => 先进先出
  • deque => double ended queue => 头尾都可以进出
  • BlockingQueue => capacity + put + take

线程池 ThreadPoolExecutor

  • 线程是非常昂贵的 => Java 线程模型的缺陷
  • Java 的线程调度完全依赖于操作系统的依赖调度,所以线程是非常昂贵的,每个线程都要占用操作系统一部分资源
  • 使用线程池完成多线程协同和任务分解
  • 线程池是预先定义好的若干个线程 => 省去每次创建 | 销毁线程的开销 => java.concurrent.Executors + java.util.Callable + java.util.concurrent.Future
  • java.util.Callable => 用于描述一个抽象任务的 interface => 泛型 + 有返回值 + 可以抛出异常
  • java.util.concurrent.Future
    ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
    Future submit = executorService.submit(Callable);
    submit.get();
    

参数

new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
  • corePoolSize => the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set => 核心员工数量
  • maximunPoolSize => the maximum number of threads to allow in the pool => 最大招募的员工数量
  • keepAliveTime & unit => 员工闲下来之后多久炒掉他们
    • keepAliveTime => when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating
    • unit => the time unit for the keepAliveTime argument
  • workQueue => the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method => 订单队列
  • threadFactory => the factory to use when the executor creates a new thread => 造人的工厂
  • handler => the handler to use when execution is blocked because the thread bounds and queue capacities are reached => 订单实在太多的处理策略

Rejected Execution Handler Policy

RejectedExecutionHandler
  • AboutPolicy => A handler for rejected tasks that throws a RejectedExecutionException
  • CallerRunsPolicy => A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded
  • DiscardOldestPolicy => A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded
  • DiscardPolicy => A handler for rejected tasks that silently discards the rejected task

ExecutorService

一个多线程执行器框架,最常用的实现是线程池。屏蔽了线程的细节,提供了并发执行任务机制

Future

  • 代表未来才会发生的事情
  • Future 本身是立即返回的
  • Future.get 会阻塞并返回执行结果,并抛出可能的异常 => 使用 get 异常会从其他线程中转移到当前线程,方便处理异常
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService threadPoll = Executors.newFixedThreadPool(10);
        Future<?> future = threadPoll.submit(() -> {
            try {
                Thread.sleep(10000);
                System.out.println("我结束工作了");
                return "Future Result";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });

        System.out.println("我把工作提交了");

        // 获取 future 结果
        System.out.println(future.get());
        System.out.println("我提交的工作做完了");
    }
}

创建多线程方法

  • for loop + new Thread()
  • 线程池 => java.concurrent.Executors + java.util.Callable + java.util.concurrent.Future
  • java.util.concurrent.ForkJoinPool
    • Arrange async execution => execute method
    • Await and obtain result => invoke method
    • Arrange exec and obtain Future => submit method
  • java.util.Collection.parallelStream() => Collection.parallelStream().forEach(item => // do something)

调度线程方法

Runnable & Callable

  • Runnable | Callable 都不是线程,只是一个任务,只是一小段代码,这个任务会被哪个线程执行取决于线程池本身,可以被任何一个线程执行
  • Runnable 的限制
    1. 不能丢出来异常,Runnable 方法签名上没有 Throws
    2. Runnable 没有返回值

基于 Runnable 的问题,创造了 Callable,Callable 就是 Runnable 的高级版本,Callable 解决了 Runnable 的问题

Java Memory Modal

JMM => Java 线程模型 => 方法中的局部变量是私有的,其他都是公有的

当两个线程执行时,方法栈是私有的,方法栈中有局部变量,局部变量也是私有的,当在线程中 new Object() 的时候,每个线程都会在 Heap 中生成一个 Object,方法栈中的局部变量指向 Heap 中的 Object,两个线程会生成两个 Object

由于 CPU 和 Main Memory 交换是很慢的,CPU 进行一次内存寻址大约是 1 微秒,大约相当于 CPU 的 1000 - 3000 个时钟周期。所以 Java 模型允许每一个线程自己有一份私有的副本,CPU
和私有的副本进行读写。Java 线程模型定期的把每个线程的私有变量同步给 Main Memory

上述导致的问题:

  1. 线程A更新一个公有变量 globalVariable,线程B感知不到,需要等到线程A同步至 Main Memory,并且线程B的副本同步 Main Memory,才能感知到 globalVariable 的更新
  2. 在现代 CPU 中,如果两条指令没有依赖关系,编译器可以进行一个指令重排

volatile

volatile 关键字声明的变量在多线程中,所有的线程写该变量的时候都会直接写入 Main Memory,对于共享变量所做的修改会立刻写回 Main Memory。当读该变量时,会立刻从 Main Memory 中读取刷新至当前 CPU 的副本中,再从副本中读取

  1. 可见性,并非原子性
  • 写入 volatile 变量会直接写入 Main Memory
  • 从 volatile 变量读取会直接读取 Main Memory
  • 非常弱的同步机制
  1. 禁止指令重排 => 编译器和处理器都可能对指令进行重排,以提高效率,提高程序执行的速度,在多线程中会导致问题 => 可以保证自己的读写前后插入屏障,使得不会发生指令重排
  2. 有同步的时候无需 volatile => synchronized | Lock | AtomicInteger => 既保证可见性又保证原子性

知识点

  1. 如何查看一个数据是否是线程安全的?
    进入方法后,搜索 thread => <strong>Note that this implementation is not synchronized.</strong> => 线程不安全
  2. 除非特意提及线程安全,不然都是线程不安全的
  3. Collections.synchronized 方法可以将一些线程不安全的类 | 方法变为 synchronized
  4. monitor => 在 Java 世界中,使用 monitor 代表 synchronized 锁住的对象
  5. Runnable 中的 run
    • 没有返回值
    • 没有声明抛出的异常
  6. 线程是一种很昂贵的资源,不能频繁的创建它。在 IO 密集型应用中,创建线程数量最大值约为 CPU 数量 + 1
  7. Object.wait()Thread.sleep() 有什么区别?
    • Object.wait() => 持有锁之后调用锁的 wait 方法后 sleep,之后放弃锁,等待被 notify
    • Thread.sleep() => sleep 的时候持有锁,sleep 可以不持有锁,当不持有锁的时候 sleep,不会影响其他线程工作
  8. 创建线程池里有 new Thread => TODO: 找到相应代码
  9. TODO
  • 排它锁
  • 共享锁
  • 乐观锁
  • 悲观锁
  • 公平锁
  • 非公平锁
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容