前言
进程及其作用
一个进程就是一个运行中的程序。例如:在 windows 操作系统启动 Word 就表示启动了一个进程。在java的开发环境下启动JVM,就表示启动了一个进程。现代的计算机都是支持多进程的,在同一个操作系统中,可以同时启动多个进程。玩电脑,一边玩游戏(游戏进程)一边听音乐(音乐进程)。 对于单核计算机来讲,在同一个时间点上,游戏进程和音乐进程是同时在运行吗?不是。 因为计算机的 CPU 只能在某个时间点上做一件事。由于计算机将在“游戏进程”和“音乐进程”之间频繁的切换执行,切换速度极高,人类感觉游戏和音乐在同时进行。 多进程的作用不是提高执行速度,而是提高 CPU 的使用率。 进程和进程之间的内存是独立的。进程间的切换会有较大的开销,一个进程包含1--n个线程。(进程是资源分配的最小单位)
线程及其作用
一个线程是进程中的一个执行场景(或叫一条执行路径)。一个进程可以启动多个线程。每个线程有独立的运行栈和程序计数器(PC),线程切换开销小。(线程是cpu调度的最小单位)多线程不是为了提高执行速度,而是提高应用程序的使用率。 线程和线程共享“堆内存和方法区内存”,栈内存是独立的,一个线程一个栈。 可以给现实世界中的人类一种错觉:感觉多个线程在同时并发执行。打个比方,一个银行职员,两个客户A和B,A的业务需要3分钟,B的业务需要5分钟。单任务操作系统就相当于职员先帮A办理再帮B办理,总共花费8分钟,这样的话B可能就会抱怨;多任务操作系统就相当于职员快速的在A和B之间切换,由于切换得非常快,让A和B以为职员一直在为自己办理业务,就实现了“同时”给2位客户办理业务,时间也是8分钟(算上切换的时间开销的话,实际上应该超过8分钟),所以并没有提高速度,但是让两个客户都满意了。
并行、并发、同步
- 并行:多个cpu实例或者多台机器同时执行一段处理逻辑,是真正的同时
- 并发:通过cpu调度算法,让用户看上去同时执行,实际上从cpu操作层面不是真正的同时。并发如果在场景中有公用的资源,那么针对这个公用的资源往往产生瓶颈,我们会用TPS或者QPS来反应这个系统的处理能力
-同步:多线程不一定会引发问题,此时叫线程安全;但如果多个线程对相同的资源进行操作,就会引发问题,导致线程不安全,同步就是解决问题,实现线程安全。线程安全的优先级高于性能
题外话
线程其实没那么复杂和神秘,就是开一条新的工作线,执行一个任务。所以我们只要把任务写进run方法(Runnable)或call方法(Callable)然后传给Thread或线程池,然后开启线程就行了
任务调度
如果计算机只有一个 CPU,CPU 在某一个时刻只能执行一条指令,线程只有得到 CPU时间片,也就是使用权,才可以执行指令。在单CPU 的机器上线程不是并行运行的,只有在多个 CPU 上线程才可以并行运行。Java 虚拟机要负责线程的调度,取得 CPU 的使用权,目前有两种调度模型:分时调度模型和抢占式调度模型,Java 使用抢占式调度模型。分时调度模型:所有线程轮流使用 CPU 的使用权,平均分配每个线程占用 CPU 的时间片。抢占式调度模型:优先让优先级高的线程使用 CPU,如果线程的优先级相同,那么会随机选择一个,优先级高的线程获取的 CPU 时间片相对多一些
相关包、类、API
java.util.concurrent
java.util.concurrent.locks
Thread(lang包)
Thread.currentThread(),获取当前线程
线程状态
新建(new):采用 new 语句创建完成
就绪(runnable):执行 start 后
运行(running):占用 CPU 时间 (java规范没有把这一状态定义为一个独立状态,对该状态的线程调用getState返回runnable)
阻塞(blocked):执行了 wait 语句、执行了 sleep 语句、join、等待某个对象锁、等待输入的场合
终止(terminated):退出 run()方法
各种状态一目了然,值得一提的是"blocked"这个状态,线程在Running的过程中可能会遇到阻塞(Blocked)情况,分三种
- 调用join()和sleep()方法,sleep()时间结束或被打断,join()中断,IO完成都会回到Runnable状态,等待JVM的调度。这种情况下的Running状态的线程并是根据实际需求休眠一下(sleep)或者让某个线程先执行(join)或者等待用户输入完成而进入Blocked状态。
- 调用wait(),使该线程处于等待池(wait blocked pool),直到notify()/notifyAll(),线程被唤醒被放到锁定池(lock blocked pool ),释放同步锁使线程回到可运行状态(Runnable)。举例,在生产者-消费者情景中,当产品数到达最大值时,生产线程拿到CPU执行权,拿到锁对象并进入(synchronized)生产代码,发现产品满了,不用再生产了,这时候调用wait方法(wait是锁对象的方法),生产线程就会释放锁对象(不释放的话消费线程进不来啊),进入图中的等待池(wait blocked pool)
-
对Running状态的线程加同步锁(Synchronized)使其进入(lock blocked pool ),同步锁被释放进入可运行状态(Runnable)。举例,当生产线程拿到CPU执行权的时候,拿到锁对象进入(synchronized)生产代码,判断产品没满,就开始生产,但是生产到一半的时候,CPU执行权被JVM切换走了,生产线程被暂停了(即时间片用完了回到Runnable状态,但是不释放锁),但是注意,锁对象还是在生产线程手中。当CPU执行权给到消费者线程的时候,发现锁不在,被别人持有了,所以消费者线程就进入图中的锁定池(lock blocked pool )
1、新建状态(New):新创建了一个线程对象。
2、就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于可运行线程池中,变得可运行,等待获取CPU的使用权。
3、运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
4、阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
(一)、等待阻塞:运行的线程执行wait()方法,JVM会把该线程放入等待池中。(wait会释放持有的锁)
(二)、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。
(三)、其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。(注意,sleep是不会释放持有的锁)
5、死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
线程不安全是怎么产生的
一般是多线程环境中存在共享资源,方法内的局部变量时不会产生线程安全问题的,因为方法是在栈上执行的,而Java栈是线程私有的,因此不会产生线程安全问题
举个例子,有2个线程A和B,都操作同一个变量S。S初始值为5,A线程对S进行S=S+2操作,B线程对S进行S=S+3操作。如果两个操作都没有使用同步,那么当A线程执行时,计算好了S+2等于7,正准备把值赋给S,执行权被JVM切换给B了(此时S还是等于5)。B执行了S+3等于8,正准备赋值给S的时候,执行权被JVM切换给A了,然后A执行完S的值为7,然后执行权给到B,B执行完,S的值为8,问题就产生了。如果我们给操作加上锁(同一把锁),尽管A还没来得及执行最后的赋值操作就被剥夺执行权,但是锁还在A这里,执行权给到B,B没有锁就会进入阻塞状态(lock blocked pool)
线程状态切换和锁
线程状态的切换是因为CPU执行权的切换,虽然锁可能会导致线程阻塞,但是就算没有锁,线程同样会阻塞,锁的作用是同步,保证线程安全,保证这段代码是单线程执行(一个线程没执行完,其他线程就算有CPU执行权也执行不了,因为锁被别的线程拿了),因为单线程是安全(注意只是在多线程这个层面是安全的,因为就算只有当前线程能够执行这段代码,但是如果当前线程不正常结束了,同样会造成数据安全问题,比如转账的例子,转出方完成了,突然线程意外中断,可是收钱方没收到钱,此时需要用事务来解决)的。多线程虽然提高了CPU使用率,但是会带来数据安全问题,解决办法就是把这段代码用锁保护起来,使之成为单线程。因此多线程叫做异步,单线程是同步。
每个对象都有的方法(机制)
synchronized,wait,notify 是任何对象都具有的同步工具。他们是应用于同步问题的人工线程调度工具。讲其本质,首先就要明确monitor的概念,Java中的每个对象都有一个监视器,来监测并发代码的出入。在非多线程编码时该监视器不发挥作用,反之如果在synchronized 范围内,监视器发挥作用。wait/notify必须存在于synchronized块中。并且,这三个关键字针对的是同一个监视器(某对象的监视器)。这意味着wait之后,其他线程可以进入同步块执行。
线程优先级
优先级: 1-10,线程优先级要分三种 : MAX_PRIORITY( 10,最高 );MIN_PRIORITY ,1,最低级 )NORM_PRIORITY(5,标准)默认。线程的优先级有继承关系,比如A线程中创建了B线程,那么B将和A具有相同的优先级。
//必须在启动前设置优先级
//设置线程的优先级,线程启动后不能再次设置优先级
//设置最高优先级
t.setPriority(Thread.MAX_PRIORITY);
线程的实现
一般有四种方式,前面两种可以归结为一类:无返回值,原因很简单,通过重写run方法,run方式的返回值是void且不能抛异常,所以没有办法返回结果;后面两种可以归结成一类:有返回值,通过Callable接口,就要实现call方法,这个方法的返回值是Object,所以返回的结果可以放在Object对象中
- 继承Thread类,重写run方法
- 实现Runnable接口,重写run方法,将实现了Runnable接口的实现类的实例对象作为Thread构造函数的target
- 通过Callable(重写call方法)和FutureTask创建线程
public class ThreadDemo {
public static void main(String[] args) {
Callable<Object> oneCallable = new Tickets<Object>();
FutureTask<Object> oneTask = new FutureTask<Object>(oneCallable);
Thread t = new Thread(oneTask);
System.out.println(Thread.currentThread().getName());
t.start();
}
}
class Tickets<Object> implements Callable<Object>{
//重写call方法
@Override
public Object call() throws Exception {
System.out.println(Thread.currentThread().getName()+"-->我是通过实现Callable接口通过FutureTask包装器来实现的线程");
return null;
}
}
- 通过线程池创建线程
public class ThreadDemo05{
private static int POOL_NUM = 10; //线程池数量
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i = 0; i<POOL_NUM; i++)
{
RunnableThread thread = new RunnableThread();
//Thread.sleep(1000);
executorService.execute(thread);
}
//关闭线程池
executorService.shutdown();
}
}
class RunnableThread implements Runnable
{
@Override
public void run()
{
System.out.println("通过线程池方式创建的线程:" + Thread.currentThread().getName() + " ");
}
}
ExecutorService、Callable都是属于Executor框架。返回结果的线程是在JDK1.5(现在叫jdk5)中引入的新特征,还有Future接口也是属于这个框架,有了这种特征得到返回值就很方便了。 通过分析可以知道,他同样也是实现了Callable接口,实现了Call方法,所以有返回值。这也就是正好符合了前面所说的两种分类。执行Callable任务后,可以获取一个Future的对象,在该对象上调用get就可以获取到Callable任务返回的Object了。get方法是阻塞的,即:线程无返回结果,get方法会一直等待。再介绍Executors类:提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。
- public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。 - public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 - public static ExecutorService newSingleThreadExecutor()
创建一个只有一个线程的线程池。 - public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize) 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。 - ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
几个方法
run
run方法不能有返回值,也不能抛出异常sleep(Thread的静态方法)
不会释放锁,让当前线程睡眠,设置休眠的时间,单位毫秒,当一个线程遇到 sleep 的时候,就会睡眠,进入到阻塞状态,放弃 CPU,腾出 CPU时间片,给其他线程用,所以在开发中通常我们会这样做,使其他的线程能够取得 CPU 时间片,当睡眠时间到达了,线程会进入可运行状态,得到 CPU 时间片继续执行,如果线程在睡眠状态被中断了,将会抛出 IterruptedExceptionyield(Thread的静态方法)
不会释放锁,它与 sleep()类似,只是不能由用户指定暂停多长时间,并且 yield()方法只能让同优先级的线程有执行的机会,采用 yield 可以将 CPU 的使用权让给同一个优先级的线程join(Thread方法)
当前线程可以调用另一个线程的 join 方法,调用后当前线程会被阻塞不再执行,直到被调用的线程执行完毕,当前线程才会执行。查看源码可知调用了wait方法,所以会释放锁。interrupte(Thread方法)
并不是中断一个正在运行的线程,而是中断线程的当前状态(给处于wait,sleep,join状态的线程发送中断信号,比如中断一个睡眠或死锁的线程),被打断的线程会抛出InterruptedException,如果没处理这个异常,线程就会结束(线程内有异常没有处理(没有catch),线程就会结束)。每个线程都有一个中断标志位,interrupt方法就是使得这个标志位为true,处于sleep、wait、join状态的线程会不时检查中断标志位,如果检测到标志位为true,就会抛出InterruptedException异常。给线程发送中断信号不是为了要中断线程,而是要引起线程的注意,至于要如何响应中断信号,由线程自己决定(写代码的人考虑)interrupted(Thread的静态方法)
检查当前线程是否发生中断,会清除中断状态(中断标志设为false)isInterrupted(Thread的实例方法)
检测线程是否中断,不会清除中断状态isAlive(Thread方法)
测试线程是否处于活动状态。如果线程已经启动且尚未终止,则为活动状态wait(Object的方法)
锁对象调用wait方法使得当前线程阻塞进入等待区(每个锁有两个区,一个是等待区【锁对象调用wait后,当前线程就进入该区】,一个是等锁区【在其他线程中锁对象调用了notify/notifyAll方法后,等待区的线程就会进入等锁区;如果是指定了超时的wait方法,则时间到了就会进入等锁区,不需要唤醒。另外一种是获得时间片但是发现拿不到锁的线程会直接进入等锁区】,等锁区的线程获得锁对象后就进入就绪状态,等待OS调度),当该线程重新获得执行机会的时候,就会从暂停处继续执行。如果线程没有持有该锁就去调用该锁的wait方法,会抛出IllegalMonitorStateException异常notify/notifyAll(Object的方法)
notify从等待区随机唤醒一个线程进入等锁区,notifyAll唤醒所有等待区的线程进入等锁区。如果线程没有持有该锁就去调用该锁的notify/notifyAll方法,会抛出IllegalMonitorStateException异常
- Thread类最佳实践:
写的时候最好要设置线程名称 ,并设置线程组 ThreadGroup,目的是方便管理。在出现问题的时候,打印线程栈 (jstack -pid) 一眼就可以看出是哪个线程出的问题,这个线程是干什么的。
sleep,yield,join,wait的区别
sleep是Thread的静态方法,该方法不会导致线程放弃锁对象,可以在任何地方使用,必须捕捉异常
yield和sleep一样,只是不能指定时间,该方法不会导致线程放弃锁对象,需要捕捉异常
-
join是Thread的非静态方法,需要捕捉异常,在A线程中调用B的join方法会导致A放弃锁对象并阻塞,直到B线程结束,查看join的源码可以知道,join内是调用了wait方法的,wait方法会使得当前线程释放锁对象。
wait方法是Object的,会使得当前线程释放锁对象,并进入锁对象的等待锁定池(wait blocked pool),只有时间到了(如果wait指定了时间的话)或者锁对象调用notify/notifyAll方法才从wait blocked pool进入lock blocked pool。wait方法和notify/notifyAll只能在synchronized范围内使用。所以执行到一半千万不要调用wait,因为会释放锁,其他线程可以获得锁,如果存在共享资源的话,容易造成数据安全问题,要么就在synchronized范围的最前面执行wait。wait不需要捕捉异常
以上方法使得线程阻塞(yield不是阻塞),当线程重新获得执行权时,会从原来阻塞的位置继续执行
同步
同步会使得程序编变慢
- 为什么要引入线程同步呢?
为了数据的安全,尽管应用程序的使用率降低,但是为了保证数据是安全的,必须加入线程同步机制。线程同步机制使程序变成了(等同)单线程。 - 什么条件下要使用线程同步?
第一: 必须是多线程环境
第二: 多线程环境共享同一个数据.
第三: 共享的数据涉及到修改操作.
synchronized和Lock的区别
实现同步的方法
- synchronized,wait,notify/notifyAll,何时应该使用notify、notifyAll呢?只要事情的变化有利于等待的线程就应该使用
- synchronized代码块,可以自己定义锁对象,不定义可以使用this作为锁对象
public class Thread1 implements Runnable {
Object lock;//自定义锁对象
public void run() {
synchronized(lock){
..do something
}
}
}
public void run() {
//this作为锁对象
synchronized (this) {
for (int i=0; i<10; i++) {
s+=i;
}
System.out.println(Thread.currentThread().getName() + ", s=" + s);
s = 0;
}
- synchronized修饰方法,类的每个实例作为锁
/**
* 生产者生产出来的产品交给店员
*/
public synchronized void produce()
{
if(this.product >= MAX_PRODUCT)
{
try
{
wait();
System.out.println("产品已满,请稍候再生产");
}
catch(InterruptedException e)
{
e.printStackTrace();
}
return;
}
this.product++;
System.out.println("生产者生产第" + this.product + "个产品.");
notifyAll(); //通知等待区的消费者可以取出产品了
}
/**
* 消费者从店员取产品
*/
public synchronized void consume()
{
if(this.product <= MIN_PRODUCT)
{
try
{
wait();
System.out.println("缺货,稍候再取");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
return;
}
System.out.println("消费者取走了第" + this.product + "个产品.");
this.product--;
notifyAll(); //通知等待去的生产者可以生产产品了
}
- synchronized修饰静态方法,类锁(类的class对象作为锁),类的实例共用一把锁
public class SynchronizedTest2 {
public static void main(String[] args) throws InterruptedException {
MyClass mc1=new MyClass();
MyClass mc2=new MyClass();
Thread t1=new Thread(new Runnable1(mc1));
Thread t2=new Thread(new Runnable1(mc2));
t1.setName("t1");
t2.setName("t2");
t1.start();
//延迟,保证t1先执行
Thread.sleep(1000);
t2.start();
}
}
class Runnable1 implements Runnable{
MyClass mc;
Runnable1(MyClass mc){
this.mc=mc;
}
@Override
public void run() {
if("t1".equals(Thread.currentThread().getName())){
MyClass.m1();//因为是静态方法,用的还是类锁,和对象锁无关
}
if("t2".equals(Thread.currentThread().getName())){
MyClass.m2();
}
}
}
class MyClass{
//synchronized添加到静态方法上,线程执行此方法的时候会找类锁,类锁只有一把
public synchronized static void m1(){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("m1()............");
}
/**
* m2方法等m1结束之后才能执行,该方法有synchronized
* 线程执行该方法需要"类锁",而类锁只有一个.
*/
public synchronized static void m2(){
System.out.println("m2()........");
}
}
private volatile boolean done;
public vodi flipDone() {
done = !done;//这句代码并不是原子操作,其实是两个指令,所以就算使用了volatile也不一定能翻转done的值
}
多线程的内存模型:main memory(主存)、working memory(线程栈),在处理数据时,线程会把值从主存load到本地栈,完成操作后再save回去(volatile关键词的作用:每次针对该变量的操作都激发一次load and save)。针对多线程使用的变量如果不是volatile或者final修饰的,很有可能产生不可预知的结果(另一个线程修改了这个值,但是之后在某线程看到的是修改之前的值)。其实道理上讲同一实例的同一属性本身只有一个副本。但是多线程是会缓存值的,本质上,volatile就是不去缓存,直接取值。在线程安全的情况下加volatile会牺牲性能。
使用不可变类(自己觉得的,有待考究)
ThreadLocal(java.util.concurrent包)
ThreadLocal类的作用是为每个线程提供各自的实例(即数据),ThreadLocal是泛型类,简单说一下原理,每个线程都有一个ThreadLocalMap(该类是ThreadLocal的内部类,是一种特殊的Map)类型的变量,用来保存本线程需要用到的数据。ThreadLocalMap使用ThreadLocal对象作为键,线程要保存的数据作为值,所以如果一个线程需要保存多个变量,则需要多个ThreadLocal对象作为键,ThreadLocal主要的方法有get()
、set(T t)
和remove()
,还有个initialize()
方法,作用是提供初始值,默认是返回null,一般需要重写来返回一个需要的对象。get方法其实是先通过Thread.currentThread().getMap().get(this)
,即先获得当前线程,然后获得线程的map,然后通过this(即该ThreadLocal对象)获取对应的值
ThreadLocal的作用原子类
原子类使用高效的机器级的指令(而不是锁)来保证操作的原子性。在java.util.concurrent.atomic包中提供了很多原子类(AtomicInteger、AtomicLong、AtomicBoolean、AtomicReference……)。赋值操作时原子性的,但是i++
和i--
这样的自增自减语句不是原子的,AtomicInteger类提供了incrementAndGet
和decrementAndGet
方法,以原子的方式将一个整数自增和自减,用AtomicInteger作为共享的计数器而不需要使用同步。不过书上说不建议应用程序员使用这些类,这些类仅供开发并发工具的系统程序员使用,不知为什么。Lock和Condition
ReentrantLock实现了Lock。Lock的方法有lock()
,unlock()
,tryLock()
(分为不带超时和带超时的),lockInterruptily()
,newCondition()
(构造并返回与该对象锁相关的条件对象),Lock对象是可以多次执行lock方法的,Lock对象会保持一个计数,用来记录lock的调用次数,解锁需要同样的次数。另外使用锁就不能使用带资源的try语句
Condition的方法有await()
(虽然获得锁但是不满足条件时调用,会使当前线程放弃锁对象并进入该条件对象的等待区),signal()
,signalAll()
,其实跟synchronized本质都是一样的,只是synchronized是隐式的,一个锁一个条件,而Lock是显式的,而且一个锁可以有多个条件,每个条件对象管理自己的等待区里的线程,而synchronized则是锁对象自己管理等待区里的线程
lock: 在java.util.concurrent.locks包内。共有三个实现:
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
主要目的是和synchronized一样, 两者都是为了解决同步问题,处理资源争端而产生的技术。功能类似但有一些区别。区别如下:
lock更灵活,可以自由定义多把锁的加锁解锁顺序(synchronized要按照先加的后解顺序)。提供多种加锁方案,lock 阻塞式(线程去申请锁,如果该锁被其他线程持有,线程将被阻塞),tryLock 无阻塞式(线程去申请锁,如果该锁被其他线程持有,返回false,线程不会被阻塞,可以去做其他事情), lockInterruptily 可打断式, 还有tryLock的带超时时间版本。本质上和监视器锁(即synchronized是一样的),能力越大,责任越大,必须控制好加锁和解锁,否则会导致灾难。
- 容器类
BlockingQueue和ConcurrentHashMap
- 阻塞队列BlockingQueue
阻塞队列。该类是java.util.concurrent包下的重要类,通过对Queue的学习可以得知,这个queue是单向队列,可以在队列头添加元素和在队尾删除或取出元素。类似于一个管道,特别适用于先进先出策略的一些应用场景。普通的queue接口主要实现有PriorityQueue(优先队列),有兴趣可以研究。BlockingQueue在队列的基础上添加了多线程协作的功能:
除了传统的queue功能(表格左边的两列)之外,还提供了阻塞接口put和take,带超时功能的阻塞接口offer和poll。put会在队列满的时候阻塞,直到有空间时被唤醒;take在队列空的时候阻塞,直到有东西拿的时候才被唤醒。用于生产者-消费者模型尤其好用,堪称神器
常见的阻塞队列有
ArrayBlockingQueue//数组实现
LinkedBlockingQueue//链表实现
LinkedBlockingDeque//链表实现,双端队列
ArrayListBlockingQueue
LinkedListBlockingQueue
DelayQueue
PriorityBlockingQueue//堆实现
SynchronousQueue
方法 | 描述 |
---|---|
add(e) | 在队尾添加一个元素,队列满,抛出IllegalStateException异常 |
remove() | 移除并返回头元素,队列空,抛出NoSuchElementException |
element() | 返回头元素,队列空,抛出NoSuchElementException |
offer(e) | 在队尾添加一个元素,添加成功返回true,队列满,返回false |
offer(e,time,unit) | 在队尾添加一个元素,添加成功返回true,队列满,返回false |
poll() | 移除并返回头元素,队列空,返回null |
poll(time,unit) | 移除并返回头元素,队列空,返回null |
peek() | 返回头元素,队列空,返回null |
put(e) | 队尾添加一个元素,队列满,线程阻塞 |
take() | 移除并返回头元素,队列空,线程阻塞 |
- ConcurrentHashMap
高效的线程安全哈希map。请对比hashTable,concurrentHashMap,HashMap
- 管理类
ThreadPoolExecutor
死锁
一般在一段代码中使用多个锁会产生死锁
public class DeadLock {
public static void main(String[] args) {
Object o1 = new Object();
Object o2 = new Object();
Thread t1 = new Thread(new T1(o1, o2));
Thread t2 = new Thread(new T2(o1, o2));
t1.start();
t2.start();
}
}
class T1 implements Runnable {
Object o1;
Object o2;
T1(Object o1, Object o2) {
this.o1 = o1;
this.o2 = o2;
}
@Override
public void run() {
synchronized (o1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
}
}
}
}
class T2 implements Runnable {
Object o1;
Object o2;
T2(Object o1, Object o2) {
this.o1 = o1;
this.o2 = o2;
}
@Override
public void run() {
synchronized (o2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
}
}
}
}
未捕获异常处理器
run方法没有返回值,也不能抛出异常,如果没有处理(catch)异常,线程就会死亡。未捕获异常处理器需要实现Thread.UncaughtExceptionHandler接口,该接口只有一个方法
void uncaughtException(Thread t,Throwable e)
,我们可以通过Thread的实例方法setUncaughtExceptionHandler来为一个线程设置独立的未捕获异常处理器。或静态方法setDefaultUncaughtExceptionHandler来为一个线程设置默认的未捕获异常处理器。如果线程没安装默认的处理器,则默认的处理器是空的。如果没安装独立的处理器,则此时的处理器是该线程的ThreadGroup对象,ThreadGroup实现了Thread.UncaughtExceptionHandler接口,它的uncaughtException做法为
- 如果该线程组有父线程组,则调用父线程组的uncaughtException方法
- 否则,调用Thread.getDefaultUncaughtExceptionHandler,如果返回非空,则调用其uncaughtException方法
- 否则,如果未捕获的异常是一个ThreadDeath对象,则什么也不做
- 否则,就是我们经常看到的,线程的名字和异常的栈追踪被输出到System.err上
被锁保护的代码区
我们称为临界区,尽管锁的存在保证了单线程运行这段代码,直到该线程运行结束其他线程才有机会,但这是正常情况。考虑这样一个情况,在一笔银行转账交易的线程中,A账户转给B账户1000元,当A账户扣完款之后,发生了异常,导致线程异常结束,但是B账户并没有增加1000元。这样的情况,尽管线程是安全的,但是数据还是出了问题,需要使用事务。所以被锁保护的代码本身也是可能出问题的,需要特别注意。
特殊情况下的释放锁
- synchronized方式如果遇到异常导致线程死亡的话,是会自动释放锁的。Lock方式如果遇到异常导致线程死亡的话,需要主动unlock来释放锁,否则不会释放锁从而造成死锁,所以unlock一般都放在finally块里面
- 一个获得锁对象的线程在执行过程中,如果时间片用完了,还没执行完,也不会释放锁(所以别的线程也拿不到锁),下次获得时间片从暂停的地方继续执行
线程组
线程分组,方便管理
守护线程
从线程分类上可以分为:用户线程(以上讲的都是用户线程),另一个是守护线程。守护线程是这样的,所有的用户线程结束生命周期,守护线程才会结束生命周期,只要有一个用户线程存在,那么守护线程就不会结束,例如 java 中著名的垃圾回收器就是一个守护线程,只有应用程序中所有的线程结束,它才会结束。
Thread t1 = new Thread(new Runnable2());
t1.setName("t1");
// 将t1这个用户线程修改成守护线程.在线程没有启动时可以修改以下参数
t1.setDaemon(true);
t1.start();
慎用e.printStrackTrace方法
该方法是将异常信息以及异常栈追踪输出到System.err。原本我以为只是打印到控制台这样简单地处理一下异常就接着执行后面的代码,但实际该语句会占用很大的内存,可能把应用卡死
例子
线程安全的类和不安全的类
类名 | 是否线程安全 |
---|---|
SimpleDateFormat | no |
java.util.Random | yes |
StringBuffer | no |
StringBuilder | yes |
Vector | yes |
Hashtable | yes |
监视器的概念
因为使用锁和条件(Condition)实现同步,并不是面向对象的,所以有个牛人曾经提出监视器类的概念,监视器的数据域全是私有的,而且每个监视器类都有一个相关的锁,当线程调用该监视器类的方法时,自动获得该对象的锁,方法结束时自动释放该锁,因为监视器类的所有数据域是私有的,该锁也是私有的,所以也是线程安全的,并且这是面向对象的,程序开发人员不需要去考虑同步的问题。但是java的设计者并没有完全采纳这种做法(部分采用,使用了监视器的概念),因为监视器类要求所有的数据域都是私有的。
线程安全的集合
- BlockingQueue系列
- ConcurrentHashMap
- ConcurrentSkipListMap
putIfAbsent(k,v)
removeIfPresetn(k,v)
remove(k,v)
replace(k,old,new)
- ConcurrentSkipListSet
- ConcurrentLinkedQueue
- CopyOnWriteArrayList
- CopyOnWriteArraySet
- 同步包装器
同步线程包装器是为“原集合对象”的实际操作找一个代理对象,代理在“原集合对象”的一切功能之上又增加了同步功能(只是对这个“代理对象”上的操作同步,“原集合对象”上的操作非同步)。java的同步线程包装器是有条件的同步,只有对集合的原子粒度的操作才同步。对于有并发情况的迭代操作,因为迭代操作是通过对对像集的调用间接操作原对像,所以在迭代时要对迭代的对像实现再同步
Collection c = Collections.synchronizedCollection(myCollection);
...
synchronized(c) {
Iterator i = c.iterator(); // Must be in the synchronized block
while (i.hasNext())
foo(i.next());
}
public static Collection synchronizedCollection(Collection c);
public static Set synchronizedSet(Set s);
public static List synchronizedList(List list);
public static Map synchronizedMap(Map m);
public static SortedSet synchronizedSortedSet(SortedSet s);
public static SortedMap synchronizedSortedMap(SortedMap m);
Callable
Runnable的run方法没有参数,也没有返回值,不能抛出异常。Callable的call方法有返回值,而且可以抛异常。Callable接口是泛型的,有一个类型参数,是返回值的类型
public interface Callable<V> {
V call() throws Exception;
}
Future
Future
Future保存线程的计算结果,可以把Future对象交给一个线程,任务结束后,通过Future对象获得结果
public interface Future<V>{
V get() throws...;
V get(long timeout, TimeUnit unit) throws...;
boolean cancel(boolean mayInterrupt);
boolean isCancel();
boolean isDone();
}
get方法获得计算结果,但如果计算没完成将会一直等待,会阻塞线程,直到计算完成。
isDone方法,如果计算在进行中,则返回false,如果计算结束,返回true
cancel方法用于取消计算,如果计算还没开始,计算就会被取消;如果计算处于运行中,而mayInterrupt参数为true,计算就被中断
Futuretask
FutureTask包装器是一种非常便利的机制,它可以将Callable转化成Future和Runnable,它同时实现了二者的接口
Callable<Integer> callable = new MyCallable<Integer>();
FutureTask<Integer> task = new FutureTask<Integer>(callable );
Thread t = new Thread(task);//此时task是Runnable
t.start();
...
Integer result = task.get();//此时task是Future
FutureTask的构造器
FutureTask(Callable<V> task);
FutureTask(Runnable task, V result)
执行器(Executors)和线程池
Executors的多个静态方法返回不同功能的线程池(前3个返回实现了ExecutorService接口的对象,后两个返回实现了ScheduledExecutorService接口的对象,),虽然我们在构建线程池的时候使用的是Executors的静态方法,但是通过源码我们知道实际上Executors还是调用ThreadPoolExecutor的构造函数,所以如果我们需要自己定制化的线程池,还是通过ThreadPoolExecutor的构造方法来构造。
ThreadPoolExecutor总结
//Executors的newFixedThreadPool源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
方法 | 描述 |
---|---|
newFixedThreadPool | 创建指定数量的线程池,空闲线程会一直保留。线程不够用时,其他任务放进队列排队,有空余线程时再执行 |
newCachedThreadPool | 当池中线程不够用时,创建一个新的线程。空闲超过60s的线程会被清除 |
newSingleThreadExecutor | 单例值有一个线程的池,个任务按顺序一个一个执行 |
newScheduledThreadPool | 返回一个用于预定或重复执行的给定数量线程池,代替java.util.Timer |
newSingleThreadScheduledExecutor | 返回一个用于预定或重复执行的单线程池 |
线程池
ExecutorService接口
Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task);
execute(Runnable);
List<Future<T>> invokeAll(Collection<Callable<T>> tasks);//添加任务集,返回任务结果集
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit);//同上,带超时
T invokeAny(Collection<Callable<T>> tasks);添加任务集,返回最先完成的任务的结果
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit);//同上,带超时
shutdown();
shutdownNow();
submit和execute的区别是前者有返回值,后者无返回值。shutdown()后,线程池不再接受新的任务,当所有已接受任务完成后,线程池中的线程死亡;shutdownNow()后,不仅不接受新的任务,还取消已接受但未开始的任务,并试图中断正在进行的线程
ThreadPoolExecutor
ThreadPoolExecutor实现了ExecutorService接口,ThreadPoolExecutor总结
//ThreadPoolExecutor例子
//arguments of constructor
int corePoolSize = 0;
int maximumPoolSize = Integer.MAX_VALUE;
long keepAliveTime = 60L;
TimeUnit timeUnit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new java.util.concurrent.LinkedBlockingQueue<Runnable>();
ThreadFactory threadFactory = Executors.defaultThreadFactory();
RejectedExecutionHandler defaultHandler = new java.util.concurrent.ThreadPoolExecutor.AbortPolicy();
//==>>
ThreadPoolExecutor threadPoolExecutor = new java.util.concurrent.ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
timeUnit,
workQueue,
threadFactory,
defaultHandler);
threadPoolExecutor.execute(new Runnable(){
@Override
public void run() {
System.out.println("execute run method.");
}});
threadPoolExecutor.shutdown();
int getLargestPoolSize()//获得线程池生命周期中最大的线程数量
构造函数参数
int corePoolSize,有新task,先创建新Thread来处理task,一直新创建直到corePoolSize个线程
int maxinumPoolSize,当线程数达到corePoolSize后,新task会被放到阻塞队列,如果阻塞队列也放不下,则新创建线程来处理task,直到创建maxinuxPoolSize个线程
long keepAliveTime,当线程数大于corePoolSize,或者allowCoreThreadTimeOut为true时,线程获取阻塞等待队列里的任务的超时时间,否则,会一直阻塞等待。
TimeUnit timeUnit,keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue,存放task的阻塞队列
ThreadFactory threadFactory,使用工厂模式创建新线程时
RejectedExecutionHandler handler,当线程超过corePoolSize,阻塞队列也放不下task,线程数而且也超过maxinumPoolSize时,会拒绝task,调用handler.rejectedExecution(..)方法拒绝
其实,还有另外一个属性allowCoreThreadTimeOut,不过不在构造函数来传值
/**
- false:默认值,当thread空闲时,且队列里没任务,则thread阻塞在queue.take()方法,从而保证thread不退出。
-
true:当thread空闲时,且队列里没任务,而通过queue.poll(keepAliveTime)获取任务超时时,则thread退出,值为true时,keepAliveTime值要大于0。
*/
private volatile boolean allowCoreThreadTimeOut;
ScheduledExecutorService接口,可以延迟或循环执行给定的任务
//这两个方法在指定的时间之后执行task
schedule(Callable<V> task, long time, TimeUnit unit);
schedule(Runnable task, long time, TimeUnit unit)
scheduleAtFixedRate(Runnable task, long initalDealy, long period, TimeUnit);//在指定的初始延迟之后,周期性地执行task,周期为period,即上一次task开始和下一次task开始之间的时间为period
scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit);//在指定的初始延迟后,周期性的执行task,上一次task结束和下一次task开始之间的时间为delay
Runnable、Callable和Thread
不管是Runnable还是Callable,其只是定义了任务,并不能开启线程,真正开启线程的还是Thread
代码片
- 确保其他线程都结束,然后自己才结束
while(Thread.activeCount() > 1) {
Thread.yeild();
}
Fork-Join框架(java7引入,卷一P686)
RecursiveTask<T>(返回T类型的计算结果)
RecursiveAction(不需要返回结果)
覆盖compute方法(相当于run和call方法)
join方法返回结果
invokeAll方法
ForkJoinPool类
同步器(卷一P688)
运用于哪些相互合作的线程集
- 信号量
一个信号量管理许多的许可证(permits),其实没有许可对象,只是维护一个计数。许可证的数量(即计数)限制了通过的线程数,线程通过调用acquire请求许可 - 倒计时门栓(CountDownLatch)
让一个线程集等待直到计数器变为0,倒计时门栓是一次性的,一旦计数为0就不能再重用了 - 障栅(CyclicBarrier)
多个线程完成计算后集合到一同一个障栅,然后触发一个操作(比如把所有线程的结果汇总),障栅可重用 - 交换器(Exchanger)
当两个线程工作在统一数据结构的两个实例上的时候,一个往实例添加数据,一个清除实例的数据,两个线程都完成时就可以交换数据,典型的例子是一个线程往缓存区填数据,另一个线程消耗另一个相同类型的缓冲区的数据,当两个线程都完成时,就可以交换缓冲区 - 同步队列(SynchronousQueue)
同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put方法时,该线程将阻塞直到另一个线程调用t同步队列的take方法为止,反之亦然。同步队列数据仅沿着一个方向传递,从生产者到消费者,即使SynchronousQueue实现了BlockingQueue接口,从概念上讲,同步队列不是一个队列,以为它不包含任何元素,它的size方法总返回0