3.1 同步控制
3.1.1 重入锁
重入锁可以代替synchronized关键字
使用java.util.concurrent.locks.ReentrantLock类来实现
小案例:
package chapter_3;
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for(int j = 0;j < 10000000; j++){
lock.lock(); //加锁
try{
i++;
}finally{
lock.unlock(); //释放锁
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock tl = new ReenterLock();
Thread t1 = new Thread(tl);
Thread t2 = new Thread(tl);
t1.start();t2.start();
t1.join();t2.join();
System.out.println(i);
}
}
重入锁的能力:
1.中断响应:在synchronized中一个线程在等待锁,只有两种情况,要么获得锁继续执行,要么一直等待。在重入锁中,线程可以被中断,在等待锁的过程中,程序可以根据需要取消对锁的请求。
案例:产生了一个死锁,得益于锁中断,可以解决死锁问题。
package chapter_3;
import java.util.concurrent.locks.ReentrantLock;
public class IntLock implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
//控制加锁顺序,方便构造死锁
public IntLock(int lock){
this.lock = lock;
}
@Override
public void run() {
try{
if(lock == 1){
lock1.lockInterruptibly(); //在等待锁的过程中,可以中断响应
try {
Thread.sleep(500);
} catch (InterruptedException e) {
lock2.lockInterruptibly();
}
}else{
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
lock1.lockInterruptibly();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock1.isHeldByCurrentThread())
lock1.unlock();
if(lock2.isHeldByCurrentThread())
lock2.unlock();
System.out.println(Thread.currentThread().getId()+":线程退出");
}
}
public static void main(String[] args) throws InterruptedException {
IntLock r1 = new IntLock(1);
IntLock r2 = new IntLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();t2.start();
Thread.sleep(1000);
//中断t2线程
t2.interrupt();
}
}
线程t1和t2启动后,t1先占用lock1,再占用lock2;t2先占用lock2,再请求lock1,形成t1和图
之间的相互等待。最后,t2被中断,t2释放lock2,t1顺利完成工作。
2.所申请等待限时
限时等待也可以有效避免死锁
使用方法,例:lock.tryLock(5.TimeUnit.SECONDS),设置等待时间为5秒,如果在5秒内没有获得锁,返回false。
也可以不带参数,lock.tryLock(),这种情况下,当前线程会尝试获得锁,如果未被其他线程占用,则申请成功,立即返回true,如果被其他线程占用,立即返回false。
案例:
package chapter_3;
import java.util.concurrent.locks.ReentrantLock;
public class TryLock implements Runnable {
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public TryLock(int lock){
this.lock = lock;
}
@Override
public void run() {
if(lock == 1){
while(true){
if(lock1.tryLock()){ //对锁lock1申请
try {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(lock2.tryLock()){ //对锁lock2申请
try{
System.out.println(Thread.currentThread().getId()+":My job done");
return;
}finally {
lock2.unlock();
}
}
}finally {
lock1.unlock();
}
}
}
}else{
while(true){
if(lock2.tryLock()){
try{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(lock1.tryLock()){
try{
System.out.println(Thread.currentThread().getId()+":My job done");
return;
}finally {
lock1.unlock();
}
}
}finally {
lock2.unlock();
}
}
}
}
}
public static void main(String[] args) {
TryLock r1 = new TryLock(1);
TryLock r2 = new TryLock(2);
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();t2.start();
}
}
这种情况是让t1获得lock1,t2获得lock2,让t1申请lock2,t2申请lock1,一般情况下,会导致他和2相互竞争,从而导致死锁。
但有了tryLock()后,线程不是一直等待,而是不停地尝试,因此,只有执行时间足够长,线程总能获得所有需要的资源。
3.公平锁
指的是根据时间顺序,先到先得。
优点:不会产生饥饿现象,只要你排队,最终都可以等到资源。
缺点:实现成:eentrantLock(boolean fair)对锁的公平性进行设置,true为公平锁
ReentrantLock几个方法总结
- lock():获得锁,如果锁已被占用,则等待。
- lockInterruptibly():获得锁,但优先响应中断。
- tryLock():尝试获得锁,成功返回true。该方法不等待,立即返回。
- tryLock(long time,TimeUnit unit):在给定时间内尝试获得锁。
- unlock():释放锁。
3.1.2 Condition
Condition接口提供的基本方法如下:
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time,TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()方法或signalAll()方法时,线程重新获得锁并继续执行。或者当线程被中断,也能跳出等待。与Object,wait()类似。
awaitUninterruptibly()与await()基本相同,只是不会在等待中响应中断。
singal()方法用于唤醒一个等待中的线程,singalAll()会唤醒所有等待线程。
信号量(Semaphore)
信号量构造函数:
public Semaphore(int permits) //参数为信号量的准入数
public Semaphore(int permits,boolean fair) //第二个参数可以指定是否公平
主要逻辑方法:
void acquire()
void acquireUniterruptibly()
boolean tryAcquire()
boolean tryAcquire(long time,TimeUnit unit)
void release()
- acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。
- acquireUniterruptibly()不能响应中断
- tryAcquire()尝试获得一个许可,成功则返回true,不会等待,立即返回
- release()释放一个许可
案例:
package chapter_3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemapDemo implements Runnable {
final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try{
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId()+":done!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semp.release();
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for(int i = 0;i < 20;i++){
exec.submit(demo);
}
}
}
本例中同时开启20个线程,信号量设置为5,运行后,以5组一个单位输出。
3.1.4 ReadWriteLock读写锁
使用读写锁可以有效的减少资源竞争,提升系统性能。在重入锁中,读与读之间是串行操作。但因为读并不会破坏数据,因此读与读之间可以同时进行。
- 读与读不互斥
- 读与写互斥
- 写与写互斥
3.1.5 倒计数器:CountDownLatch
这个类通常用来控制线程等待,让一个线程等待直到倒计数结束。
构造函数为
public CountDownLatch(int count)
- countdown()方法,使计数器减一
- await()方法,等待计数器为零
3.1.6 循环栅栏:CyclicBarrier
也是一种多线程并发控制工具,功能比CountDownLatch强大,其构造函数为
public CyclicBarrier(int parties,Runnable barrierAction)
barrierAction为一次计数器计数完成后,系统会执行的动作。
案例:模拟士兵集合并完成任务
package chapter_3;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static class Soldier implements Runnable{
private String soldier;
private final CyclicBarrier cyclic;
public Soldier(CyclicBarrier cyclic,String soldierName){
this.cyclic = cyclic;
this.soldier = soldierName;
}
public void dowork(){
try{
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier+":任务完成");
}
@Override
public void run() {
try{
//等待所有士兵到齐
cyclic.await();
dowork();
//等待所有士兵完成工作
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun(boolean flag,int N){
this.flag = flag;
this.N = N;
}
@Override
public void run() {
if(flag){
System.out.println("司令:[士兵"+N+"个,任务完成]");
}else{
System.out.println("司令:[士兵"+N+"个,集合完毕]");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N,new BarrierRun(flag,N));
System.out.println("集合队伍!");
for(int i = 0;i < N;++i){
System.out.println("士兵"+i+"报道!");
allSoldier[i] = new Thread(new Soldier(cyclic,"士兵"+i));
allSoldier[i].start();
}
}
}
上述代码创建了CyclicBarrier实例,并将计数器设置为10,在计数器达到指标时,执行Soldier中的run(),在run()中,每一个士兵线程都会等待,直到士兵都集合完毕。以为着CyclicBarrier的一次计数完成,当再一次调用CyclicBarrier.await()时,会进行下一次计数。用来等待所有士兵完成任务。
一旦所有士兵都集合完成或都完成任务就会执行BarrierRun,打印相关信息。
3.1.7 线程阻塞工具类:LockSupport
是一个非常方便实用的线程阻塞工具,它可以在线程任意位置让线程阻塞。
其静态方法park()可以阻塞当前线程,parkNanos(),parkUntil()实现一个限时的阻塞。
unpark()用于唤醒线程。
3.2 线程复用:线程池
对线程进行控制和管理。
3.2.1 什么是线程池
为了避免系统频繁的创建和销毁线程。在线程池中,总有几个活跃线程。当你需要使用线程时,从池子中拿一个空闲线程,当工作完成时,并不着急关闭线程,而是将线程退回池中。
在使用线程池后,创建线程变成了从线程池获得空闲线程,关闭线程变成了向线程池归还线程。
3.2.2 JDK对线程池的支持
JDK提供了一套Executor框架,本质就是一个线程池,其中的几个方法:
- newFixedThreadPool():该方法返回一个固定线程数量的线程池,线程池中的线程数始终不变。
- newSingleThreadExecutor():该方法返回只有一个线程的线程池。
- newCachedThreadPool():该方法返回一个可根据实际情况调整线程数量的线程池。
- newSingleThreadScheduledExecutor():返回一个ScheduledExecutor对象,线程池大小为1.可以在给定时间执行某任务。
1.固定大小的线程池
小案例:
package chapter_3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static class Mytask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Mytask mytask = new Mytask();
//创建大小为5的线程池
ExecutorService es = Executors.newFixedThreadPool(5);
for(int i = 0;i < 10;i++){
es.submit(mytask);
}
}
}
}
创建了大小为5的线程池,依次向线程池提交了10次任务。线程池会安排调度这10个任务使用5个线程,以下是运行结果。
2.计划任务
newScheduledThreadPool()方法返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
方法的区别:
- scheduleAtFixedRate: 创建一个周期性任务。任务开始于给定的初始时延。后续任务按照给定的周期运行:后续第一个任务将会在initialDelay+period时执行,后续第二个任务将在initialDelay+2*period后执行,依次类推。
- scheduleWithFixedDelay: 创建一个周期性任务。任务开始于给定的初始时延。后续任务将按照指定的时延进行:delay = 上一个任务结束到下一个任务开始的时间差。
使用scheduledAtFixedRate()的小案例:
package chapter_3;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
设定任务执行1秒,调度周期为2秒,也就是每2秒执行一次。
3.2.3 核心线程池的内部实现
上述创建内存池的方法本质上都是通过ThreadPoolExecutor类。
public static ExecutorService newFixedThreadPool(int nThread){
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
public static ExecutorService newSingleThreadPool(int nThread){
return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(int nThread){
return new ThreadPoolExecutor(0,Interger.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
下面是该类的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maxinumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
各个参数含义:
- corePoolSize:指定了线程池中线程数量。
- maximumPoolSize:指定了线程池中最大线程数量。
- keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程存活时间。
- unit:keepAlivetime的单位。
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:线程工厂,用于创建线程。
- handler:拒绝策略,当任务太多来不及处理时,如何拒绝任务。
其中BlockingQueue<Runnable> workQueue参数可以使用以下几种方法实现:
直接提交的队列:由SynchronousQueue对象提供。SynchronousQueue没有容量,提交的任务不会被真实保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新线程,如果线程已达到最大线程数量则执行拒绝执行策略。
有界的任务队列:使用ArrayBlockingqueue类实现。 其构造函数为
public ArrayBlockingQueue(int capacity) //参数表示该队列的最大容量
若有新任务执行,如果线程池的实际线程小于corePoolSize,则优先创建新的线程,若大于,则进入等待队列,若等待队列ArrayBlockingQueue已满,则在线程池线程数量小于maxPoolSize的情况下创建新的线程。若大于maxPoolSize,则执行拒绝执行策略。
- 无界的任务队列:通过LinkedBlockingQueue类实现。除非系统资源耗尽,否则不存在进入等待队列失败。所以,当线程池线程小于corePoolSize时,新任务来时会创建线程,当线程池线程大于corePoolSize时,全部进入等待队列,因此,线程池中最多有corePoolSize的线程。
- 优先任务队列:通过PriorityBlockingQueue类实现,可以控制任务执行的先后顺序
3.2.4 拒绝策略
JDK提供四中拒绝策略如下:
- AbortPolicy策略:直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy策略: 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的线程。这样做不会丢弃线程,但可能会使提交线程性能急剧下降。
- DiscadOldestPolicy策略:丢弃一个最老的请求,也就是即将被执行的一个任务,再尝试提交当前任务。
- DiscardPolicy策略:丢弃无法处理的任务,不做任何处理。
以上策略均实现了RejectedExecutionHandler接口,若无法满足实际需求,可以自定义实现这个接口。接口如下:
public interface RejectedExecutionHandler{
void rejectedExecution(Runnable r,ThreadPoolExecutor executor)
}
其中r为请求执行的任务,executor为当前线程池。
小案例:
package chapter_3;
import java.util.concurrent.*;
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable{
@Override
public void run() {
try {
System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask myTask = new MyTask();
//设置线程池corePoolSize和maxPoolSize均为5,等待队列为有界,大小为10的队列,拒绝方法自定义
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//处理方法,打印被遗弃任务信息
System.out.println(r.toString()+" is discard");
}
});
for(int i = 0;i < Integer.MAX_VALUE;i++){
es.submit(myTask);
Thread.sleep(10);
}
}
}
3.2.5 自定义线程创建:ThreadFactory
ThreadFactory是一个接口,其中创建线程的方法为:
Thread newThread(Runnable r);
小案例:通过自定义ThreadFactory,记录线程的创建,将所有线程都设置成守护线程。当主线程退出后,会强制销毁线程池。
package chapter_3;
import java.util.concurrent.*;
public class ThreadFactoryDemo {
public static void main(String[] args) throws InterruptedException {
RejectThreadPoolDemo.MyTask task = new RejectThreadPoolDemo.MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("create "+t);
return t;
}
});
for(int i = 0;i < 5;i++){
es.submit(task);
}
Thread.sleep(2000);
}
}
3.2.6 扩展线程池
ThreadPoolExecutor是一个可以扩展的线程池,提供了beforeExecute(),afterExecute()和terminated()三个接口用来对线程池进行控制。
以beforeExecute()和afterExecute()为例,ThreadPoolExecutor.Worker.runTask()方法提供以下的内部实现:
boolean ran = false;
beforeExecute(thread,task) //运行前
try{
task.run(); //运行
ran = true;
afterExecute(task,null); //运行结束后
++completedTasks;
}catch(RuntimeExeception ex){
if(!ran)
afterExecute(task,ex); //运行结束
throw ex;
}
ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,实现Runnable接口,Worker.runTask()方法会被线程池以多线程模式异步调用,即Worker.runTask()会同时被多个线程访问,因此,beforeExecute()和afterExecute()接口也可以在其中被调用。
案例:线程池的扩展,记录每个任务的执行日志。
package chapter_3;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExThreadPool {
public static class Mytask implements Runnable{
public String name;
public Mytask(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("正在执行"+":Thread ID:"+Thread.currentThread().getId()+",Task name"+name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.SECONDS
,new LinkedBlockingDeque<Runnable>()){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("准备执行"+((Mytask)r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("执行完成:"+((Mytask)r).name);
}
@Override
protected void terminated() {
System.out.println("线程池退出");
}
};
for(int i = 0;i < 5;i++){
Mytask task = new Mytask("TASK-GEYM"+i);
es.execute(task);
Thread.sleep(10);
}
es.shutdown();
}
}
可以看到所有任务的执行前、执行后以及任务的名字都已近可以捕获。对于应用程序的调试和诊断是很有帮助的。
3.2.7 优化线程池线程数量
线程池的大小对系统有一定的影响。要避免极大和极小两种情况。一般来说,按照下列公式即可:
Ncpu = Cpu的数量
Ucpu = 目标cpu使用率,0<= Ucpu <=1
W/C = 等待时间与计算时间的比例
Nthreads = NcpuUcpu(1+W/C)
java中,可以使用:
Runtime.getRuntime().availableProcessors()
来获得cpu数量。