Java并发总结
1.多线程的优点
- 资源利用率更好
- 程序在某些情况下更简单
- 程序响应更快
2.创建线程
1.实现Runnable接口
new Thread(Runnable).start()
- 可以避免由于java单继承带来的局限
- 增强程序的健壮性,代码能够被多个线程共享,代码和数据是=数据是独立的
- 适合多个相同程序代码的线程区处理同意资源的情况
2.继承Thread类
new MyThread().start()
注意:启动线程的方式必须是start(),若是直接调用Thread.run()代码也能执行,但是就变成了普通方法的调用了,并没有启动线程
3.线程状态
1.线程状态介绍
线程在一定条件下,状态会发生变化。线程一共有以下几种状态:
新建状态(New):新创建了一个线程对象。
就绪状态(Runnable):线程对象创建后,其他线程调用了该对象的start()方法。该状态的线程位于“可运行线程池”中,变得可运行,只等待获取CPU的使用权。即在就绪状态的进程除CPU之外,其它的运行所需资源都已全部获得。
运行状态(Running):就绪状态的线程获取了CPU,执行程序代码。
阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
等待阻塞:运行的线程执行wait()方法,该线程会释放占用的所有资源,JVM会把该线程放入“等待池”中。进入这个状态后,是不能自动唤醒的,必须依靠其他线程调用notify()或notifyAll()方法才能被唤醒,
同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入“锁池”中。
其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。
死亡状态(Dead):线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
2.中断机制
可中断的阻塞状态:Thread.sleep(),Object.wait(),Thread.join(),ReenTrantLock.lockInterruptibly()。
不可中断的阻塞状态:
synchronized和I/O阻塞状态
1.可以通过调用Thread对象的interrupt()方法来中断线程,但是此方法只是将中断标志设置为true标志,并不能直接中断线程,若执行interrupt()方法时线程处于:
未阻塞状态,那么此时阻塞标志已经设为true,等到下一次阻塞状态来临时,就会直接抛出InterruptedException异常,但是只会被捕捉到,可以在catch块内自行return来结束run方法,否则,只是异常被捕捉,线程仍然可以继续往下执行
可中断的阻塞状态,线程收到中断信号后,会立即抛出InterruptedException, 同时会把中断状态置回为false。
不可中断的阻塞状态,不抛出InterruptedException,也不会退出阻塞状态
2.检查中断状态:
使用 Thread对象的isInterrupted()方法判断中断状态,当调用了interrupt(),isInterrupted()返回true,一旦抛出中断异常中断标志被置为false,isInterrupted()返回false
Thread.interrupted()只是静态方法,只用来判断当前调用它的线程的中断状态,和Thread对象的isInterrupted不同的是,它在每次调用一定会将中断状态置为false
4.守护线程
Java中有两类线程:
1.用户线程:运行在前台的线程
2.守护线程:运行在后台的线程,并为前台线程的运行提供便利服务(比如垃圾回收线程),当所有的用户线程都结束了,那么守护线程也会结束,因为被守护者没有了。因此,不要在守护线程中执行业务逻辑操作(比如对数据的读写等)。
- 可以用Thread对象的setDaemon(true)方法设置当前线程为守护线程,但要在start()之前,否则无效.
- 在守护线程中创建的子线程也是守护线程
- 不是所有的应用都可以分配给守护线程来进行服务,比如读写操作或者计算逻辑
- 后台进程在不执行finally子句的情况下就会终止其run()方法
5.同步机制
1.原子性和可见性
原子性,不能被线程调度器中断的操作,是不可分割的。由Java内存模型来直接保证的原子性变量操作包括read、load、assign、use、store、和write这六个,基本数据类型的访问读写是具备原子性的(long和double)的非原子性协定例外),synchronized块之间的操作也具备原子性
可见性,Java允许多个线程保存共享成员变量的私有拷贝,等到进行完操作后,再赋值回主存(减少了同主存通信的次数,提高了运行的速度)。因此线程对变量的修改是互相不可见的,在赋值的时候就会发生覆盖,这样就引出一个问题-变量可见性。因此,当一个线程修改了共享变量的值,其他线程能够立即得知这个修改,就称这个变量具有可见性。
2.volatile关键字
private volatile boolean value;
volatile关键字具有可见性,被它修饰的变量不能被线程拷贝,即直接在主存读和写,保证了新值能立即刷新,每个线程时刻看到的都是最新值。因此,保证了多线程操作时变量的可见性,而不具有原子性,因为它不会阻塞线程,是一种稍弱的同步机制,要使volatile变量提供理想的线程安全(同时可见性和原子性),必须同时满足下面两个条件,否则要加锁来保证原子性:
- 对变量的写操作不依赖于当前值。例如自增操作就依赖当前值,因为它是一个读取-修改-写入操作序列组成的组合操作,必须以原子方式执行,而 volatile 不能提供必须的原子特性
- 该变量没有包含在具有其他变量的不变式中。
3.synchronised关键字
采用synchronized修饰符实现的同步机制叫做互斥锁机制,每一个对象都有一个monitor(锁标记),只能分配给一个线程。当线程拥有这个锁标记时才能访问这个资源,没有锁标记便进入锁池,因此叫做互斥锁,synchronised同时具有可见性和原子性,原子性是因为锁内的操作不可分割,可见性因为 入锁(进入synchronized)会获取主存中变量的最新值和出锁(退出synchronized)会将变量的最新值刷新回主存
1.实例方法的同步与实例方法内的同步块
实例方法内的synchronized是同步在某个实例对象上。即对象锁
public class MyClass {
public synchronized void log1(String msg1, String msg2){
//...
}
public void log2(String msg1, String msg2){
synchronized(object){
//...
}
}}
2.静态方法的同步与静态方法内的同步块
静态方法内的synchronized是同步在类对象上,锁住的是整个类,即类锁
public class MyClass {
public static synchronized void log1(String msg1, String msg2){
//...
}
public void log2(String msg1, String msg2){
synchronized(MyClass.class){
//...
}
}
}
使用同步机制获取互斥锁的情况,进行几点说明:
一旦某个锁被某个线程获取,那么其他所有在这个锁(同一个对象或类)上竞争的线程都会阻塞,不管是不是同一个方法或代码块(仔细体会)。以对象锁为例,假如有三个synchronized方法a,b,c,当线程A进入实例对象M中的方法a时,它便获得了该M对象锁,其他的线程会在M的所有的synchronized方法处阻塞,即在方法 a,b,c 处都要阻塞
对象级别锁,锁住的是对象,有上面说的锁的特性.
类级别锁,锁住的是整个类,它用于控制对 static 成员变量以及 static 方法的并发访问。有上面说的锁的特性
互斥是实现同步的一种手段,临界区、互斥量和信号量都是主要的互斥实现方式。synchronized 关键字经过编译后,会在同步块的前后分别形成 monitorenter 和 monitorexit这两个字节码指令。根据虚拟机规范的要求,在执行 monitorenter指令时,首先要尝试获取对象的锁,如果获得了锁,把锁的计数器加 1,相应地,在执行 monitorexit 指令时会将锁计数器减 1,当计数器为 0 时,锁便被释放了。由于synchronized同步块对同一个线程是可重入的,一个线程可以多次获得同一个对象的互斥锁,要释放相应次数的该互斥锁,才能最终释放掉该锁。
4.显示的Lock锁
unlock()需放在finally子句中,try中必须有return,以确保unlock()不会过早的发生。
显示的Lock对象在加锁和释放锁方面,相比synchronized,还赋予了更细粒度的控制力。
Lock对象必须被显示的创建、锁定和释放。相比synchronized,代码缺乏优雅性。
在使用Lock锁时,某些事物失败抛出一个异常,可以使用finally去做清理工作,以维护系统使其处于良好的状态,这是synchronized不具有的
ReentrantLock允许尝试获取但最终未获取锁,如果其他线程已经获取这个锁,可以决定离开去执行其他一些事情,而不是等待锁被释放。这是synchronized不具有的
5.synchronized 和 Volatile的比较
在访问volatile变量时不会执行加锁操作,因此也不会使线程阻塞。
加锁机制既可以确保可见性又可以确保原子性,而 volatile 变量只能确保可见性。
如果过度依赖volatile变量来控制状态的可见性,通常会比使用锁的代码更脆弱。仅当volatile变量能简化代码的实现以及对同步策略的验证时,才使用它
在需要同步的时候,第一选择应该是synchronized关键字,这是最安全的方式,
6.TheadLocal
ThreadLocal类的实例,即便被多个线程锁共享,但是在每个线程当中都有一份私有拷贝,并且多个线程无法看到对方的值,即线程对于此变量的使用完全是在自己拷贝对象上。
private ThreadLocal myThreadLocal = new ThreadLocal();
ThreadLocal可以储存任意对象
myThreadLocal.set("A thread local value");//存储此对象的值
String threadLocalValue = (String) myThreadLocal.get();//读取
ThreadLocal myThreadLocal1 = new ThreadLocal<String>();//泛型使用
7.死锁
1.普通循环等待死锁
如果在同一时间,线程A持有锁M并且想获得锁N,线程B持有锁N并且想获得锁M,那么这两个线程将永远等待下去,这种情况就是最简单的死锁形式。
public class DeadLock{
private final Object left = new Object();
private final Object right = new Object();
public void leftRight() throws Exception
{
synchronized (left)
{
Thread.sleep(2000);
synchronized (right)
{
System.out.println("leftRight end!");
}
}
}
public void rightLeft() throws Exception
{
synchronized (right)
{
Thread.sleep(2000);
synchronized (left)
{
System.out.println("rightLeft end!");
}
}
}
}
死锁的四个必要条件:
- 互斥条件:线程对所分配到的资源进行排他性使用,即在一段时间内,某资源只能被一个线程占用。如果此时还有其它线程请求该资源,则只能等待,直到占有该资源的线程用毕释放。
- 请求和保持条件:已经保持了至少一个资源,但是又提出新的资源请求,而资源已被其他线程占有,此时请求线程只能等待,但对自己已获得的资源保持不放。
- 不可抢占条件:线程已获得的资源在未使用完之前不能被抢占,只能线程使用完之后自己释放。
- 循环等待条件:在发生死锁时,一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务所持有的资源,使得大家被锁住。
避免死锁的方式:
1、只在必要的最短时间内持有锁,考虑使用同步语句块代替整个同步方法;
2、设计时考虑清楚锁的顺序,尽量减少潜在的加锁交互数量
3、既然死锁的产生是两个线程无限等待对方持有的锁,我们可以使用Lock类中的tryLock方法去尝试获取锁,这个方法可以指定一个超时时限,获取锁超时后会返回一个失败信息,放弃取锁。
2.重入锁死
如果一个线程在两次调用lock()间没有调用unlock()方法,那么第二次调用lock()就会被阻塞,这就出现了重入锁死。避免重入锁死有两个选择:
- 编写代码时避免再次获取已经持有的锁
- 使用可重入锁
8.多线程集合的安全使用
在 Collections 类中有多个静态方法,它们可以获取通过同步方法封装非同步集合而得到的集合:
public static Collection synchronizedCollention(Collection c)
public static List synchronizedList(list l)
public static Map synchronizedMap(Map m)
public static Set synchronizedSet(Set s)
public static SortedMap synchronizedSortedMap(SortedMap sm)
public static SortedSet synchronizedSortedSet(SortedSet ss)
在多线程环境中,当遍历当前集合中的元素时,希望阻止其他线程添加或删除元素。安全遍历的实现方法如下:
import java.util.*;
public class SafeCollectionIteration extends Object {
public static void main(String[] args) {
//为了安全起见,仅使用同步列表的一个引用,这样可以确保控制了所有访问
//集合必须同步化,这里是一个List
List wordList = Collections.synchronizedList(newArrayList());
//wordList中的add方法是同步方法,会自动获取wordList实例的对象锁
wordList.add("Iterators");
wordList.add("require");
wordList.add("special");
wordList.add("handling");
//获取wordList实例的对象锁,
//迭代时,此时必须阻塞其他线程调用add或remove等方法修改元素
synchronized ( wordList ) {
Iterator iter = wordList.iterator();
while ( iter.hasNext() ) {
String s = (String) iter.next();
System.out.println("found string: " + s + ", length=" + s.length());
}
}
}
}
大部分的线程安全类都是相对线程安全的,也就是我们通常意义上所说的线程安全,像Vector这种,add、remove方法都是原子操作,不会被打断,但也仅限于此,如果有个线程在遍历某个Vector、有个线程同时在add这个Vector,99%的情况下都会出现·ConcurrentModificationException·,也就是fail-fast机制
6.多线程协作
1.wait、notify、notifyAll的使用
wait():将当前线程置入休眠状态,直到接到唤醒通知或被中断为止。调用后当前线程立即释放锁。
notify():用来通知那些正在等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器任意挑选出其中一个wait()状态的线程来发出通知唤醒它。其他线程继续阻塞,但是调用后当前线程不会立马释放该对象锁,直到程序出锁
notifyAll():notifyAll会使在该对象锁上wait的所有线程统统退出wait的状态(即全部被唤醒),待程序出锁后,所有被唤醒的线程共同竞争该锁,没有竞争到锁的线程会一直竞争(不是阻塞)
注意:在调用wait、notify、notifyAll方法之前,必须先获得对象锁,并且只能在synchronized代码块或者方法中调用。否则抛出IllegalMonitorStateException异常
总结:
如果线程调用了对象的wait()方法,那么该线程便会处于该对象的 等待池 中,等待池中的线程不会去竞争该对象的锁。
如果线程调用了对象的notifyAll()方法(唤醒所有wait线程)或notify()方法(只随机唤醒一个wait线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。
优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中。
2.notify 通知的遗漏
当线程 A 还没开始 wait 的时候,线程 B 已经 notify 了,这样,线程 B 的通知是没有任何响应的,当 线程B 退出 synchronized 代码块后,线程A 再开始 wait,便会一直阻塞等待。也就是说这个通知信号提前来了,没有wait线程收到,因此丢失了信号,为了避免丢失信号,可以设置一个成员变量来标志信号。
public class MyWaitNotify{
MonitorObject myMonitorObject = new MonitorObject();
//一旦调用notify,则设为true表示唤醒信号发出来了,则设为false表示唤醒信号已经被其他某个线程消耗了,
boolean wasSignalled = false;
public void doWait(){
synchronized(myMonitorObject){
//自旋锁,循环检查,只有当为true时才表示有唤醒信号来了
while(!wasSignalled){
try{
myMonitorObject.wait();
} catch(InterruptedException e){...}
}
//clear signal and continue running.
wasSignalled = false;
}
}
public void doNotify(){
synchronized(myMonitorObject){
wasSignalled = true;
myMonitorObject.notify();
}
}
}
如果有多个线程被notifyAll()唤醒,所有被唤醒的线程都会在while循环里检查wasSignalled变量值,但是只有一个线程可以获得对象锁并且退出wait()方法并清除wasSignalled标志(设为false)。这时这个标志已经被第一个唤醒的线程消耗了,所以其余的线程会检查到标志为false,还是会回到等待状态。
3.字符串常量或全局对象作为锁的隐患
字符串常量和全局对象在不同的实例当中是同一个对象,即其实用的是同一把锁。因此本来在不同实例对象的线程会互相干扰,例如在实例A中的线程调用notifyAll()可能会唤醒实例B当中的wait线程。因此应该避免使用这两种对象作为监视器对象,而应使用每个实例中唯一的对象
String myMonitorObject = "";//相同String 常量赋值在内存当中只会有一份对象
4.生产者-消费者模型synchronized实现
生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。问题在于如何通过线程之间的协作使得生产和消费轮流进行。
package job_3;
import java.util.Random;
public class Datebuf {
private Integer i = 0;
private int result;
private Random random;
public Datebuf() {
random = new Random();
}
public void sendData() {
while (!Thread.interrupted()) {
synchronized (this) {
try {
while (i != 0) {
this.wait();
}
i = random.nextInt(100);
System.out.println("线程 " + Thread.currentThread().getName()
+ "生产" + i);
this.notify();
} catch (InterruptedException e) {
return;
}
}
}
}
public void addData() {
while (!Thread.interrupted()) {
synchronized (this) {
try {
while (i == 0) {
this.wait();
}
result += i;
System.out.println("线程 " + Thread.currentThread().getName()
+ "消费" + i + "--result=" + result);
i = 0;
this.notify();
} catch (InterruptedException e) {
return;
}
}
}
}
}
5.其他协调方法
1.join()
一个线程可以调用其他线程的join()方法,其效果是等待其他线程结束才继续执行。如果某个线程调用t.join(),此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive()为假)。
2.yield()
建议线程调度器让其他具有相同优先级的线程优先运行,但只是建议,并不一定就是别的线程先运行了
7.线程池
1.ExecutorService介绍
ExecutorService的生命周期包括三种状态:运行,关闭,终止。创建后便进入了运行状态,当调用了 shutdown()方法时,便进入关闭状态。
使用线程池的好处:
- 降低资源消耗。通过重复利用已创的线程降低线程创建和销毁在成的消耗
- 提高响应速度。当任务到达的时候,不需要再去创建线程就能立即执行
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控
2.自定义线程池
public ThreadPoolExecutor (
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
corePoolSize:线程池中所保存的核心线程数,包括空闲线程。
maximumPoolSize:池中允许的最大线程数。
keepAliveTime:线程池中的空闲线程所能持续的最长时间,超过将被线程池移除,可以通过调大此值来提高线程的利用率。
unit:持续时间的单位。
-
workQueue:任务执行前保存任务的队列,仅保存由 execute 方法提交的 Runnable 任务。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态(缓冲区为1的生产者消费者模式)
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
-
饱和策略
- AbortPolicy: 直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
- 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久
3.创建线程池
以下四个线程池底层都是调用了ThreadPoolExecutor的构造方法,所以它们主要只是参构造数设置上的差异,理解了它们的默认构造参数值就能明白它们的区别
newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 直接提交策略SynchronousQueue,无界线程池(maximumPoolSize无限大),可以进行自动线程回收
newFixedThreadPool(int)
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 使用了LinkedBlockingQueue无界队列(workQueue无限大),线程数当超过了coreSize,此队列由于是链式则可以无限添加(资源耗尽,另当别论),永远也不会触发产生新的线程,且线程结束即死亡不会被重复利用。
newScheduledThreadPool(int)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
延迟调用周期执行示例,表示延迟1秒后每3秒执行一次:
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("delay 1 seconds, and excute every 3 seconds");
}
}, 1, 3, TimeUnit.SECONDS);
- 定长调度型线程池,这个池子里的线程可以按 schedule 依次 delay 执行,或周期执行,这是特殊的DelayedWorkQueue的效果.
SingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,0L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
- 单例线程,任意时间池中只能有一个线程,如果向该线程池提交了多个任务,这些任务将排队
当试图通过 excute 方法将一个 Runnable 任务添加到线程池中时,按照如下顺序来处理:
ex=>start: 提交任务
co=>condition: 核心线程池满了?
new=>operation: 创建新线程
queue=>condition: 缓冲队列无法加入?
add=>operation: 加入缓冲队列
bao=>operation: 饱和政策处理
max=>condition: 最大线程池满了?
e=>end
ex->co
co(yes)->queue
co(no)->new
queue(yes)->max
queue(no)->add
max(yes)->bao
max(no)->new
4.几种排队的策略
直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即无空闲线程),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool 采用的便是这种策略。
无界队列,典型的便是的LinkedBlockingQueue(链式),当线程数超过corePoolSize时,新的任务将在缓冲队列无限排队。因此,创建的线程就不会超过corePoolSize,maximumPoolSize的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。
有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。
5.关闭线程池
- shutdown:不可以再submit新的task,已经submit(分两种,正在运行的和在缓冲队列的)的将继续执行。并interrupt()空闲线程。
- shutdownNow:试图停止当前正执行的task,清除未执行的任务并返回尚未执行的task的list。
- 只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true
6.Executor执行Runnable和Callable任务
Runnable
无返回值,无法抛出经过检查的异常,通过execute方法添加
ExecutorService executorService = Executors.newSingleThreadExecutor();//创建线程池
executorService.execute(new TestRunnable());//添加任务
Callable
返回Future对象,获取返回结果时可能会抛出异常,通过submit方法添加
package threadLearn;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CallableDemo{
public static void main(String[] args){
ExecutorService executorService=Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
//创建10个任务并执行
for (int i = 0; i < 10; i++){
//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
Future<String> future = executorService.submit(new TaskWithResult(i));
//将任务执行结果存储到List中
resultList.add(future);
}
//遍历任务的结果
for (Future<String> fs : resultList){
try{
while(!fs.isDone()){//Future返回如果没有完成,则一直循环等待,直到Future返回完成
System.out.println("还没完成");
}
System.out.println(fs.get());//打印各个线程(任务)执行的结果
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}finally{
//启动一次顺序关闭,执行以前提交的任务,但不接受新任务
executorService.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id){
this.id = id;
}
/**
* 任务的具体过程,一旦任务传给ExecutorService的submit方法,
* 则该方法自动在一个线程上执行
*/
public String call() throws Exception {
System.out.println("call()方法被自动调用" + Thread.currentThread().getName());
Thread.sleep(1000);
//该返回结果将被Future的get方法得到
return "call()方法被自动调用,任务返回的结果是:" + id + "-----" + Thread.currentThread().getName();
}
}
如果真正的结果的返回尚未完成,则get()方法会阻塞等待,可以通过调用 isDone()方法判断 Future 是否完成了返回。
9.线程异常处理
由于线程的本质特性(可以理解不同的线程是平行空间),从某个线程中逃逸的异常是无法被别的线程捕获的。一旦异常逃出任务的run()方法,就会向外传向控制台。Thread.UncaughtExceptionHandler是JavaSE5中的新接口,它允许在每个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用。
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
8.Lock 锁与Condition
1.Lock锁的介绍
Lock接口有3个实现它的类:ReentrantLock、ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁、读锁和写锁。
- lock 必须被显式地创建、锁定和释放,为了使用更多的功能,一般用 ReentrantLock 为其实例化
- 为了保证锁最终一定会被释放(可能会有异常发生),要把互斥区放在 try 语句块内,并在finally语句块中释放锁,尤其当有return语句时,return 语句必须放在try字句中,以确保unlock()不会过早发生,从而将数据暴露给第二个任务。
(1)ReentrantLock与synchronized的比较
等待可中断:当持有锁的线程长期不释放锁时,正在等待的线程可以选择放弃等待,改为处理其他事情,由synchronized产生的互斥锁时,会一直阻塞,是不能被中断的
可实现公平锁:多个线程在等待同一个锁时,必须按照申请锁的时间顺序排队等待,通过构造方法 ReentrantLock(ture)来要求使用公平锁
锁可以绑定多个条件:ReentrantLock 对象可以同时绑定多个 Condition 对象(名曰:条件变量或条件队列),我们还可以通过绑定 Condition 对象来判断当前线程通知的是哪些线程(即与Condition对象绑定在一起的其他线程
(2)ReetrantLock的忽略中断锁和响应中断锁
忽略中断锁与 synchronized实现的互斥锁一样,不能响应中断,而响应中断锁可以响应中断。
ReentrantLock lock = new ReentrantLock();
lock.lockInterruptibly();//获取响应中断锁
(3)读写锁
用读锁来锁定读操作,用写锁来锁定写操作,这样写操作和写操作之间会互斥,读操作和写操作之间会互斥,但读操作和读操作就不会互斥。
ReadWriteLock rwl = new ReentrantReadWriteLock();
rwl.writeLock().lock() //获取写锁
rwl.readLock().lock() //获取读锁
2.生产者-消费者模型Lock与Condition实现
package job_3;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class LockDatebuf implements Data {
private Integer i = 0;
private int result;
private Random random;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public LockDatebuf() {
random = new Random();
}
public void sendData() throws InterruptedException {
while (!Thread.interrupted()) {
lock.lockInterruptibly();
try {
while (i != 0) {
condition.await();
}
i = random.nextInt(100);
System.out.println("线程 " + Thread.currentThread().getName()
+ "生产" + i);
condition.signal();
} catch (InterruptedException e) {
}finally{
lock.unlock();
}
}
}
public void addData() throws InterruptedException {
while (!Thread.interrupted()) {
lock.lockInterruptibly();
try {
while (i == 0) {
condition.await();
}
result += i;
System.out.println("线程 " + Thread.currentThread().getName()
+ "消费" + i + "--result=" + result);
i = 0;
condition.signal();
} catch (InterruptedException e) {
}finally{
lock.unlock();
}
}
}
}
9.并发新特性
1.CountDownLatch
可以让一组任务必须在另一组任务全部结束后才开始执行,向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用await()方法的线程都将阻塞,直至计数值子减为0。其他任务在结束其工作时,可以调用countDown()来减小这个计数值。CountDownLatch被设计为只触发一次。
package threadLearn;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
// 只触发一次,计数值不能被重置
int size = 5;
CountDownLatch latch = new CountDownLatch(size);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Waiting(latch,"wait线程"));
for (int i = 0; i < size; i++) {
exec.execute(new OtherTask(latch));
}
TimeUnit.SECONDS.sleep(1);
exec.shutdown();
}
}
class Waiting implements Runnable {
private CountDownLatch latch;
private String name;
public Waiting(CountDownLatch latch,String name) {
this.latch = latch;
this.name = name;
}
public void run() {
try {
latch.await();
System.out.println(name+"最后执行的任务...");
} catch (Exception e) {
return;
}
}
}
class OtherTask implements Runnable {
private CountDownLatch latch;
private Random rand = new Random();
public OtherTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch (Exception e) {
return;
}
}
private void doWork() throws InterruptedException {
TimeUnit.MICROSECONDS.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 执行完毕");
}
}
2.障碍器 CyclicBarrier
它适用于这样一种情况:你希望创建一组任务,它们并发地执行工作,另外的一个任务在这一组任务并发执行结束前一直阻塞等待,直到该组任务全部执行结束,这个任务才得以执行。这非常像CountDownLatch,只是 CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用
package threadLearn;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
//创建CyclicBarrier对象,
//并设置执行完一组5个线程的并发任务后,再执行MainTask任务
CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
new SubTask("A", cb).start();
new SubTask("B", cb).start();
new SubTask("C", cb).start();
new SubTask("D", cb).start();
new SubTask("E", cb).start();
}
}
/**
* 最后执行的任务
*/
class MainTask implements Runnable {
public void run() {
System.out.println("......终于要执行最后的任务了... ...");
}
}
/**
* 一组并发任务
*/
class SubTask extends Thread {
private String name;
private CyclicBarrier cb;
SubTask(String name, CyclicBarrier cb) {
this.name = name;
this.cb = cb;
}
public void run() {
System.out.println("[并发任务" + name + "] 开始执行");
for (int i = 0; i < 999999; i++) ; //模拟耗时的任务
System.out.println("[并发任务" + name + "] 开始执行完毕,通知障碍器");
try {
//每执行完一项任务就通知障碍器
cb.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
3.信号量Semaphore
信号量 Semaphore 实际上是一个功能完毕的计数信号量,从概念上讲,它维护了一个许可集合,对控制一定资源的消费与回收有着很重要的意义。Semaphore 可以控制某个资源被同时访问的任务数,它通过acquire()获取一个许可,release()释放一个许可。如果被同时访问的任务数已满,则其他 acquire 的任务进入等待状态,直到有一个任务被release掉,它才能得到许可。Semaphore 仅仅是对资源的并发访问的任务数进行监控,而不会保证线程安全,因此,在访问的时候,要自己控制线程的安全访问。
10.性能调优
(1)比较各类互斥技术
Atomic类
不激烈情况下,性能比synchronized略逊,而激烈的时候,也能维持常态。激烈的时候,Atomic的性能会优于ReentrantLock一倍左右。缺点是只能同步一个值,一段代码中只能出现一个Atomic的变量,多于一个同步无效。因为他不能在多个Atomic之间同步。关键字synchronized
在资源竞争不是很激烈的情况下,Synchronized的性能要优于ReetrantLock,原因在于,编译程序通常会尽可能的进行优化synchronizeLock
在资源竞争很激烈的情况下,Synchronized的性能会下降几十倍,但是ReetrantLock的性能能维持常态。ReentrantLock提供了多样化的同步,比如有时间限制的同步,可以被Interrupt的同步(synchronized的同步是不能Interrupt的)等。ReentrantLock拥有Synchronized相同的并发性和内存语义,此外还多了 锁投票,定时锁等候和中断锁等候。可以被中断。
(2)免锁容器
CopyOnWiteArrayList的写入将导致创建整个底层数组的副本,而原数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全的执行。当修改完成时,一个原子性的操作把新的数组换入,使得新的读取操作可以看到这个新的修改。
- 好处是当多个迭代器同时遍历和修改这个列表时,不会抛出ConcurrentModificationException。
- CopyOnWriteArraySet将使用CopyOnWriteArrayList来实现其免锁行为。
- ConcurrenHashMap和ConcurrentLinkedQueue使用了类似的技术,允许并发的读取和写入,但是容器中只有部分内容而不是整个容器可以被复制和修改。在修改完成之前,读取者仍旧不能看到他们。
(3)ReadWriteLock
对向数据结构相对不频繁的写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。ReadWriteLock使得你可以同时有多个读者,只要他们都不试图写入即可。如果写锁已经被其他任务持有,那么任何读者都不能访问,直至这个写锁被释放为止。即适用于读者多于写者的情况。
对于ReadWriteLock的应用主要是:缓存和提高对数据结构的并发性。
锁降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
锁获取的中断:读取锁和写入锁都支持锁获取期间的中断。
Condition 支持 :写入锁提供了一个Condition实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。
读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException。重入:此锁允许 reader 和 writer 按照 ReentrantLock 的样式重新获取读取锁或写入锁。在写入线程保持的所有写入锁都已经释放后,才允许重入 reader 使用它们。
下面的代码展示了如何利用重入来执行锁降级:
class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
//在获取写锁之前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// 重新检查状态,因为可能其他线程已经获取到读锁了
if (!cacheValid) {
data = ...
cacheValid = true;
}
rwl.readLock().lock();
rwl.writeLock().unlock();
}
use(data);
rwl.readLock().unlock();
}
}