1.继承Thread类创建线程
- 1】定义Thread类的子类,并重写该类的run()方法,该方法的方法体就是线程需要完成的任务,run()方法也称为线程执行体。
- 2】创建Thread子类的实例,也就是创建了线程对象
- 3】启动线程,即调用线程的start()方法
public class TestThread {
public static void main(String[] args) {
new MyThread().start();
}
}
class MyThread extends Thread {
int total;
public void run() {
synchronized (this) {
System.out.println("MyThread is running..");
for (int i = 0; i < 100; i++) {
total += i;
System.out.println("total is " + total);
}
notify();// 因为synchronized (this),为了保证其他线程有wait可以被重新唤醒
}
}
}
2.实现Runnable接口创建线程
- 1】定义Runnable接口的实现类,一样要重写run()方法,这个run()方法和Thread中的run()方法一样是线程的执行体
- 2】创建Runnable实现类的实例,并用这个实例作为Thread的target来创建Thread对象,这个Thread对象才是真正的线程对象
- 3】第三部依然是通过调用线程对象的start()方法来启动线程
public class TestThread2 implements Runnable {//实现Runnable接口
public static void main(String[] args) {
// 创建TestThread2后,将对象放入线程找那个,然后执行线程
new Thread(new TestThread2()).start();
}
@Override
public void run() {
System.out.println("------------线程被创建");
}
}
3.使用Callable和Future创建线程
- 1】创建Callable接口的实现类,并实现call()方法,然后创建该实现类的实例(从java8开始可以直接使用Lambda表达式创建Callable对象)。
- 2】使用FutureTask类来包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值
- 3】使用FutureTask对象作为Thread对象的target创建并启动线程(因为FutureTask实现了Runnable接口)
- 4】调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
public class CallableAndFuture {
//Callable :一个产生结果
//Future :一个拿到结果
public static void main(String[] args) {
//1.创建Callable的对象返回值
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
//2.FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值
FutureTask<Integer> future = new FutureTask<Integer>(callable);
//3.作为Runnable被线程执行
new Thread(future).start();
try {
Thread.sleep(5000);// 可能做一些事情
//4.作为Future得到Callable的返回值
System.out.println(future.get());// 拿到结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
4.由Callable和Future创建线程引出的扩展
思路:【Callable和Future】可以创建异步线程接收返回值,怎么使用会更简单?
- 4.1 如果有一个线程的管理类,会不会变得简单一点?
ExecutorService继承自Executor,它的目的是为我们管理Thread对象,从而简化并发编程,Executor使我们无需显示的去管理线程的生命周期,是JDK 5之后启动任务的首选方式
public class CallableAndFuture2 {
public static void main(String[] args) throws Exception {
// 1.创建Thread线程管理对象
ExecutorService threadPool = Executors.newSingleThreadExecutor();
// 2.threadPool调用Callable,直接返回Future对象
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
public Integer call() throws Exception {
// 可能是线程的异步调用方法,最后回来的返回值
return new Random().nextInt(100);
}
});
Thread.sleep(1000);// 可能做一些事情
// 3.Future取得线程的异步返回值
System.out.println(future.get());
}
}
- 4.2 如果执行多个带返回值的任务,并取得多个返回值,应该怎么处理?
场景:多线程下载文件,不同的文件用不同的线程去下载,可以在控制台看到返回的下载进度
public class CallableAndFuture3 {
// 模拟一个请求,需要同时开10个线程,然后分别打印线程的返回值
public static void main(String[] args) throws Exception, ExecutionException {
// 1.创建线程池
ExecutorService threadPool = Executors.newCachedThreadPool();
// 2.CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果
CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool);
// 3.同时开10个线程
for(int i = 0; i < 10; i++) {
final int taskID = i;
// 4.Executor完成任务调度
cs.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return taskID;
}
});
}
// 5.BlockingQueue负责保存异步任务的执行结果
for(int i = 0; i < 10; i++) {
System.out.println(cs.take().get());
}
}
}
- 4.3 【对比4.2案例】如果创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据,会有什么不一样吗?
1.CompletionService.take 会获取并清除已经完成Task的结果,如果当前没有已经完成Task时,会阻塞(根据执行返回的结果)。
2.【先创建一个装Future类型的集合,用Executor提交的任务返回值添加到集合中,最后遍历集合取出数据】 (根据执行的顺序)。
-- 这种方法通常是按照Future加入的顺序。 两个方法最大的差别在于遍历 Future 的顺序,
3.相对来说, CompletionService 的性能更高。
4.考虑如下场景:多线程下载,结果用Future返回。第一个文件特别大,后面的文件很小。
-- 用方法1,能很快知道已经下载完文件的结果(第一个文件可能还没有下载完);
-- 用方法2,必须等第一个文件下载结束后,才会获得其他文件的下载结果。
- 4.4 了解CompletionService的简单使用,那么CompletionService到底是什么鬼?
1.根据上面的例子,好像CompletionService可以调度线程
2.根据上面的例子,好像CompletionService保存返回结果
3.CompletionService与ExecutorService有什么不一样呢?
5.CompletionService与ExecutorService
- 1】见名知意:CompletionService是什么?有哪些方法?
package java.util.concurrent;
public interface CompletionService<V> {
Future<V> submit(Callable<V> task) ; //执行Callable线程
Future<V> submit(Runnable task, V result); //执行Runnable线程
Future<V> take() throws InterruptedException; //获取线程的返回值
Future<V> poll(); //类似链表的poll(暂且不管什么作用,可能和链表有关系)
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
- 2】见名知意:ExecutorService 是什么?有哪些方法?
public interface Executor {
// 1.实现run方法的调度
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
// 1.基础判断和返回值方法
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
// 2.执行线程的调度
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 3.任务的批量提交
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;
// 4.任务的单个提交
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
- 3】问题:都执行线程池的任务,为什么CompletionService不直接继承Executor接口?
- CompletionService与ExecutorService类似都可以用来执行线程池的任务
- ExecutorService是继承了Executor的接口
- CompletionService则只是一个接口
- 主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务(task)的异步性,只要为每个task创建一个线程就实现了任务的异步性。代码往往包含new Thread(task).start()。
- 这种方式的问题在于,它没有限制可创建线程的数量(在ExecutorService可以限制),不过,这样最大的问题是在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。
- 另外,Executor框架还支持同步任务的执行,就是在execute方法中调用提交任务的run()方法就属于同步调用
- 4】CompletionService实现原理?
- 一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。为了更精确实现任务的异步执行以及更简便的完成任务的异步执行,可以使用CompletionService。
- CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。
- 在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。QueueingFuture的源码如下:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
- 从代码可以看到,CompletionService将提交的任务转化为QueueingFuture,并且覆盖了done方法,在done方法中就是将任务加入任务队列中。这点与之前对Executor框架的分析是一致的。
6.CompletionService与ExecutorService分别在电商项目中的调度实例对比
- 6.1】ExecutorService任务调度实例
public class TestExecutorService {
// 1.构造方法
private final ExecutorService executorService;
public TestExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
// 2.渲染商品图片的方法
public void renderProductDetail() throws Exception {
// 1.加载数据源(类比查询数据库有几条信息)
final List<ProductInfo> productInfos = new ProductUtils().loadProductInfos();
// 2.创建线程,但是只能阻塞等待所有的图像下载完成
Callable<List<ProductImage>> task = new Callable<List<ProductImage>>() {
@Override
public List<ProductImage> call() throws Exception {
List<ProductImage> imageList = new ArrayList<>();
for (ProductInfo info : productInfos){
imageList.add(info.getImage());
}
return imageList;
}
};
// 3.提交给线程池执行
Future<List<ProductImage>> listFuture = executorService.submit(task);
// 4.展示商品简介的信息
new ProductUtils().renderProductText(productInfos);
try {
// 5.显示商品的图片
List<ProductImage> imageList = listFuture.get();
new ProductUtils().renderProductImageList(imageList);
} catch (InterruptedException e) {
// 6.如果显示图片发生中断异常则重新设置线程的中断状态
// 7.这样做可以让wait中的线程唤醒
Thread.currentThread().interrupt();
// 8.同时取消任务的执行,参数false表示在线程在执行不中断
listFuture.cancel(true);
} catch (ExecutionException e) {
try {
throw new Throwable(e.getCause());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
TestExecutorService cd = new TestExecutorService(Executors.newCachedThreadPool());
cd.renderProductDetail();
System.exit(0);
}
}
- 6.2】CompletionService任务调度实例
public class TestCompletionService {
// 1.构造方法
private final ExecutorService executorService;
public TestCompletionService(ExecutorService executorService) {
this.executorService = executorService;
}
// 2.渲染商品图片的方法
public void renderProductDetail() throws Exception {
// 1.加载数据源(类比查询数据库有几条信息)
final List<ProductInfo> productInfos = new ProductUtils().loadProductInfos();
// 2.创建真实CompletionService调度线程,存放返回结果
CompletionService<ProductImage> completionService = new ExecutorCompletionService<ProductImage>(executorService);
// 3.为每个图像的下载建立一个工作任务(类比数据库下载图片)
for (final ProductInfo info : productInfos) {
completionService.submit(new Callable<ProductImage>() {
@Override
public ProductImage call() throws Exception {
return info.getImage();
}
});
}
// 4.展示商品简介的信息
new ProductUtils().renderProductText(productInfos);
try {
// 5.显示商品图片
for (int i = 0, n = productInfos.size(); i < n; i++){
Future<ProductImage> imageFuture = completionService.take();
ProductImage image = imageFuture.get();
new ProductUtils().renderProductImage(image);
}
} catch (InterruptedException e) {
// 6.如果显示图片发生中断异常则重新设置线程的中断状态
// 7.这样做可以让wait中的线程唤醒
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
try {
throw new Throwable(e.getCause());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
// 1.使用线程池创建线程
TestCompletionService cd = new TestCompletionService(Executors.newCachedThreadPool());
// 2.渲染商品图片和名称
cd.renderProductDetail();
}
}
- 6.3】基础类准备
// 商品图片类
public class ProductImage {
}
// 商品信息类
public class ProductInfo {
private ProductImage image;
public ProductImage getImage() {
return image;
}
public void setImage(ProductImage image) {
this.image = image;
}
}
// 商品方法类
public class ProductUtils {
// 日期格式器
private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
// 真实渲染商品图片的方法
public void renderProductImage(ProductImage image) throws Exception {
Thread.sleep(100);
System.out.println(Thread.currentThread().getName() + " display products images! " + format.format(new Date()));
}
// 真实渲染商品图片列表的方法
public void renderProductImageList(List<ProductImage> imageList) throws Exception {
for (ProductImage image : imageList){
try {
System.out.println("-------商品图片渲染" + image);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " display products images! " + format.format(new Date()));
}
// 真实渲染商品文字的方法
public void renderProductText(List<ProductInfo> productInfos) throws Exception {
for (ProductInfo info : productInfos) {
System.out.println("-------商品文字渲染" + info.getImage());
Thread.sleep(50);
}
System.out.println(
Thread.currentThread().getName() + " display products description! " + format.format(new Date()));
}
// 真实加载图片信息的方法
public List<ProductInfo> loadProductInfos() throws Exception {
List<ProductInfo> list = new ArrayList<>();
TimeUnit.SECONDS.sleep(3);// 模拟线程耗时
for (int i = 0; i < 10; i++) {
ProductInfo info = new ProductInfo();
info.setImage(new ProductImage());
list.add(info);
}
System.out.println(Thread.currentThread().getName() + " load products info! " + format.format(new Date()));
return list;
}
}
- 6.4】思考:将【6.2案例中的步骤3和步骤4对调有什么影响吗?】
1.对程序结果没有影响,但是性能有影响
2.因为步骤4没有使用异步线程执行(数据量小,没有必要强制优化),如果放在3前面,就得等到3的同步结果执行完毕之后再执行
3.如果先执行异步,不会堵塞程序运行,节省了等待同步结果的执行时间
- 6.5】思考:如何规避在异步中出现的异常,导致线程终端
- 1.如果显示图片发生中断异常则重新设置线程的中断状态
- 2.这样做可以让wait中的线程唤醒
- 3.Thread.currentThread().interrupt();
try {
// 5.显示商品图片
for (int i = 0, n = productInfos.size(); i < n; i++){
Future<ProductImage> imageFuture = completionService.take();
ProductImage image = imageFuture.get();
new ProductUtils().renderProductImage(image);
}
} catch (InterruptedException e) {
// 6.如果显示图片发生中断异常则重新设置线程的中断状态
// 7.这样做可以让wait中的线程唤醒
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
try {
throw new Throwable(e.getCause());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
6.6】思考:对比CompletionService和ExecutorService
- ExecutorService使用(一个task,里面串行多个任务):Future<List<ProductImage>> listFuture = executorService.submit(task);
- 如果List<ProductImage> imageList = listFuture.get();(Callable返回的就是一个List<ProductImage>)出现异常,因为是串行执行,先中断线程,再同时取消任务执行listFuture.cancel(true);
- CompletionService使用(多个task,里面单独执行一个任务)
for (final ProductInfo info : productInfos) {
completionService.submit(new Callable<ProductImage>() {
@Override
public ProductImage call() throws Exception {
return info.getImage();
}
}); - 如果completionService.take().get()出现异常,中断线程即可
8.CompletionService小结
- 相比ExecutorService,CompletionService可以更精确和简便地完成异步任务的执行
- CompletionService的一个实现是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合体,Executor完成计算任务,BlockingQueue负责保存异步任务的执行结果
- 在执行大量相互独立和同构的任务时,可以使用CompletionService
- CompletionService可以为任务的执行设置时限,主要是通过BlockingQueue的poll(long time,TimeUnit unit)为任务执行结果的取得限制时间,如果没有完成就取消任务
7.参考网址:
8.课后作业:
- 1.创建线程的三种方式
- 2.CompletionService与ExecutorService对比
- 3.说说高并发情况下线程调度的心得