1.execute和submit的区别
(1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。
(2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
2.线程池的优先级
JDK 中无界优先级队列PriorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的?
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。
本身是线程安全的 内部使用显示锁 保证线程安全
PriorityBlockingQueue存储的对象必须是实现Comparable接口的 因为PriorityBlockingQueue队列会根据内部存储的每一个元素的compareTo方法比较每个元素的大小
这样在take出来的时候会根据优先级 将优先级最小的最先取出
public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();
public static void main(String[] args) {
queue.add(new User(1,"wu"));
queue.add(new User(5,"wu5"));
queue.add(new User(23,"wu23"));
queue.add(new User(55,"wu55"));
queue.add(new User(9,"wu9"));
queue.add(new User(3,"wu3"));
for (User user : queue) {
try {
System.out.println(queue.take().name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//静态内部类
static class User implements Comparable<User>{
public User(int age,String name) {
this.age = age;
this.name = name;
}
int age;
String name;
@Override
public int compareTo(User o) {
return this.age > o.age ? -1 : 1;
}
}
3.PriorityBlockingQueue原理
内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列,
状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
//判断是否为空
if (e == null)
throw new NullPointerException();
//显示锁
final ReentrantLock lock = this.lock;
lock.lock();
//定义临时对象
int n, cap;
Object[] array;
//判断数组是否满了
while ((n = size) >= (cap = (array = queue).length))
//数组扩容
tryGrow(array, cap);
try {
//拿到比较器
Comparator<? super E> cmp = comparator;
//判断是否有自定义比较器
if (cmp == null)
//堆上浮
siftUpComparable(n, e, array);
else
//使用自定义比较器进行堆上浮
siftUpUsingComparator(n, e, array, cmp);
//队列长度 +1
size = n + 1;
//唤醒休眠的出队线程
notEmpty.signal();
} finally {
//释放锁
lock.unlock();
}
return true;
}
4.实现一个带优先级的线程池
ThreadPoolExecutor初始化的队列改为 new PriorityBlockingQueue<Runnable>()
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2, 10,
60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>());
*实现带排序功能的Runnable,execute成功;不然没有实现Comparable报错
threadPool.execute(new Runnable(){...});
java.lang.ClassCastException: com.threadpool$PriorityRunnable cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
at com.threadpool.main(threadpool.java:19)
*实现带排序功能的FutureTask,execute成功;不然没有实现Comparable报错
threadPool.execute(new FutureTask(){...});
java.lang.ClassCastException: com.threadpool$ComparableFutureTask cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
so:实现继承Comparable的Runable和FutureTask
alpha版本:
package com;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public class threadpool {
public static void main(String[] args) {
ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2, 10,
60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>());
//方案1:实现Comparable的Runnable
for (int i = 0; i < 8; i++) {
PriorityRunnable task = new PriorityRunnable(i);
threadPool.execute(task);
}
//方案2:实现Comparable的FutureTask,可选Runnable和Callable
for (int i = 0; i < 8; i++) {
ComparableFutureTask task = new ComparableFutureTask(new PriorityRunnable(i),i);
threadPool.execute(task);
}
for (int i = 0; i < 8; i++) {
ComparableFutureTask task = new ComparableFutureTask(new MyCall(i),i);
threadPool.execute(task);
}
threadPool.shutdown();
}
static class PriorityRunnable implements Runnable,Comparable<PriorityRunnable> {
int priority;
PriorityRunnable(int p){
priority = p;
}
@Override
public void run() {
System.out.println("Thread " + Thread.currentThread().getName() +" -"+ priority);
try{
sleep(1000);
}catch (InterruptedException e){
}
}
@Override
public int compareTo(PriorityRunnable task) {
if (this.priority < task.priority) {
return 1;
} else if (this.priority > task.priority) {
return -1;
}
return 0;
}
}
static class ComparableFutureTask extends FutureTask implements Comparable<ComparableFutureTask> {
private Integer priority;
public Integer getPriority() {
return priority;
}
public ComparableFutureTask(Callable callable,Integer priority) {
super(callable);
this.priority = priority;
}
public ComparableFutureTask(Runnable callable,Integer priority) {
super(callable,null);
this.priority = priority;
}
@Override
public int compareTo(ComparableFutureTask task) {
if (this.getPriority() < task.getPriority()){
return 1;
}else if (this.getPriority() > task.getPriority()){
return -1;
}
return 0;
}
}
}
问题:
1.把execute改成 submit失败 ,报错:
java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.threadpool.main(threadpool.java:24)
可见FutureTask的确没有实现Comparable接口,但是我提交的ComparableFutureTask是实现了Comparable接口的,究竟是因为什么原因导致其成为了FutureTask呢,结果在ThreadPoolExecutor的submit(Callable<T> task)找到原因,它是在AbstractExecutorService中实现的。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
重点在newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
所以我提交的ComparableFutureTask 被转化为了FutureTask了,而FutureTask没有实现Comparable,所以才会报错
现在解决的方法有:
- 用一个ComparableFutureTask继承FutureTask并实现Comparable接口,但也必须要override ThreadPoolExecutor的newTaskFor方法
beta版本1
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 30,
60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
int p = 0;
if (callable instanceof MyCall) {
p = ((MyCall) callable).priority;
}
if (callable instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) callable).priority;
}
System.out.println("newTaskFor " + Thread.currentThread().getName() + " -" + p);
return new ComparableFutureTask(callable, p);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
int p = 0;
if (runnable instanceof PriorityRunnable) {
p = ((PriorityRunnable) runnable).priority;
}
if (runnable instanceof ComparableFutureTask) {//千万加上
p = ((ComparableFutureTask) runnable).priority;
}
System.out.println(runnable +"newTaskFor " + Thread.currentThread().getName() +" -"+ p);
return new ComparableFutureTask(runnable,p);
}
};
另外需要注意的是PriorityBlockingQueue的实现是一个最小堆.
- execute下两种方案不能同时处理,因为Comparable比较的不是同一个类型的对象
Exception in thread "main" java.lang.ClassCastException: com.threadpool$PriorityRunnable cannot be cast to com.threadpool$ComparableFutureTask
at com.threadpool$ComparableFutureTask.compareTo(threadpool.java:109)
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:361)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
at com.threadpool.main(threadpool.java:44)
改:PriorityRunnable 和ComparableFutureTask 实现一样类型的 Comparable<Runnable>,可比较优先级即解决问题
beta版本2
public int compareTo(Runnable task) {
int p = 0;
if (task instanceof PriorityRunnable) {
p = ((PriorityRunnable) task).priority;
}
if (task instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) task).priority;
}
if (this.priority < p) {
return 1;
} else if (this.priority > p) {
return -1;
}
return 0;
}
submit下两种方案可以同时处理,因为问题1处 处理后,submit都会通过newTaskFor将callable/runable封装成ComparableFutureTask
完整rc版本:
public class threadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 30, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()) {
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
int p = 0;
if (callable instanceof MyCall) {
p = ((MyCall) callable).priority;
}
if (callable instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) callable).priority;
}
System.out.println("newTaskFor " + Thread.currentThread().getName() + " -" + p);
return new ComparableFutureTask(callable, p);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
int p = 0;
if (runnable instanceof PriorityRunnable) {
p = ((PriorityRunnable) runnable).priority;
}
if (runnable instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) runnable).priority;
}
System.out.println(runnable + "newTaskFor " + Thread.currentThread().getName() + " -" + p);
return new ComparableFutureTask(runnable, p);
}
};
for (int i = 10; i < 15; i++) {
PriorityRunnable task = new PriorityRunnable(i);
threadPool.submit(task);
}
for (int i = 0; i < 8; i++) {
ComparableFutureTask task = new ComparableFutureTask(new PriorityRunnable(i), i);
threadPool.submit(task);
}
for (int i = 0; i < 8; i++) {
ComparableFutureTask task = new ComparableFutureTask(new MyCall(i), i);
threadPool.execute(task);
}
threadPool.shutdown();
}
static class PriorityRunnable implements Runnable, Comparable<Runnable> {
int priority;
PriorityRunnable(int p) {
priority = p;
}
@Override
public void run() {
System.out.println("Thread " + Thread.currentThread().getName() + " -" + priority);
try {
sleep(100);
} catch (InterruptedException e) {
}
}
@Override
public int compareTo(Runnable task) {
int p = 0;
if (task instanceof PriorityRunnable) {
p = ((PriorityRunnable) task).priority;
}
if (task instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) task).priority;
}
if (this.priority < p) {
return 1;
} else if (this.priority > p) {
return -1;
}
return 0;
}
}
static class MyCall implements Callable<Integer> {
private Integer priority;
public MyCall(Integer priority) {
this.priority = priority;
}
@Override
public Integer call() throws Exception {
System.out.println("Thread " + Thread.currentThread().getName() + " priority " + priority);
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return priority;
}
}
static class ComparableFutureTask extends FutureTask implements Comparable<Runnable> {
private Integer priority;
public Integer getPriority() {
return priority;
}
public ComparableFutureTask(Callable callable, Integer priority) {
super(callable);
this.priority = priority;
}
public ComparableFutureTask(Runnable callable, Integer priority) {
super(callable, null);
this.priority = priority;
}
@Override
public int compareTo(Runnable task) {
int p = 0;
if (task instanceof PriorityRunnable) {
p = ((PriorityRunnable) task).priority;
}
if (task instanceof ComparableFutureTask) {
p = ((ComparableFutureTask) task).priority;
}
if (this.getPriority() < p) {
return 1;
} else if (this.getPriority() > p) {
return -1;
}
return 0;
}
}
}
执行结果:
Thread pool-1-thread-1 -10 //核心线程数1 ,所以最开始执行。剩下的排队
Thread pool-1-thread-1 -14
Thread pool-1-thread-1 -13
Thread pool-1-thread-1 -12
Thread pool-1-thread-1 -11
Thread pool-1-thread-1 priority 7
Thread pool-1-thread-1 -7
Thread pool-1-thread-1 priority 6
Thread pool-1-thread-1 -6
Thread pool-1-thread-1 priority 5
Thread pool-1-thread-1 -5
Thread pool-1-thread-1 priority 4
Thread pool-1-thread-1 -4
Thread pool-1-thread-1 -3
Thread pool-1-thread-1 priority 3
Thread pool-1-thread-1 priority 2
Thread pool-1-thread-1 -2
Thread pool-1-thread-1 priority 1
Thread pool-1-thread-1 -1
Thread pool-1-thread-1 priority 0
Thread pool-1-thread-1 -0
感受下,这样就完成了。