取材于网络,忘记哪些帖子惹,挺多的。。。主要是我自己防止自己忘记记录的
1.线程
是大家比较熟悉的概念,线程和进程都有五个阶段:创建、就绪、运行、阻塞、终止。多线程即一个程序有多个顺序流在执行。
实现的方法 有三种:Thread、Runnable 和 Callable接口与Future、线程池结合。
2.首先说java.lang.Thread
Thread类,是很方便的一种,使用起来很快速,如果我们只是想启一个线程,没特殊要求,可以直接用Thread。使用的方法也极其简单,继承,并填充run方法。等使用的时候直接new出来,然后使用start方法即可,使用十分简单。
public class MyThread extends Thread {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("线程 " + name + i);
try {
sleep((int)Math.random()*100);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
test:
new MyThread("A").start();
new MyThread("B").start();
这就行了,程序运行main方法的时候,java会启动一个线程,主线程随着main创建。调用start方法之后,另外两个线程也启动了。Start方法调用之后,并不是立即执行了多线程的代码,而是先把线程转为Runnable状态,操作系统决定代码何时运行。多线程的程序实际上的乱序执行的,执行结果是随机的。
3.然后说一下Runnable
是接口,目标类需要实现接口,然后重写run方法,使用也是很简单,new Thread()里面参数填我们自己创建的runnable实现类就可以了。然后还是调用start方法。
好处也很明显,毕竟java类单继承,但是可以实现多个接口。
public class MyRunnable implements Runnable{
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("线程 " + name + i);
try {
sleep((int) Math.random() * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
test:
new Thread(new MyRunnable("A")).start();
new Thread(new MyRunnable("B")).start();
当类实现了Runnable接口之后,此类就具有多线程的特征,接下来做的就是填充run方法。看源码可知,Thread类实际上也是实现了Runnable的类。启动的时候,是先通过Thread类构造方法,调用生成的Thread对象的start方法。可以说多线程的都是通过Thread的start方法来运行的。Thread的API是多线程的关键。
要注意一点!!!线程池只能放入实现Runnable和Callable类线程,不能直接放入继承Thread的类。
4.最后就是Callable
一般来说,使用Runnable接口和继承Thread实现线程是无法给我们返回结果的,如果需要结果,那么可以使用Callable,但是Callable只能在ExecutorService的线程池里跑,同时可以通过返回的Future对象查询执行状态,Future取得了执行异步任务的结果。
详细说一下,先看个例子
public class MyCallable implements Callable {
private String name;
public MyCallable(String name) {
this.name = name;
}
@Override
public Object call() throws Exception {
for (int i = 0; i < 10; i++) {
System.out.println("线程 " + name + i);
try {
sleep((int) Math.random() * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return name;
}
}
写倒是同样很简单,但是怎么去使用呢?
最好是通过线程池去启动:
ExecutorService executor = Executors.newCachedThreadPool();
MyCallable c1 = new MyCallable("A");
MyCallable c2 = new MyCallable("B");
MyCallable c3 = new MyCallable("C");
Future<Integer> result1 = executor.submit(c1);
Future<Integer> result2 = executor.submit(c2);
Future<Integer> result3 = executor.submit(c3);
System.out.println("task1运行结果:"+result1.get());
System.out.println("task2运行结果:"+result2.get());
System.out.println("task3运行结果:"+result3.get());
executor.shutdown();
这里用了缓存型池。后续补充说明。
看完例子我们稍稍微微的看一下源码。
Callable接口的源码很简单,只有一个call方法,返回的一个泛型,这个就是线程返回的结果。然后使用ExecutorService接口是负责线程池调度的。
里面比较重要的是:
submit(Callable<T> task)
submit(Callable<T> task,T result)
Submit(Runnable task)
boolean awaitTermination(long timeout, TimeUnit unit)
Vord shutdown()
前三个很明显,就是帮助我们开启线程的,类似start
第四个是阻塞,监测ExecutorService是否关闭,如果关闭了返回true,否则false。一般和shutdown组合使用。
最后的shutdown顾名思义,平滑的关闭ExecutorService。调用时,停止接收新的任务并且等待已经提交的任务执行完。所有任务完成之后,线程池关闭。
再看Future接口:
get() 是等待线程结果返回,会阻塞
get(long timeout,TimeUnit unit) 设置超时时间
cancel() 取消任务执行,任务已经完成或者已经取消则失败,参数是是否应该试图停止任务的方式去中断线程。
还有isCancelled和isDone都是判断状态的。
关于Callable只有这么多吗???当然不是!!!有个很重要的类叫做FutureTask,它是Future的实现类,并且实现了Runnable接口,所以可以通过Executor来执行,也可以传递给Thread对象执行。
如果要在主线程执行比较耗时的工作,同时不想阻塞主线程,可以交给Future对象在后台完成。Executor框架利用FutureTask完成异步任务,一般来说用它进行耗时的计算。主线程可以在完成自己的任务之后,再去其获取结果。FutureTask的使用有两种情况:
- 1.作为new Thread()的参数。
- 2.ExecutorService.submit() 扔线程池里。
public static void main(String[] args) {
// 1.直接启动线程
MyCallable task1 = new MyCallable("A");
MyCallable task2 = new MyCallable("B");
FutureTask<Integer> result1 = new FutureTask<Integer>(task1);
FutureTask<Integer> result2 = new FutureTask<Integer>(task2);
Thread thread1 = new Thread(result1);
Thread thread2 = new Thread(result2);
thread1.start();
thread2.start();
// 2.线程池启动
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> result3 = executor.submit(task1);
Future<Integer> result4 = executor.submit(task2);
executor.shutdown();
}
发现了吗?其实用了线程池反而更简单了。。。
但是有个问题,如果我们开了很多很多线程,这些线程的执行时间是未知的,但是我们有需要返回结果,如果我们只是通过Future和FutureTask去取结果效率就很低,因为我们需要通过循环不断遍历线程池里面的线程,判断执行状态并取得结果。于是针对这种情况,有一个CompletionService。
其原理在于将线程池执行结果放到一个Blockqueueing里面,这里线程执行结果进入Blockqueueing的顺序只与线程的执行时间有关。
CompletionService是一个接口,里面有五个方法分别是:
//提交线程任务
Future<V> submit(Callable<V> task);
//提交线程任务
Future<V> submit(Runnable task, V result);
//阻塞等待
Future<V> take() throws InterruptedException;
//非阻塞等待
Future<V> poll();
//带时间的非阻塞等待
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
这个接口有一个实现类ExecutorCompletionService。
实现类的源码可知,最终线程要提交到Executor里面去运行,所以构造函数中需要Executor参数。每当线程执行完毕之后会往阻塞队列添加一个Future。
举例时间:
只用Future:
//不使用 CompletionService 只用 Future
public static void futureTest() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Integer>> result = new ArrayList<>();
// 假设有10个线程
for (int i = 0; i < 10; i++) {
Future<Integer> submit = executor.submit(new MutiThreadFuture(i));
result.add(submit);
}
executor.shutdown();
// 依次等待返回结果
for (Future<Integer> future : result) {
System.out.println("返回结果" + future.get());
}
}
使用了CompletionService:
public static void CompleteTest() throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newCachedThreadPool();
// 完成服务
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < 10; i++) {
completionService.submit(new MutiThreadFuture<>(i));
}
// 依次等待返回结果
for (int i = 0; i < 10; i++) {
System.out.println("结果" + completionService.take().get());
}
}
得到的结果:
pool-1-thread-3
pool-1-thread-2
pool-1-thread-6
pool-1-thread-4
pool-1-thread-1
返回结果0
返回结果1
返回结果2
返回结果3
pool-1-thread-5
返回结果4
返回结果5
pool-1-thread-7
返回结果6
pool-1-thread-9
pool-1-thread-8
返回结果7
返回结果8
pool-1-thread-10
返回结果9
------------------------------------------------------分割线-----------------------------------------------
pool-2-thread-2
结果1
pool-2-thread-6
pool-2-thread-1
结果5
pool-2-thread-10
结果0
结果9
pool-2-thread-3
pool-2-thread-4
结果2
结果3
pool-2-thread-7
pool-2-thread-5
结果4
结果6
pool-2-thread-8
pool-2-thread-9
结果7
结果8
根据结果能看得出来,使用了Completion之后的结果的输出和线程放入没啥关系。
5.简单说一下线程池
刚才在上面一直在用的线程池是这个:newCachedThreadPool
但实际上还有三种线程池可供选择:
- newFixedThreadPool 固定容量的线程池,线程达到最大值的时候,线程池的规模不会再变化。
- newCachedThreadPool 缓存的线程池,线程达到最大值之后,将会回收空的线程。当需求增加,线程数量也会增加,没有上限。
- newSingleThreadPoolExecutor 单线程的线程池,串行执行。
- newScheduledThreadPool 固定容量的线程池,以延迟或者定时的方式去执行,一般来说会选择消息队列或xxl-job去做定时,而不是这个。
使用线程池有啥好处呢?首先效率高,重用现有的线程,可以在处理多个请求的时候分担线程产生和销毁的开销。请求达到一定数量的时候,工作线程已经存在,响应性大大提高。线程池的容量是可以控制的,尽可能的利用好资源。
使用线程池的方法也很简单!就像上面写的,首先写一个线程类,三种方式任意。
然后建立一个线程池,四个任意,推荐newCachedThreadPool
(也可以通过newFixedThreadPool 设置线程池大小,有效利用资源)
最后调用线程池操作,executorService.execute(你的线程)
然后就完成了!!!是不是很EZ??!
那么要分析一下:
1.什么情况用CachedThreadPool
它首先会创造足够多的线程去执行任务,随着程序的执行,有点线程执行完,可以循环使用,这时候不必新建线程。客户端线程和线程池之间会有一个任务队列,程序要关闭时,需要注意两件事!1.入队的任务现在什么情况。2.正在运行这个任务现在怎么样了。有两种方式去关闭线程池,这很重要!1.入队任务全部执行完毕(shutdown())。2.舍弃这些任务直接结束(shutdownNow())。根据具体情节,程序员自己要做取舍。 注意:主线程的执行和线程池里面的线程分开,很有可能主线的线程结束了,但是线程池还在运行。
2.什么情况用FixedThreadPool
它固定了容量,这个模最大式线程数目是一定的。当确定任务资源占用的情况之后,想要控制资源利用,那么可以使用这个线程池。线程执行完成就从线程池直接移出,不能保证顺序性,全看线程之间的竞争。
3.什么情况用newSingleThreadExecutor
它能保证的是线程的执行顺序,并且能够保证线程结束之后,下一条线程再被开启。
4.什么情况用newScheduledThreadPool
它功能看起来很强大,的确很强大,因为可以设置时间和线程执行的先后间隔,但是大多数情况下,我会选择直接使用消息队列或者定时器。
说了四个线程池,他们有个共同的特性,都传了不同的参数去调用了同一个接口,ThreadPoolExecutor。然后这个ThreadPoolExecutor就厉害了。
找了个带注释的源码部分:
//运行状态标志位
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
//线程缓冲队列,当线程池线程运行超过一定线程时并满足一定的条件,待运行的线程会放入到这个队列
private final BlockingQueue<Runnable> workQueue;
//重入锁,更新核心线程池大小、最大线程池大小时要加锁
private final ReentrantLock mainLock = new ReentrantLock();
//重入锁状态
private final Condition termination = mainLock.newCondition();
//工作都set集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//线程执行完成后在线程池中的缓存时间
private volatile long keepAliveTime;
//核心线程池大小
private volatile int corePoolSize;
//最大线程池大小
private volatile int maximumPoolSize;
//当前线程池在运行线程大小
private volatile int poolSize;
//当缓冲队列也放不下线程时的拒绝策略
private volatile RejectedExecutionHandler handler;
//线程工厂,用来创建线程
private volatile ThreadFactory threadFactory;
//用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
//用来记录已经执行完毕的任务个数
private long completedTaskCount;
................
}
execute(Runnable command)方法很重要
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
注释写了很多,一共分三步:
1.如果运行的线程,小于corePoolSize那么尝试开新线程。addWorker方法的原子性,检查runState和workerCount,可以通过返回false防止出现错误。
2.如果一个任务成功排队,我们仍然要再次检查一下是否应该添加一个线程。或者线程池会关闭。我们重新检查状态,如果没有线程就新开一个,如果停止就有必要回滚队列。
3.如果没法排队了,就要添加新线程了,如果失败了,我们就会发现队列饱和或者是线程池被关闭,所以会拒绝任务。