一、如何创建线程
1、将类声明为Thread的子类,重写Thread类的run方法,使用子类的实例调用start()方法启动线程。
class MyThread extends Thread {
public void run(){
//code
}
}
MyThread m = new MyThread();
m.start();
2、让类实现Runnable接口,该类实现run方法,然后在Thread构造方法中传入Runnable接口的实现类,使用Thread对象调start()方法启动线程。
class MyThread implements Runnable {
public void run(){
//code
}
}
MyThread m = new MyThread();
//new Thread(m).start();
Thread t = new Thread(m);
t.start();
实现Runnable接口,避免了继承Thread类的单继承局限性。
3、匿名内部类的方式
public class Test {
public static void main(String[] args){
new Thread(new Runnable(){
public void run(){
//code
}
}).start();
}
}
4、让类实现Callable接口,该类实现call()方法,使用FutureTask类来包装Callable实现类的对象,然后再Thread构造方法中传入FutureTask对象,然后再使用Thread对象调start()方法启动线程。
public class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "data";
}
}
public class Test{
public static void main(String[] args){
Callable<String> myThread = new MyThread();
FutureTask<String> ft = new FutureTask<>(myThread);
Thread t = new Thread(ft);
t.start();
String s = ft.get();
System.out.println("callable线程返回值:" + s);
}
}
打印结果:callable线程返回值:data
二、线程池
jdk中线程池通过线程池工厂类创建,再通过线程去执行任务方法。
- Executors:线程池创建工厂类
- ExecutorService:线程池类
- Future:用来记录线程任务执行完毕后产生的结果
1、线程池的使用
使用Executors
中的newFixedThreadPool()
静态方法得到线程池对象,然后调用submit()
方法提交线程执行任务。
submit(Runable task):
下面的例子是提交一个Runnable的任务,返回的Future,该Future的get()方法在成功完成时返回null。
public class MyThread implements Runnable{
//run方法
}
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
es.submit(new MyThread());
}
}
submit(Runable task,T result):
下面的例子是提交一个Runnable任务,返回的Future,该Future的get()方法在成功完成时将会返回给定的结果。 可以将传入的参数在run里处理并返回结果。
public class Data {
int num = 1;
String str = "threadData";
//省略
}
public class MyThread implements Runnable{
Data data;
public void run(){
Data.setStr("data")
}
}
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(5);
Future<Data> runableResult = es.submit(new MyThread(data), data);
System.out.println(runableResult.get());
}
}
打印结果:Data{num=1, str='data'}
submit(Callable<T> task):
下面的例子是提交一个Callable的任务,返回一个表示该任务的Future,然后通过Future中的get()方法可以获取到返回值。
public class MyThread implements Callable<String> {
@Override
public String call() throws Exception {
return "aaa";
}
}
public class ThreadPoolTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(5);
Future future = es.submit(new MyThread());
System.out.println(future.get());
}
}
打印的结果为:aaa
2、java中Executors创建的四种线程池
- newCachedThreadPool()
创建一个可缓存的线程池。如果有新任务提交时,有空闲线程则直接处理任务,如果没有就创建新的线程处理任务,线程池不对线程池大小做限制。
newCachedThreadPool(ThreadFactory threadFactory):创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory
创建新线程。
注:也可以使用ThreadPoolExecutor
创建个性化设置的线程池。这里讲的四种线程池看源码,里面都是使用ThreadPoolExecutor
构建的。
ExecutorService es = Executors.newCachedThreadPool();
- newFixedThreadPool(int nThreads)
创建一个固定大小的线程池。当线程达到线程池的最大值,再提交任务则进入队列中,等有线程空闲时,再从队列中取出任务继续执行。
newFixedThreadPool(int nThreads,ThreadFactory threadFactory)
,threadFactory
是创建新线程时使用的工厂。
- newScheduledThreadPool(int corePoolSize)
创建一个固定大小的线程池,支持定时及周期性任务执行。corePoolSize
是池中所保存的线程数,即使线程是空闲的也包括在内。
newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)
,threadFactory
是创建新线程时使用的工厂。
下面代码中的schedule(Runnable<V> command,long delay,TimeUnit unit)
方法的参数:第一个参数是要执行的任务,第二个是从现在开始延迟执行时间,unit是延迟参数的时间单位。
ThreadPoolRunnable tpr = new ThreadPoolRunnable();
ScheduledExecutorService es = Executors.newScheduledThreadPool(3);
es.schedule(tpr,3,TimeUnit.SECONDS);
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
方法可以创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期。
-
newSingleThreadExecutor()
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行。这里的单线程执行指的是线程池内部,从线程池外看,提交任务到线程池时并没有阻塞,仍然是异步的。
3、为什么使用ThreadFactory及用法
使用ThreadFactory
有几点好处:
- 我们可以自己设置一个线程名,而不用使用默认的pool-thread-n这种名字。
- 可以给线程设置成守护线程。
- 可以设置优先级。
- 可以处理为捕捉的异常。
ThreadFactory使用方法:
MyRunnable:
public class MyRunnable implements Runnable{
@Override
public void run() {
System.out.println("当前线程为:" + Thread.currentThread().getName());
}
}
MyThreadFactory:
public class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("threadFactory线程");
return thread;
}
}
public class Test {
public static void main(String[] args) {
MyThreadFactory tf = new MyThreadFactory();
MyRunnable runnable = new MyRunnable();
ExecutorService es = Executors.newFixedThreadPool(tf);
es.submit(runnable);
}
}
三、ThreadPoolExecutor用法
1、Executors创建的四种线程池可能会出现的问题
上面说了Executors
创建的几种线程池,但是不建议使用Executors
去创建线程池,这是为什么呢?
我们来看一下Executors
创建的线程池源码:
- Executors.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
源码中corePoolSize
被设置为0,maximumPoolSize
被设置为Integer.MAX_VALUE
,keepAliveTime被设置为60秒,因为最大线程池是Integer.MAX_VALUE
,可能会因为创建大量的线程导致OOM。
- Executors.newScheduledThreadPool(n)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
其问题和newCachedThreadPool
一样,源码中maximumPoolSize
被设置为Integer.MAX_VALUE
,所以可能会因为创建大量的线程导致OOM。
- Executors.newFixedThreadPool(n)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
源码中使用的是LinkedBlockingQueue
无界队列作为线程池的工作队列,因为没有设置容量,其默认队列的容量是Integer.MAX_VALUE
,所以也可能会堆积大量的请求,从而导致OOM。
- Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
和newFixedThreadPool
一样,因为使用LinkedBlockingQueue
也可能堆积大量的请求,从而导致OOM。
注:如果使用Executors的静态方法创建ThreadPoolExecutor对象,可以通过使用Semaphore
对任务的执行进行限流也可以避免出现OOM异常。
2、ThreadPoolExecutor配置:
有下面7个参数:
int corePoolSize:核心线程池大小
int maximumPoolSize:最大线程池大小
long keepAliveTime:非核心线程最大空闲时间
TimeUnit unit:时间单位
BlockingQueue<Runnable> workQueue:线程排队策略
RejectedExecutionHandler handler:拒绝策略
ThreadFactory threadFactory:线程工厂
-
corePoolSize和maximumPoolSize
ThreadPoolExecutor将根据corePoolSize
和maximumPoolSize
设置的边界自动调整线程池大小。
corePoolSize和maximumPoolSize:
当新任务在方法execute(Runnable)
中提交任务时,如果运行的线程少于corePoolSize
时,哪怕线程池中有空闲状态的线程,也会创建一个新的线程来处理任务。如果运行的线程多于corePoolSize
而少于maximumPoolSize
,则仅当队列满时才创建新线程。如果设置corePoolSize
和maximumPoolSize
相同,则创建了固定大小的线程池。如果将maximumPoolSize
设置为基本的无界值(如Integer.MAX_VALUE
),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用setCorePoolSize(int)
和setMaximumPoolSize(int)
进行动态更改。
按需构造:
默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法prestartCoreThread()
或prestartAllCoreThreads()
对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。
创建新线程:
使用ThreadFactory创建新线程,如果没有另外说明,则在同一个ThreadGroup
中一律使用Executors.defaultThreadFactory()
创建线程,这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
-
keepAliveTime线程存活时间
此参数是在池中当前有多于corePoolSize
的线程起作用,这些多出的线程在空闲时间超过keepAliveTime
时终止。目的是为了在池处于非活跃状态时减少资源消耗的方法。
可以使用setKeepAliveTime(long keepAliveTime,TimeUnit unit)
方法动态更改此参数。使用Long.MAX_VALUE
,TimeUnit.NANOSECONDS
这两个参数时可以使空闲线程永远不会在关闭之前终止。
只要keepAliveTime值非0,allowCoreThreadTimeOut(boolean)
方法可以将此超时策略应用于核心线程(corePool)。
-
TimeUnit时间单位
用于设置keepAliveTime设置的超时时间的单位。keepAliveTime:60L,TimeUnit.SECONDS
代表:60秒。
注:TimeUnit.SECONDS.sleep(1); //相当于Thread.sleep(1000);
即可以使用TimeUnit里的sleep()方法来代替Thread.sleep()方法
-
BlockingQueue<Runnable>任务排队策略
所有的BlockingQueue
都可用于传输和保持提交的任务。此队列的使用时与池大小进行交互,下面是排队时机:
- 如果运行的线程少于
corePoolSize
,则Executor
始终首选添加新的线程,而不进行排队。 - 如果运行的线程等于或多于
corePoolSize
,则Executor
始终首选将请求加入队列,而不添加新的线程。 - 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出
maximumPoolSize
,在这种情况下,任务将被拒绝。
主要有三种排队策略:
直接提交队列:
直接提交队列的默认选项是SynchronousQueue
,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列:
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列:
当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
-
RejectedExecutionHandler任务拒绝策略
当Executo
已经关闭,或Executor
将有限边界用于最大线程数和工作对列容量,并且两者已饱和时,在execute方法中提交新任务将被拒绝。以上两种情况下,exeute
方法都将调用RejectedExecutionHandler
的rejectedExecution()
方法。线程池提供了下面四种预定的拒绝策略:
中止策略(默认):ThreadPoolExecutor.AbortPolicy()
线程数超过maximumPoolSize
时,直接拒绝,抛出运行时RejectedExecutionException
的异常。
调用者运行策略:ThreadPoolExecutor.CallerRunsPolicy()
用调用者的线程来调用execute
,除非executor
被关闭,否则任务不会被丢弃。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。注意:当最大线程池过小时,此策略下,任务会交上层线程(即调用executor
的线程)执行,导致上层线程既要处理其他任务,又要处理排队中的大量任务,如果遇到长连接,上层线程将长时间阻塞,出现故障。
抛弃旧任务策略:ThreadPoolExecutor.DiscardOldestPolicy()
如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。注意:如果此时阻塞队列使用PriorityBlockingQueue
优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。
直接丢弃策略:ThreadPoolExecutor.DiscardPolicy()
线程数超过maximumPoolSize
时,任务被直接被丢弃。和AbortPolicy
一样,但不抛出异常。
写在最后:
- 如果文章中有错误或是表达不准确的地方,欢迎大家评论中指正,以便我完善。
- 文章我也会根据所学到新的知识不断更新。