gl实现 ThreadPool实现一个带优先级的线程池

1.execute和submit的区别
(1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知execute()方法输入的任务是一个Runnable类的实例。
(2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完

2.线程池的优先级
JDK 中无界优先级队列PriorityBlockingQueue内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的?
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。
本身是线程安全的 内部使用显示锁 保证线程安全
PriorityBlockingQueue存储的对象必须是实现Comparable接口的 因为PriorityBlockingQueue队列会根据内部存储的每一个元素的compareTo方法比较每个元素的大小
这样在take出来的时候会根据优先级 将优先级最小的最先取出

public static PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();

public static void main(String[] args) {
    queue.add(new User(1,"wu"));
    queue.add(new User(5,"wu5"));
    queue.add(new User(23,"wu23"));
    queue.add(new User(55,"wu55"));
    queue.add(new User(9,"wu9"));
    queue.add(new User(3,"wu3"));
    for (User user : queue) {
        try {
            System.out.println(queue.take().name);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//静态内部类
static class User implements Comparable<User>{

    public User(int age,String name) {
        this.age = age;
        this.name = name;
    }

    int age;
    String name;

    @Override
    public int compareTo(User o) {
        return this.age > o.age ? -1 : 1;
    }
}

3.PriorityBlockingQueue原理
内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列,
状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。

    public void put(E e) {
         offer(e); // never need to block
    }

    public boolean offer(E e) {
        //判断是否为空
        if (e == null)
            throw new NullPointerException();
        //显示锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        //定义临时对象
        int n, cap;
        Object[] array;
        //判断数组是否满了
        while ((n = size) >= (cap = (array = queue).length))
            //数组扩容
            tryGrow(array, cap);
        try {
            //拿到比较器
            Comparator<? super E> cmp = comparator;
            //判断是否有自定义比较器
            if (cmp == null)
                //堆上浮
                siftUpComparable(n, e, array);
            else
                //使用自定义比较器进行堆上浮
                siftUpUsingComparator(n, e, array, cmp);
            //队列长度 +1
            size = n + 1;
            //唤醒休眠的出队线程
            notEmpty.signal();
        } finally {
            //释放锁
            lock.unlock();
        }
        return true;
    }

4.实现一个带优先级的线程池
ThreadPoolExecutor初始化的队列改为 new PriorityBlockingQueue<Runnable>()

  ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2, 10,
                60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());

*实现带排序功能的Runnable,execute成功;不然没有实现Comparable报错

threadPool.execute(new Runnable(){...});
java.lang.ClassCastException: com.threadpool$PriorityRunnable cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
    at com.threadpool.main(threadpool.java:19)

*实现带排序功能的FutureTask,execute成功;不然没有实现Comparable报错

threadPool.execute(new FutureTask(){...});
java.lang.ClassCastException: com.threadpool$ComparableFutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)

so:实现继承Comparable的Runable和FutureTask
alpha版本:

package com;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public class threadpool {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool= new ThreadPoolExecutor(2, 10,
                60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());
        //方案1:实现Comparable的Runnable
        for (int i = 0; i < 8; i++) {
            PriorityRunnable task = new PriorityRunnable(i);
            threadPool.execute(task);
        }
        //方案2:实现Comparable的FutureTask,可选Runnable和Callable
        for (int i = 0; i < 8; i++) {
            ComparableFutureTask task = new ComparableFutureTask(new PriorityRunnable(i),i);
            threadPool.execute(task);
        }
        for (int i = 0; i < 8; i++) {
            ComparableFutureTask task = new ComparableFutureTask(new MyCall(i),i);
            threadPool.execute(task);
        }
        
        threadPool.shutdown();
    }

    static class PriorityRunnable implements Runnable,Comparable<PriorityRunnable> {
        int priority;
        PriorityRunnable(int p){
            priority = p;
        }
        @Override
        public void run() {
            System.out.println("Thread " + Thread.currentThread().getName() +" -"+ priority);
            try{
                sleep(1000);
            }catch (InterruptedException e){
            }
        }
        @Override
        public int compareTo(PriorityRunnable task) {
            if (this.priority < task.priority) {
                return 1;
            } else if (this.priority > task.priority) {
                return -1;
            }
            return 0;
        }
    }
        static class ComparableFutureTask extends FutureTask implements Comparable<ComparableFutureTask> { 
       private Integer priority;
        public Integer getPriority() {
            return priority;
        }
        public ComparableFutureTask(Callable callable,Integer priority) {
            super(callable);
            this.priority = priority;
        } 
       public ComparableFutureTask(Runnable callable,Integer priority) {
            super(callable,null);
            this.priority = priority;
        } 
       @Override
        public int compareTo(ComparableFutureTask task) {
            if (this.getPriority() < task.getPriority()){
                return 1;
            }else if (this.getPriority() > task.getPriority()){
                return -1; 
           } 
           return 0;
        }
    }
}

问题:
1.把execute改成 submit失败 ,报错:

java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.threadpool.main(threadpool.java:24)

可见FutureTask的确没有实现Comparable接口,但是我提交的ComparableFutureTask是实现了Comparable接口的,究竟是因为什么原因导致其成为了FutureTask呢,结果在ThreadPoolExecutor的submit(Callable<T> task)找到原因,它是在AbstractExecutorService中实现的。

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

重点在newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

所以我提交的ComparableFutureTask 被转化为了FutureTask了,而FutureTask没有实现Comparable,所以才会报错
现在解决的方法有:

  • 用一个ComparableFutureTask继承FutureTask并实现Comparable接口,但也必须要override ThreadPoolExecutor的newTaskFor方法
    beta版本1
 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 30, 
            60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()) {
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            int p = 0;
            if (callable instanceof MyCall) {
                p = ((MyCall) callable).priority;
            }
            if (callable instanceof ComparableFutureTask) {
                p = ((ComparableFutureTask) callable).priority;
            }
            System.out.println("newTaskFor " + Thread.currentThread().getName() + " -" + p);
            return new ComparableFutureTask(callable, p);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
            int p = 0;
            if (runnable instanceof PriorityRunnable) {
                p = ((PriorityRunnable) runnable).priority;
            }
            if (runnable instanceof ComparableFutureTask) {//千万加上
                p = ((ComparableFutureTask) runnable).priority; 
            }        
            System.out.println(runnable +"newTaskFor " + Thread.currentThread().getName() +" -"+ p);
            return new ComparableFutureTask(runnable,p);    
        }
    };

另外需要注意的是PriorityBlockingQueue的实现是一个最小堆.

  1. execute下两种方案不能同时处理,因为Comparable比较的不是同一个类型的对象
Exception in thread "main" java.lang.ClassCastException: com.threadpool$PriorityRunnable cannot be cast to com.threadpool$ComparableFutureTask
    at com.threadpool$ComparableFutureTask.compareTo(threadpool.java:109)
    at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:361)
    at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1361)
    at com.threadpool.main(threadpool.java:44)

改:PriorityRunnable 和ComparableFutureTask 实现一样类型的 Comparable<Runnable>,可比较优先级即解决问题
beta版本2

   public int compareTo(Runnable task) {
            int p = 0;
            if (task instanceof PriorityRunnable) {
                p = ((PriorityRunnable) task).priority;
            }
            if (task instanceof ComparableFutureTask) {
                p = ((ComparableFutureTask) task).priority;
            }
            if (this.priority < p) {
                return 1;
            } else if (this.priority > p) {
                return -1;
            }
            return 0;
        }

submit下两种方案可以同时处理,因为问题1处 处理后,submit都会通过newTaskFor将callable/runable封装成ComparableFutureTask

完整rc版本:

public class threadPoolTest {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 30, 60L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                int p = 0;
                if (callable instanceof MyCall) {
                    p = ((MyCall) callable).priority;
                }
                if (callable instanceof ComparableFutureTask) {
                    p = ((ComparableFutureTask) callable).priority;
                }
                System.out.println("newTaskFor " + Thread.currentThread().getName() + " -" + p);
                return new ComparableFutureTask(callable, p);
            }

            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
                int p = 0;
                if (runnable instanceof PriorityRunnable) {
                    p = ((PriorityRunnable) runnable).priority;
                }
                if (runnable instanceof ComparableFutureTask) {
                    p = ((ComparableFutureTask) runnable).priority;
                }
                System.out.println(runnable + "newTaskFor " + Thread.currentThread().getName() + " -" + p);
                return new ComparableFutureTask(runnable, p);
            }
        };
        for (int i = 10; i < 15; i++) {
            PriorityRunnable task = new PriorityRunnable(i);
            threadPool.submit(task);
        }
        for (int i = 0; i < 8; i++) {
            ComparableFutureTask task = new ComparableFutureTask(new PriorityRunnable(i), i);
            threadPool.submit(task);
        }
        for (int i = 0; i < 8; i++) {
            ComparableFutureTask task = new ComparableFutureTask(new MyCall(i), i);
            threadPool.execute(task);
        }
        threadPool.shutdown();
    }

    static class PriorityRunnable implements Runnable, Comparable<Runnable> {
        int priority;

        PriorityRunnable(int p) {
            priority = p;
        }

        @Override
        public void run() {
            System.out.println("Thread " + Thread.currentThread().getName() + " -" + priority);
            try {
                sleep(100);
            } catch (InterruptedException e) {
            }
        }

        @Override
        public int compareTo(Runnable task) {
            int p = 0;
            if (task instanceof PriorityRunnable) {
                p = ((PriorityRunnable) task).priority;
            }
            if (task instanceof ComparableFutureTask) {
                p = ((ComparableFutureTask) task).priority;
            }
            if (this.priority < p) {
                return 1;
            } else if (this.priority > p) {
                return -1;
            }
            return 0;
        }
    }

    static class MyCall implements Callable<Integer> {
        private Integer priority;

        public MyCall(Integer priority) {
            this.priority = priority;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("Thread " + Thread.currentThread().getName() + " priority " + priority);
            try {
                Thread.sleep(200);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return priority;
        }
    }

    static class ComparableFutureTask extends FutureTask implements Comparable<Runnable> {
        private Integer priority;

        public Integer getPriority() {
            return priority;
        }

        public ComparableFutureTask(Callable callable, Integer priority) {
            super(callable);
            this.priority = priority;
        }

        public ComparableFutureTask(Runnable callable, Integer priority) {
            super(callable, null);
            this.priority = priority;
        }

        @Override
        public int compareTo(Runnable task) {
            int p = 0;
            if (task instanceof PriorityRunnable) {
                p = ((PriorityRunnable) task).priority;
            }
            if (task instanceof ComparableFutureTask) {
                p = ((ComparableFutureTask) task).priority;
            }
            if (this.getPriority() < p) {
                return 1;
            } else if (this.getPriority() > p) {
                return -1;
            }
            return 0;
        }
    }
}

执行结果:

Thread pool-1-thread-1 -10 //核心线程数1 ,所以最开始执行。剩下的排队
Thread pool-1-thread-1 -14
Thread pool-1-thread-1 -13
Thread pool-1-thread-1 -12
Thread pool-1-thread-1 -11
Thread pool-1-thread-1 priority 7
Thread pool-1-thread-1 -7
Thread pool-1-thread-1 priority 6
Thread pool-1-thread-1 -6
Thread pool-1-thread-1 priority 5
Thread pool-1-thread-1 -5
Thread pool-1-thread-1 priority 4
Thread pool-1-thread-1 -4
Thread pool-1-thread-1 -3
Thread pool-1-thread-1 priority 3
Thread pool-1-thread-1 priority 2
Thread pool-1-thread-1 -2
Thread pool-1-thread-1 priority 1
Thread pool-1-thread-1 -1
Thread pool-1-thread-1 priority 0
Thread pool-1-thread-1 -0

感受下,这样就完成了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容