本文主要对Thread
的使用进行归纳总结,让我们对于Thread
有更进一步的了解。
我们对线程的操作一般为:
new Thread(new Runnable() {
@Override
public void run() {
......
}
}).start;
或者为:
public class TestRunnable implements Runnable {
@Override
public void run() {
......
}
}
通过上面的方式我们就能实现新建线程使用了,但是用这种方法创建的线程不易于管理,消耗性能;还可以无限制的创建,之间存在相互竞争,会导致过多占用资源导致系统瘫痪。不过线程的创建还有很多,我们继续往下看。
系统给我们提供了ThreadFactory
接口,里面只有一个线程创建方法newThread(Runnable var1)
,参数为Runnable
对象,看源码如下:
public interface ThreadFactory {
Thread newThread(Runnable var1);
}
下面我们来使用ThreadFactory
接口来进行线程的创建和使用:
首先我们创建一个ThreadFactory
的对象factory
private static void createThreadFactory() {
factory = new ThreadFactory() {
int count = 0;
@Override
public Thread newThread(Runnable runnable) {
count++;
return new Thread(runnable, "Thread:" + count);
}
};
}
我们实现了newThread
方法,并返回了一个线程名字为"Thread:" + count
的新线程。
其次我们创建一个Runnable
对象,并在其中输出当前线程名称,代码:
private static Runnable getRunnable(final String name) {
return new Runnable() {
@Override
public void run() {
System.out.println(name + "-->CurrThread:" + Thread.currentThread().getName() + " started...\n");
}
};
}
最后,就是直接使用了,代码:
private static void useFactory(Runnable runnable) {
factory.newThread(runnable).start();
factory.newThread(runnable).start();
factory.newThread(runnable).start();
}
看上面的代码,我们是创建了3个线程,并运行。
运行代码,我们直接看输出:
Factory-->CurrThread:Thread:1 started...
Factory-->CurrThread:Thread:2 started...
Factory-->CurrThread:Thread:3 started...
这样我们就完成了使用ThreadFactory
来进行新线程的创建了。
我们在使用Executor
来进行线程的创建和使用,其源码如下:
public interface Executor {
void execute(Runnable var1);
}
从上面就可以看出Executor
是一个接口,里面实现了一个方法execute(Runnable var1)
,其参数为Runnable
对象。
首先我们创建一个Executor
对象executor
:
executor = Executors.newCachedThreadPool();
Executors
提供了大量的静态方法,用于创建线程池,返回了一个ExecutorService
接口对象,其实现了Executors
接口。下面介绍几种经常用的方法:
newCachedThreadPool
方法,创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重 用可用线程,否则就新建一条线程。
newScheduledThreadPool
方法,创建一个可延迟执行或定期执行的线程池。
newSingleThreadExecutor
方法,创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务。
newFixedThreadPool
方法,创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。
这几种方法都调用了ThreadPoolExecutor
来完成线程池的创建,具体用法在后面讲解。
其次我们创建一个Runnable
对象,并在其中输出当前线程名称,代码:
private static Runnable getRunnable(final String name) {
return new Runnable() {
@Override
public void run() {
System.out.println(name + "-->CurrThread:" + Thread.currentThread().getName() + " started...\n");
}
};
}
最后,就是直接使用了,代码:
private static void useFactory(Runnable runnable) {
executor.execute(runnable);
executor.execute(runnable);
executor.execute(runnable);
}
看上面的代码,我们是创建了3个线程,并运行。
运行代码,我们直接看输出:
Executor-->CurrThread:pool-1-thread-1 started...
Executor-->CurrThread:pool-1-thread-2 started...
Executor-->CurrThread:pool-1-thread-3 started...
newScheduledThreadPool
方法的使用:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);//生成有3个线程的延时线程池
......
private static void useScheduledThreadPool(Runnable runnable) {
for (int i = 0; i < 20; i++) {
//设置延迟5秒后执行
scheduledExecutorService.schedule(runnable, 5, TimeUnit.SECONDS);
}
}
这样就完成了延迟线程池的使用,其余的方法的使用,和这个大同小异,这里就不一一列举了。
我们使用Callable
接口来进行线程的创建和使用。来看具体使用:
private static Callable<String> getCallable(final String name) {
return new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(name+"-->CurrThread:" + Thread.currentThread().getName() + " started...\n");
return name + "-->CurrThread:" + Thread.currentThread().getName() + " ended...\n";
}
};
}
这段代码生成了Callable<V>
的对象,参数是个泛型;我们这里用的是String
类型,所以最后给我们返回一个String
类型的值。注意这里的call()
方法抛出了异常。
private static void useCallable(Callable<String> callable) {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<>();
for (int i = 0; i < 6; i++) {
resultList.add(executorService.submit(callable));//使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
}
try {
for (Future<String> stringFuture : resultList) {
System.out.println(stringFuture.get());//所有的线程执行完成后,result.get()才会执行
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
在这里创建了6个线程,使用ExecutorService
执行Callable
类型的任务,并将结果保存在Future
类中。Future
类相当于异步计算结果类。这里需要注意的是Future.get()
方法,是在所有的线程执行结束后才会执行。看运行结果为:
Callable-->CurrThread:pool-4-thread-1 started...
Callable-->CurrThread:pool-4-thread-3 started...
Callable-->CurrThread:pool-4-thread-4 started...
Callable-->CurrThread:pool-4-thread-2 started...
Callable-->CurrThread:pool-4-thread-5 started...
Callable-->CurrThread:pool-4-thread-2 started...
Callable-->CurrThread:pool-4-thread-1 ended...
Callable-->CurrThread:pool-4-thread-2 ended...
Callable-->CurrThread:pool-4-thread-3 ended...
Callable-->CurrThread:pool-4-thread-4 ended...
Callable-->CurrThread:pool-4-thread-5 ended...
Callable-->CurrThread:pool-4-thread-2 ended...
从上面的代码,我们就可以看出Callable
接口和Runnable
接口差异在与:
-
Callable
接口有返回值,Runnable
接口没有返回值 -
Callable
接口中是call()
方法,Runnable
接口中是run()
方法 -
Callable
接口中call()
方法抛出了异常,Runnable
接口中run()
方法没有 - 执行
Callable
类型的任务,得到Future
类对象,此对象可以取消任务的执行,判定任务是否取消,以及得到返回值。
直接使用ThreadPoolExecutor
来创建线程,先来看看这个方法:
public ThreadPoolExecutor(int corePoolSize,//
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) //后两个参数为可选参数
参数介绍:
- corePoolSize:核心线程数,如果运行的线程少于corePoolSize,则创建新线程来执行新任务,即使线程池中的其他线程是空闲的.
- maximumPoolSize:最大线程数,可允许创建的线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小。
corePoolSize <运行的线程数< maximumPoolSize:仅当队列满时才创建新线程
corePoolSize=运行的线程数= maximumPoolSize:创建固定大小的线程池 - keepAliveTime:如果线程数多于corePoolSize,则这些多余的线程的空闲时间超过keepAliveTime时将被终止。
- unit:keepAliveTime参数的时间单位。
- workQueue:保存任务的阻塞队列,与线程池的大小有关:
当运行的线程数少于corePoolSize时,在有新任务时直接创建新线程来执行任务而无需再进队列
当运行的线程数等于或多于corePoolSize,在有新任务添加时则选加入队列,不直接创建线程
当队列满时,在有新任务时就创建新线程 - threadFactory:使用ThreadFactory创建新线程,默认使用defaultThreadFactory创建线程。
- handler:定义处理被拒绝任务的策略,默认使用ThreadPoolExecutor.AbortPolicy,任务被拒绝时将抛出RejectExecutorException。
在代码中的使用
int corePoolSize = 3;
int maximumPoolSize = 5;
int workQueueSize = 3;
int currSize = 2;
factory = new ThreadFactory() {
int count = 0;
@Override
public Thread newThread(Runnable runnable) {
count++;
return new Thread(runnable, "Thread:" + count);
}
};
ExecutorService executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(workQueueSize), factory);
for (int i = 0; i < currSize; i++) {
final int index = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println("ThreadPool-->CurrThread:"+ Thread.currentThread().getName()+",Thread:"+ index+" running...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(runnable);
- 当
currSize=2
,那么currSize < corePoolSize
时,会直接启动一个线程来执行任务;运行程序:
ThreadPool-->CurrThread:Thread:1,Thread:0 running...
ThreadPool-->CurrThread:Thread:2,Thread:1 running...
- 当
currSize=4
,那么currSize > corePoolSize
时,任务会直接插入队列中等待,需要等待任务数为currSize-corePoolSize=1
,而1 < workQueueSize
此时队列没有满,因此不会创建新线程;运行程序:
ThreadPool-->CurrThread:Thread:1,Thread:0 running...
ThreadPool-->CurrThread:Thread:2,Thread:1 running...
ThreadPool-->CurrThread:Thread:3,Thread:2 running...
ThreadPool-->CurrThread:Thread:3,Thread:3 running...
- 当
currSize=7
,那么currSize > corePoolSize
时,任务会直接插入队列中等待,需要等待任务数为currSize-corePoolSize=4
,而4 > workQueueSize
此时队列已满,但还有4- workQueueSize=1
个任务没有线程可执行;而1 < maximumPoolSize-corePoolSize
,因此创建一个非核心新线程;运行程序:
ThreadPool-->CurrThread:Thread:1,Thread:0 running...
ThreadPool-->CurrThread:Thread:2,Thread:1 running...
ThreadPool-->CurrThread:Thread:3,Thread:2 running...
ThreadPool-->CurrThread:Thread:4,Thread:6 running...
ThreadPool-->CurrThread:Thread:3,Thread:3 running...
ThreadPool-->CurrThread:Thread:1,Thread:4 running...
ThreadPool-->CurrThread:Thread:4,Thread:5 running...
- 当
currSize=9
,那么currSize > corePoolSize
时,任务会直接插入队列中等待,需要等待任务数为currSize-corePoolSize=6
,而6 > workQueueSize
此时队列已满,但还有6- workQueueSize=3
个任务没有线程可执行,而3 > maximumPoolSize-corePoolSize
,那么就拒绝执行任务,ThreadPoolExecutor 会调用 RejectedExecutionHandler 的 rejectedExecution 方法来通知调用者。 ;运行程序:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.MyClass$1@1540e19d rejected from java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 5, active threads = 5, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.example.MyClass.createThreadPool(MyClass.java:152)
at com.example.MyClass.main(MyClass.java:35)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
ThreadPool-->CurrThread:Thread:1,Thread:0 running...
ThreadPool-->CurrThread:Thread:2,Thread:1 running...
ThreadPool-->CurrThread:Thread:4,Thread:6 running...
ThreadPool-->CurrThread:Thread:5,Thread:7 running...
ThreadPool-->CurrThread:Thread:3,Thread:2 running...
ThreadPool-->CurrThread:Thread:1,Thread:3 running...
ThreadPool-->CurrThread:Thread:2,Thread:4 running...
ThreadPool-->CurrThread:Thread:4,Thread:5 running...
总结:
- 最大线程数为:maximumPoolSize
- 最大任务数为:maximumPoolSize+workQueueSize
通过上面的讲解我们来看看通过new Thread()
和实现Runnable
(这里设置为标记一)建立的线程,与通过线程池(这里设置为标记二)创建的线程之间的区别:
- 通过方式一建立的线程,缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。
- 通过方式一建立的线程,不能设置定时,定期,线程中断等功能。
- 通过方式一建立的线程,性能较差。
- 通过方式二建立的线程,重用存在的线程,减少对象创建、消亡的开销,性能佳。
- 通过方式二建立的线程,可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
- 通过方式二建立的线程,提供定时执行、定期执行、单线程、并发数控制等功能。