Android 线程池+多线程+断点下载

//监听下载
public interface OnDownLoadListener {

    void onPending(DownLoadInfo downLoadInfo);

    void OnLOADING(DownLoadInfo downLoadInfo);

    void onProgree(DownLoadInfo downLoadInfo, int start, int size);

    void onStop(DownLoadInfo downLoadInfo, int start, int size);

    void onComplet(DownLoadInfo downLoadInfo);

    void onFailed(DownLoadInfo downLoadInfo);


}

model


image.png
image.png

两个表 使用greendao进行关联

//任务管理
public class TaskDispatcher {
    //Cpu核心个数
    private static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
    private static final int CPRE_POOL_SIZE = CPU_NUM + 1;
    //最大下载任务数量
    private static final int DOWNLOAD_MAX = CPU_NUM * 2 + 1;
    //下载任务线程池
    private ExecutorService executorService;
    //正在下载的任务队列
    private Map<Long, List<DownloadTask>> queueTaskMap = Collections.synchronizedMap(new HashMap<Long, List<DownloadTask>>());
    //已经完成下载任务队列
    private List<Long> waitloaded = Collections.synchronizedList(new ArrayList<Long>());
    //已经完成下载任务队列
    private List<Long> downloaded = Collections.synchronizedList(new ArrayList<Long>());
    //单例对象
    private static TaskDispatcher instance;
    //数据库操作对象
    private DaoUtils daoUtils;
    //下载监听
    private OnDownLoadListener onDownLoadListener;


    private TaskDispatcher() {
        daoUtils = new DaoUtils(MyApplication.getContext());
        EventBus.getDefault().register(this);
    }

    /**
     * 线程安全单例模式
     */
    public static TaskDispatcher getInstance() {
        if (instance == null) {
            synchronized (TaskDispatcher.class) {
                if (instance == null) {
                    instance = new TaskDispatcher();
                }
            }
        }
        return instance;
    }

    /**
     * 初始化线程池
     */
    private ExecutorService getExecutorService() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(CPRE_POOL_SIZE, DOWNLOAD_MAX,
                    60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                    Executors.defaultThreadFactory());
        }

        return executorService;
    }


    public synchronized void addListen(OnDownLoadListener onDownLoadListener) {
        this.onDownLoadListener = onDownLoadListener;
    }


    @Subscribe(threadMode = ThreadMode.BACKGROUND)
    public void getDownLoadInfo(DownLoadProgree downLoadProgree) {
        Intent intent = new Intent(MyApplication.getContext(), DownloadCenterActivity.class);
        DownLoadInfo downLoadInfo = downLoadProgree.getDownLoadInfo();
        switch (downLoadProgree.getState()) {
            case DownloadTask.PENDING:
                //任务待进行
                break;
            case DownloadTask.LOADING:
                //任务准备中
                downLoadInfo.setState(DownloadTask.LOADING);
                daoUtils.updateDownload(downLoadInfo);
                NotificationUtils.create(MyApplication.getContext(), downLoadInfo.getId().intValue(), intent, R.drawable.ic_icon, "下载通知", downLoadInfo.getFilename() + "等待下载");
                if (onDownLoadListener != null) {
                    onDownLoadListener.OnLOADING(downLoadInfo);
                }
                break;
            case DownloadTask.PROGREE:
                //任务下载中
                int downloadSize = downLoadInfo.getDownloadSize() + downLoadProgree.getDownloadSize();
                downLoadInfo.setDownloadSize(downloadSize);
                downLoadInfo.setState(DownloadTask.PROGREE);
                daoUtils.updateDownload(downLoadInfo);
                NotificationUtils.create(MyApplication.getContext(), downLoadInfo.getId().intValue(), intent, R.drawable.ic_icon,
                        "下载通知", downLoadInfo.getFilename() + "正在下载,当前进度:" + downloadSize * 100 / downLoadInfo.getTotalSize() + "%");
                if (onDownLoadListener != null) {
                    onDownLoadListener.onProgree(downLoadInfo, downLoadInfo.getDownloadSize(), downLoadInfo.getTotalSize());
                }
                break;
            case DownloadTask.STOP:
                //任务停止
                downLoadInfo.setState(DownloadTask.STOP);
                daoUtils.updateDownload(downLoadInfo);
                NotificationUtils.create(MyApplication.getContext(), downLoadInfo.getId().intValue(), intent, R.drawable.ic_icon,
                        "下载通知", downLoadInfo.getFilename() + "停止下载,当前进度:" + downLoadInfo.getDownloadSize() * 100 / downLoadInfo.getTotalSize() + "%");
                if (onDownLoadListener != null) {
                    onDownLoadListener.onStop(downLoadInfo, downLoadInfo.getDownloadSize(), downLoadInfo.getTotalSize());
                }
                break;
            case DownloadTask.FAILED:
                //任务失败
                downLoadInfo.setState(DownloadTask.FAILED);
                daoUtils.updateDownload(downLoadInfo);
                NotificationUtils.create(MyApplication.getContext(), downLoadInfo.getId().intValue(), intent, R.drawable.ic_icon,
                        "下载通知", downLoadInfo.getFilename() + "下载失败");
                if (onDownLoadListener != null) {
                    onDownLoadListener.onFailed(downLoadInfo);
                }
                queueTaskMap.get(downLoadInfo.getId()).get(downLoadProgree.getTaskId().intValue()).setStop(true);
                break;
            case DownloadTask.FINISHED:
                //任务完成
                if (downLoadInfo.getDownloadSize() == downLoadInfo.getTotalSize()) {
                    finished(downLoadInfo.getId());
                    downLoadInfo.setState(DownloadTask.FINISHED);
                    daoUtils.updateDownload(downLoadInfo);
                    NotificationUtils.create(MyApplication.getContext(), downLoadInfo.getId().intValue(), intent, R.drawable.ic_icon,
                            "下载通知", downLoadInfo.getFilename() + "下载完成");
                    if (onDownLoadListener != null) {
                        onDownLoadListener.onComplet(downLoadInfo);
                    }
                }
                break;
            default:
                break;
        }
    }


    /**
     * 任务入列下载
     */
    public synchronized boolean enqueue(final DownLoadInfo downLoadInfo) {
        final long id;
        final List<DownloadTask> downloadTasks = new ArrayList<>();
        final DownLoadInfo downLoadInfo1 = daoUtils.queryDownloadInfoBuilder(downLoadInfo.getUrl());
        if(downLoadInfo1!=null){
            id=downLoadInfo1.getId();
            //判断下载任务是否存在且状态为暂停
            if (downLoadInfo.getState() != 0 && downLoadInfo.getState() == DownloadTask.STOP) {
                for (DownLoadProgree downLoadProgree : downLoadInfo.getDownLoadProgree()) {
                    DownloadTask downloadTask = new DownloadTask(downLoadProgree.getId());
                    if (queueTaskMap.entrySet().size() < DOWNLOAD_MAX) {
                        getExecutorService().execute(downloadTask);
                        downloadTasks.add(downloadTask);
                    } else {
                        waitloaded.add(id);
                        break;
                    }
                }
            }
        }else{
             id =daoUtils.insertDownload(downLoadInfo);
            if (queueTaskMap.keySet().contains(id)) {
                return false;
            }
            Api.getInstance().iRetrofit.download(downLoadInfo.getUrl(), "bytes=" + 0 + "-")
                    .compose(ApiSubscribe.<ResponseBody>io_io())
                    .subscribe(new Observer<ResponseBody>() {
                        @Override
                        public void onSubscribe(Disposable d) {

                        }

                        @Override
                        public void onNext(ResponseBody responseBody) {
                            int totalSize = (int) responseBody.contentLength();
                            int rateSzie = totalSize / 3;
                            downLoadInfo.setTotalSize(totalSize);
                            downLoadInfo.setState(DownloadTask.PENDING);
                            daoUtils.updateDownload(downLoadInfo);
                            for (int i = 0; i < 3; i++) {
                                DownloadTask downloadTask;
                                DownLoadProgree downLoadProgree = new DownLoadProgree();
                                downLoadProgree.setTaskId(downLoadInfo.getId());
                                downLoadProgree.setDownLoadInfo(downLoadInfo);
                                if (i == 2) {
                                    downLoadProgree.setStart(i * rateSzie);
                                    downLoadProgree.setEnd(totalSize);
                                    downloadTask = new DownloadTask(daoUtils.insertDownloadTask(downLoadProgree));
                                } else {
                                    downLoadProgree.setStart(i * rateSzie);
                                    downLoadProgree.setEnd((i + 1) * rateSzie - 1);
                                    downloadTask = new DownloadTask(daoUtils.insertDownloadTask(downLoadProgree));
                                }
                                if (queueTaskMap.entrySet().size() < DOWNLOAD_MAX) {
                                    getExecutorService().execute(downloadTask);
                                    downloadTasks.add(downloadTask);
                                } else {
                                    waitloaded.add(id);
                                    break;
                                }
                            }
                            queueTaskMap.put(downLoadInfo.getId(), downloadTasks);
                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    });

        }
        return true;
    }

    /**
     * 多个任务入列下载
     */
    public synchronized boolean enqueueAll(List<Long> idList) {
        for (Long id : idList) {
            enqueue(daoUtils.queryDownlodInfo(id));
        }
        return true;
    }


    /**
     * 任务停止
     */
    public synchronized void stop(long id) {
        List<DownloadTask> downloadTasks = queueTaskMap.get(id);
        for (DownloadTask downloadTask : downloadTasks) {
            downloadTask.setStop(true);
        }
        promoteSyncTask();
    }

    /**
     * 任务下载完成
     */
    synchronized void finished(long id) {
        queueTaskMap.remove(id);
        downloaded.add(id);
        promoteSyncTask();
    }


    /**
     * 删除下载任务,是否删除文件
     */
    public synchronized void deleteTask(DownLoadInfo downLoadInfo, boolean isDeleteFile) {
        if (downLoadInfo != null) {
            stop(downLoadInfo.getId());
            if (downloaded.contains(downLoadInfo.getId())) {
                downloaded.remove(downLoadInfo.getId());
                if (isDeleteFile) {
                    File file = new File(downLoadInfo.getFilepath(), downLoadInfo.getFilename());
                    file.delete();
                }
            }
            if (queueTaskMap.keySet().contains(downLoadInfo.getId())) {
                queueTaskMap.remove(downLoadInfo.getId());
                if (isDeleteFile) {
                    File file = new File(downLoadInfo.getFilepath(), downLoadInfo.getFilename());
                    file.delete();
                }
                promoteSyncTask();
            }
        }
    }

    /**
     * 调度pending状态的任务,开始下载
     */
    private synchronized void promoteSyncTask() {
        if (waitloaded.isEmpty()) {
            enqueueAll(waitloaded);
        }
    }


}

//任务下载线程
public class DownloadTask extends Thread {
    //任务信息
    private DownLoadProgree downLoadProgree;
    //是否停止
    private boolean Stop=false;
    //当前任务下载进度
    private int off=0;
    //数据库操作类
    private DaoUtils daoUtils;

    public static final int PENDING=1;
    public static final int LOADING = 2;
    public static final int PROGREE = 3;
    public static final int STOP = 4;
    public static final int FAILED = 5;
    public static final int FINISHED = 6;

    @Override
    public void run() {
        start();
    }


    public DownloadTask(Long taskId) {
        daoUtils=new DaoUtils(MyApplication.getContext());
        downLoadProgree=daoUtils.queryDownlodTask(taskId);
    }

    public void start() {
        downLoadProgree.setState(LOADING);
        daoUtils.updateDownloadTask(downLoadProgree);
        download();
    }

    private void download() {
        Api.getInstance().iRetrofit.download(downLoadProgree.getDownLoadInfo().getUrl(),"bytes=" + downLoadProgree.getStart() + "-" + downLoadProgree.getEnd())
                .compose(ApiSubscribe.<ResponseBody>io_io())
                .subscribe(new Observer<ResponseBody>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(ResponseBody responseBody) {
                        try {
                            File file = new File(downLoadProgree.getDownLoadInfo().getFilepath(), downLoadProgree.getDownLoadInfo().getFilename());
                            InputStream is = null;
                            FileOutputStream fileOutputStream = null;
                            try {
                                is = responseBody.byteStream();
                                fileOutputStream = new FileOutputStream(file);
                                byte[] buffer = new byte[8192];//缓冲数组2kB
                                int len;
                                while ((len = is.read(buffer)) != -1) {
                                    if (!Stop) {
                                        fileOutputStream.write(buffer, 0, len);
                                        off+= len;
                                        //查询数据库
                                        downLoadProgree.setState(PROGREE);
                                        downLoadProgree.setDownloadSize(len);
                                        daoUtils.updateDownloadTask(downLoadProgree);
                                        EventBus.getDefault().post(downLoadProgree);
                                    } else {
                                        is.close();
                                        fileOutputStream.close();
                                        //更新数据库
                                        downLoadProgree.setState(STOP);
                                        downLoadProgree.setDownloadSize(0);
                                        downLoadProgree.setStart(off);
                                        daoUtils.updateDownloadTask(downLoadProgree);
                                        EventBus.getDefault().post(downLoadProgree);
                                        return;
                                    }
                                }
                                downLoadProgree.setState(FINISHED);
                                downLoadProgree.setDownloadSize(off);
                                daoUtils.updateDownloadTask(downLoadProgree);
                                EventBus.getDefault().post(downLoadProgree);
                            } finally {
                                //关闭IO流
                                is.close();

                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            LogUtils.e("下载出错" + e);
                            downLoadProgree.setState(FAILED);
                            downLoadProgree.setDownloadSize(0);
                            daoUtils.updateDownloadTask(downLoadProgree);
                            EventBus.getDefault().post(downLoadProgree);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        downLoadProgree.setState(FAILED);
                        downLoadProgree.setDownloadSize(0);
                        daoUtils.updateDownloadTask(downLoadProgree);
                        EventBus.getDefault().post(downLoadProgree);
                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    private void download1() {
        Api.getInstance().iRetrofit.download(downLoadProgree.getDownLoadInfo().getUrl(),"bytes=" + downLoadProgree.getStart() + "-" + downLoadProgree.getEnd())
                .compose(ApiSubscribe.<ResponseBody>io_io())
                .subscribe(new Observer<ResponseBody>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(ResponseBody responseBody) {
                        try {
                            File file = new File(downLoadProgree.getDownLoadInfo().getFilepath(), downLoadProgree.getDownLoadInfo().getFilename());
                            BufferedSource bufferedSource = null;
                            BufferedSink bufferedSink = null;
                            try {
                                bufferedSource = responseBody.source();
                                bufferedSink = Okio.buffer(Okio.sink(file));
                                byte[] buffer = new byte[8192];//缓冲数组2kB
                                int len;
                                while ((len = bufferedSource.read(buffer)) != -1) {
                                    if (!Stop) {
                                        bufferedSink.write(buffer);
                                        off+= len;
                                        //查询数据库
                                        downLoadProgree.setState(PROGREE);
                                        downLoadProgree.setDownloadSize(len);
                                        daoUtils.updateDownloadTask(downLoadProgree);
                                        EventBus.getDefault().post(downLoadProgree);
                                    } else {
                                        closeQuietly(bufferedSource);
                                        closeQuietly(bufferedSink);
                                        //更新数据库
                                        downLoadProgree.setState(STOP);
                                        downLoadProgree.setDownloadSize(0);
                                        downLoadProgree.setStart(off);
                                        daoUtils.updateDownloadTask(downLoadProgree);
                                        EventBus.getDefault().post(downLoadProgree);
                                        return;
                                    }
                                }
                                downLoadProgree.setState(FINISHED);
                                downLoadProgree.setDownloadSize(off);
                                daoUtils.updateDownloadTask(downLoadProgree);
                                EventBus.getDefault().post(downLoadProgree);
                            } finally {
                                //关闭IO流
                                closeQuietly(bufferedSource);
                                closeQuietly(bufferedSink);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            LogUtils.e("下载出错" + e);
                            downLoadProgree.setState(FAILED);
                            downLoadProgree.setDownloadSize(0);
                            daoUtils.updateDownloadTask(downLoadProgree);
                            EventBus.getDefault().post(downLoadProgree);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        downLoadProgree.setState(FAILED);
                        downLoadProgree.setDownloadSize(0);
                        daoUtils.updateDownloadTask(downLoadProgree);
                        EventBus.getDefault().post(downLoadProgree);
                    }

                    @Override
                    public void onComplete() {

                    }
                });

    }

    public void setStop(boolean stop) {
        Stop = stop;
    }


}

使用

 DownLoadInfo downLoadInfo=new DownLoadInfo();
                                            downLoadInfo.setUrl(data.getData().get(0).getUrl());
                                            downLoadInfo.setFilename(data.getData().get(0).getAuthor()+"-"+data.getData().get(0).getTitle()+".mp3");
                                            downLoadInfo.setFilepath(Environment.getExternalStorageDirectory().getPath() + File.separator + "mv");
                                            TaskDispatcher.getInstance().enqueue(downLoadInfo);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352