/**
 * Callback interface for observing the life cycle of a download.
 *
 * <p>A single listener is registered through {@code TaskDispatcher.addListen(...)}.
 * The dispatcher invokes these callbacks from its EventBus subscriber, which is
 * declared with {@code ThreadMode.BACKGROUND}, so implementations should not assume
 * they run on the UI thread.
 */
public interface OnDownLoadListener {

    /**
     * Signals that a download is pending.
     *
     * @param downLoadInfo the download concerned
     */
    void onPending(DownLoadInfo downLoadInfo);

    /**
     * Signals that a download has entered the loading (preparing) state.
     *
     * @param downLoadInfo the download concerned
     */
    void OnLOADING(DownLoadInfo downLoadInfo);

    /**
     * Reports download progress.
     *
     * @param downLoadInfo the download concerned
     * @param start        the dispatcher passes the number of bytes downloaded so far
     * @param size         the dispatcher passes the total size of the download in bytes
     */
    void onProgree(DownLoadInfo downLoadInfo, int start, int size);

    /**
     * Signals that a download was stopped before completing.
     *
     * @param downLoadInfo the download concerned
     * @param start        the dispatcher passes the number of bytes downloaded so far
     * @param size         the dispatcher passes the total size of the download in bytes
     */
    void onStop(DownLoadInfo downLoadInfo, int start, int size);

    /**
     * Signals that a download completed.
     *
     * @param downLoadInfo the download concerned
     */
    void onComplet(DownLoadInfo downLoadInfo);

    /**
     * Signals that a download failed.
     *
     * @param downLoadInfo the download concerned
     */
    void onFailed(DownLoadInfo downLoadInfo);
}
Model
两张表(DownLoadInfo 与 DownLoadProgree),使用 greenDAO 进行关联。
/**
 * Task manager: singleton that starts, stops, resumes and deletes downloads and
 * translates the progress events posted by {@link DownloadTask} into database
 * updates, notifications and {@link OnDownLoadListener} callbacks.
 *
 * <p>A new download is split into {@link #SEGMENT_COUNT} byte-range segments, each
 * run by its own {@link DownloadTask}. At most {@link #DOWNLOAD_MAX} downloads are
 * tracked as active; further requests are parked in a waiting list and re-submitted
 * whenever an active download is stopped, fails, finishes or is deleted.
 *
 * <p>Public mutators are {@code synchronized} on the dispatcher instance. Progress
 * events arrive through EventBus on a background thread.
 */
public class TaskDispatcher {
    /** Number of processors reported by the runtime; used to size the pool. */
    private static final int CPU_NUM = Runtime.getRuntime().availableProcessors();
    /** Core size of the download thread pool. */
    private static final int CORE_POOL_SIZE = CPU_NUM + 1;
    /** Maximum number of simultaneously active downloads; also the pool's maximum size. */
    private static final int DOWNLOAD_MAX = CPU_NUM * 2 + 1;
    /** Number of byte-range segments a new download is split into. */
    private static final int SEGMENT_COUNT = 3;
    /** Title shown on every download notification. */
    private static final String NOTIFICATION_TITLE = "下载通知";

    /** Singleton instance; volatile so the double-checked locking in getInstance() is safe. */
    private static volatile TaskDispatcher instance;

    /** Active downloads: download id -> the segment tasks currently running for it. */
    private final Map<Long, List<DownloadTask>> queueTaskMap =
            Collections.synchronizedMap(new HashMap<Long, List<DownloadTask>>());
    /** Ids of downloads waiting for a free slot. */
    private final List<Long> waitloaded = Collections.synchronizedList(new ArrayList<Long>());
    /** Ids of downloads that completed during this process lifetime. */
    private final List<Long> downloaded = Collections.synchronizedList(new ArrayList<Long>());
    /** Database access helper. */
    private final DaoUtils daoUtils;
    /** Lazily created pool that runs the segment tasks. */
    private ExecutorService executorService;
    /** Optional listener; volatile because it is read from the EventBus thread. */
    private volatile OnDownLoadListener onDownLoadListener;

    private TaskDispatcher() {
        daoUtils = new DaoUtils(MyApplication.getContext());
        // Registered last so that no event can be delivered before the fields are set.
        EventBus.getDefault().register(this);
    }

    /**
     * Returns the process-wide dispatcher, creating it on first use.
     *
     * @return the singleton instance
     */
    public static TaskDispatcher getInstance() {
        TaskDispatcher local = instance;
        if (local == null) {
            synchronized (TaskDispatcher.class) {
                local = instance;
                if (local == null) {
                    local = new TaskDispatcher();
                    instance = local;
                }
            }
        }
        return local;
    }

    /**
     * Returns the download thread pool, creating it on first use. Synchronized
     * because it is reached both from {@link #enqueue} and from the network callback.
     */
    private synchronized ExecutorService getExecutorService() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, DOWNLOAD_MAX,
                    60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                    Executors.defaultThreadFactory());
        }
        return executorService;
    }

    /**
     * Registers the listener that receives download callbacks, replacing any
     * previously registered one.
     *
     * @param onDownLoadListener the listener, or {@code null} to remove the current one
     */
    public synchronized void addListen(OnDownLoadListener onDownLoadListener) {
        this.onDownLoadListener = onDownLoadListener;
    }

    /**
     * EventBus subscriber for the progress events posted by {@link DownloadTask}.
     * Persists the new state of the owning download, updates the notification and
     * forwards the event to the registered listener.
     *
     * @param downLoadProgree the segment record carrying the new state
     */
    @Subscribe(threadMode = ThreadMode.BACKGROUND)
    public void getDownLoadInfo(DownLoadProgree downLoadProgree) {
        DownLoadInfo downLoadInfo = downLoadProgree.getDownLoadInfo();
        // Read the listener once so it cannot change between the null check and the call.
        OnDownLoadListener listener = onDownLoadListener;
        switch (downLoadProgree.getState()) {
            case DownloadTask.PENDING:
                // Nothing to do while a task is merely pending.
                break;
            case DownloadTask.LOADING:
                downLoadInfo.setState(DownloadTask.LOADING);
                daoUtils.updateDownload(downLoadInfo);
                showNotification(downLoadInfo, downLoadInfo.getFilename() + "等待下载");
                if (listener != null) {
                    listener.OnLOADING(downLoadInfo);
                }
                break;
            case DownloadTask.PROGREE:
                // The event carries the size of the chunk just written; accumulate it.
                int downloadSize = downLoadInfo.getDownloadSize() + downLoadProgree.getDownloadSize();
                downLoadInfo.setDownloadSize(downloadSize);
                downLoadInfo.setState(DownloadTask.PROGREE);
                daoUtils.updateDownload(downLoadInfo);
                showNotification(downLoadInfo, downLoadInfo.getFilename() + "正在下载,当前进度:"
                        + percent(downloadSize, downLoadInfo.getTotalSize()) + "%");
                if (listener != null) {
                    listener.onProgree(downLoadInfo, downLoadInfo.getDownloadSize(), downLoadInfo.getTotalSize());
                }
                break;
            case DownloadTask.STOP:
                downLoadInfo.setState(DownloadTask.STOP);
                daoUtils.updateDownload(downLoadInfo);
                showNotification(downLoadInfo, downLoadInfo.getFilename() + "停止下载,当前进度:"
                        + percent(downLoadInfo.getDownloadSize(), downLoadInfo.getTotalSize()) + "%");
                if (listener != null) {
                    listener.onStop(downLoadInfo, downLoadInfo.getDownloadSize(), downLoadInfo.getTotalSize());
                }
                break;
            case DownloadTask.FAILED:
                reportFailure(downLoadInfo);
                // One failed segment fails the whole download: stop its sibling
                // segments and release the slot for a waiting download.
                stopAll(queueTaskMap.remove(downLoadInfo.getId()));
                promoteSyncTask();
                break;
            case DownloadTask.FINISHED:
                // A segment finished; the download is complete only once every byte arrived.
                // Unboxed into ints so the comparison is by value even if the getters return Integer.
                int received = downLoadInfo.getDownloadSize();
                int expected = downLoadInfo.getTotalSize();
                if (received == expected) {
                    finished(downLoadInfo.getId());
                    downLoadInfo.setState(DownloadTask.FINISHED);
                    daoUtils.updateDownload(downLoadInfo);
                    showNotification(downLoadInfo, downLoadInfo.getFilename() + "下载完成");
                    if (listener != null) {
                        listener.onComplet(downLoadInfo);
                    }
                }
                break;
            default:
                break;
        }
    }

    /**
     * Submits a download.
     *
     * <ul>
     *   <li>Unknown URL: the record is inserted, the server is probed for the content
     *       length and the download is split into segments.</li>
     *   <li>Known URL whose persisted state is {@code STOP}: its unfinished segments
     *       are resumed.</li>
     *   <li>Known URL whose persisted state is {@code PENDING} (it was waiting for a
     *       slot): it is started like a new download.</li>
     *   <li>Known URL in any other state: nothing is started.</li>
     * </ul>
     * When {@link #DOWNLOAD_MAX} downloads are already active the request is parked
     * in the waiting list instead of being started.
     *
     * @param downLoadInfo the download to submit; its URL identifies an existing record
     * @return {@code false} if {@code downLoadInfo} is {@code null} or the download is
     *         already active, {@code true} otherwise. The network work is asynchronous,
     *         so {@code true} does not mean the transfer has begun.
     */
    public synchronized boolean enqueue(final DownLoadInfo downLoadInfo) {
        if (downLoadInfo == null) {
            return false;
        }
        // Decide on the persisted record, not on the caller's (possibly fresh) object.
        final DownLoadInfo stored = daoUtils.queryDownloadInfoBuilder(downLoadInfo.getUrl());
        final DownLoadInfo target;
        final long id;
        boolean resume = false;
        if (stored != null) {
            target = stored;
            id = stored.getId();
            int state = stored.getState();
            resume = state == DownloadTask.STOP;
            if (!resume && state != DownloadTask.PENDING) {
                return true;
            }
        } else {
            target = downLoadInfo;
            id = daoUtils.insertDownload(downLoadInfo);
        }
        if (queueTaskMap.containsKey(id)) {
            return false;
        }
        if (queueTaskMap.size() >= DOWNLOAD_MAX) {
            if (!waitloaded.contains(id)) {
                waitloaded.add(id);
            }
            if (stored == null) {
                // Persist PENDING so the record is recognised when it is promoted later.
                target.setState(DownloadTask.PENDING);
                daoUtils.updateDownload(target);
            }
            return true;
        }
        if (resume) {
            resumeSegments(target, id);
        } else {
            probeAndStart(target, id);
        }
        return true;
    }

    /** Restarts every segment of a stopped download that has not finished yet. */
    private void resumeSegments(DownLoadInfo stored, long id) {
        List<DownloadTask> tasks = new ArrayList<DownloadTask>();
        for (DownLoadProgree segment : stored.getDownLoadProgree()) {
            int segmentState = segment.getState();
            if (segmentState == DownloadTask.FINISHED) {
                continue;
            }
            DownloadTask task = new DownloadTask(segment.getId());
            getExecutorService().execute(task);
            tasks.add(task);
        }
        // Registered so that stop() and deleteTask() can reach the resumed tasks.
        queueTaskMap.put(id, tasks);
    }

    /** Asks the server for the content length and, on success, starts the segments. */
    private void probeAndStart(final DownLoadInfo info, final long id) {
        // Reserve the slot now so a second enqueue of the same download is rejected
        // while the probe is still in flight.
        queueTaskMap.put(id, new ArrayList<DownloadTask>());
        Api.getInstance().iRetrofit.download(info.getUrl(), "bytes=0-")
                .compose(ApiSubscribe.<ResponseBody>io_io())
                .subscribe(new Observer<ResponseBody>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(ResponseBody responseBody) {
                        final long length;
                        try {
                            length = responseBody.contentLength();
                        } finally {
                            // Only the length is needed; release the connection.
                            responseBody.close();
                        }
                        if (!queueTaskMap.containsKey(id)) {
                            // Stopped or deleted while the probe was in flight.
                            return;
                        }
                        if (length <= 0 || length > Integer.MAX_VALUE) {
                            // Unknown length, or too large for the int-based size fields.
                            abortProbe(info, id);
                            return;
                        }
                        startSegments(info, id, (int) length);
                    }

                    @Override
                    public void onError(Throwable e) {
                        abortProbe(info, id);
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

    /** Splits a probed download into byte ranges and starts one task per range. */
    private void startSegments(DownLoadInfo info, long id, int totalSize) {
        int segmentSize = totalSize / SEGMENT_COUNT;
        info.setTotalSize(totalSize);
        info.setState(DownloadTask.PENDING);
        daoUtils.updateDownload(info);
        List<DownloadTask> tasks = new ArrayList<DownloadTask>();
        for (int i = 0; i < SEGMENT_COUNT; i++) {
            int start = i * segmentSize;
            // The last segment absorbs the remainder of the integer division.
            int end = (i == SEGMENT_COUNT - 1) ? totalSize : (i + 1) * segmentSize - 1;
            if (end < start) {
                // File smaller than SEGMENT_COUNT bytes: the last segment covers it all.
                continue;
            }
            DownLoadProgree segment = new DownLoadProgree();
            segment.setTaskId(info.getId());
            segment.setDownLoadInfo(info);
            segment.setStart(start);
            segment.setEnd(end);
            DownloadTask task = new DownloadTask(daoUtils.insertDownloadTask(segment));
            getExecutorService().execute(task);
            tasks.add(task);
        }
        queueTaskMap.put(id, tasks);
    }

    /** Releases the slot of a download whose probe failed and reports the failure. */
    private void abortProbe(DownLoadInfo info, long id) {
        queueTaskMap.remove(id);
        reportFailure(info);
        promoteSyncTask();
    }

    /** Persists the FAILED state, updates the notification and informs the listener. */
    private void reportFailure(DownLoadInfo info) {
        info.setState(DownloadTask.FAILED);
        daoUtils.updateDownload(info);
        showNotification(info, info.getFilename() + "下载失败");
        OnDownLoadListener listener = onDownLoadListener;
        if (listener != null) {
            listener.onFailed(info);
        }
    }

    /** Creates or updates the notification belonging to the given download. */
    private void showNotification(DownLoadInfo info, String text) {
        Intent intent = new Intent(MyApplication.getContext(), DownloadCenterActivity.class);
        NotificationUtils.create(MyApplication.getContext(), info.getId().intValue(), intent,
                R.drawable.ic_icon, NOTIFICATION_TITLE, text);
    }

    /** Integer percentage of {@code done} in {@code total}; 0 when the total is unknown. */
    private static int percent(long done, long total) {
        if (total <= 0) {
            return 0;
        }
        // Computed in long: done * 100 overflows an int for anything above ~21 MB.
        return (int) (done * 100L / total);
    }

    /** Asks every task in the list to stop; tolerates {@code null}. */
    private static void stopAll(List<DownloadTask> tasks) {
        if (tasks == null) {
            return;
        }
        for (DownloadTask task : tasks) {
            task.setStop(true);
        }
    }

    /**
     * Submits several downloads by id. Ids without a database record are skipped.
     *
     * @param idList ids of the downloads to submit; may be {@code null}
     * @return always {@code true}
     */
    public synchronized boolean enqueueAll(List<Long> idList) {
        if (idList == null) {
            return true;
        }
        // Iterate over a copy: enqueue() may append to the waiting list, which can be
        // the very list passed in.
        for (Long id : new ArrayList<Long>(idList)) {
            if (id == null) {
                continue;
            }
            DownLoadInfo info = daoUtils.queryDownlodInfo(id);
            if (info != null) {
                enqueue(info);
            }
        }
        return true;
    }

    /**
     * Stops an active download and releases its slot. Does nothing except promoting
     * waiting downloads if the id is not active.
     *
     * @param id id of the download to stop
     */
    public synchronized void stop(long id) {
        stopAll(queueTaskMap.remove(id));
        promoteSyncTask();
    }

    /**
     * Marks a download as completed and releases its slot.
     *
     * @param id id of the completed download
     */
    synchronized void finished(long id) {
        queueTaskMap.remove(id);
        downloaded.add(id);
        promoteSyncTask();
    }

    /**
     * Removes a download from the dispatcher, stopping it first if it is active.
     * The database record is not touched.
     *
     * @param downLoadInfo the download to remove; ignored when {@code null} or without id
     * @param isDeleteFile whether to also delete the target file of a download that
     *                     was active or completed
     */
    public synchronized void deleteTask(DownLoadInfo downLoadInfo, boolean isDeleteFile) {
        if (downLoadInfo == null || downLoadInfo.getId() == null) {
            return;
        }
        Long id = downLoadInfo.getId();
        boolean wasActive = queueTaskMap.containsKey(id);
        boolean wasDownloaded = downloaded.remove(id);
        // Drop it from the waiting list first so promotion cannot restart it.
        boolean removed;
        do {
            removed = waitloaded.remove(id);
        } while (removed);
        if (wasActive) {
            stop(id);
        }
        if (isDeleteFile && (wasActive || wasDownloaded)) {
            File file = new File(downLoadInfo.getFilepath(), downLoadInfo.getFilename());
            // Best effort: a file that is already gone is not an error here.
            file.delete();
        }
    }

    /**
     * Re-submits every download in the waiting list. Downloads that still do not fit
     * are put back into the list by {@link #enqueue}.
     */
    private synchronized void promoteSyncTask() {
        List<Long> pending;
        synchronized (waitloaded) {
            if (waitloaded.isEmpty()) {
                return;
            }
            pending = new ArrayList<Long>(waitloaded);
            waitloaded.clear();
        }
        enqueueAll(pending);
    }
}
/**
 * Downloads one byte-range segment of a file and reports progress.
 *
 * <p>The segment is described by a {@code DownLoadProgree} record loaded from the
 * database. Every state change is written back to that record and posted on the
 * default EventBus.
 *
 * <p>Although this class extends {@link Thread}, {@link #start()} is overridden and
 * does not spawn a thread: it runs on the calling thread, and {@link #run()} simply
 * delegates to it so the task can be handed to an executor.
 */
public class DownloadTask extends Thread {
    public static final int PENDING = 1;
    public static final int LOADING = 2;
    public static final int PROGREE = 3;
    public static final int STOP = 4;
    public static final int FAILED = 5;
    public static final int FINISHED = 6;

    /** Size of the copy buffer in bytes (8 KiB). */
    private static final int BUFFER_SIZE = 8192;

    /** Database access helper. */
    private final DaoUtils daoUtils;
    /** The segment record this task works on. */
    private final DownLoadProgree downLoadProgree;
    /** Stop request; volatile because it is set from a different thread than the copy loop. */
    private volatile boolean stopped = false;
    /** Bytes written by this task since it started. */
    private int off = 0;

    /**
     * Creates a task for the segment with the given id.
     *
     * @param taskId database id of the segment record
     */
    public DownloadTask(Long taskId) {
        daoUtils = new DaoUtils(MyApplication.getContext());
        downLoadProgree = daoUtils.queryDownlodTask(taskId);
    }

    @Override
    public void run() {
        start();
    }

    /**
     * Marks the segment as loading and issues the download request on the calling
     * thread. Unlike {@link Thread#start()} this does not create a new thread.
     */
    @Override
    public void start() {
        downLoadProgree.setState(LOADING);
        daoUtils.updateDownloadTask(downLoadProgree);
        download();
    }

    /** Requests the segment's byte range and copies the response into the target file. */
    private void download() {
        Api.getInstance().iRetrofit.download(downLoadProgree.getDownLoadInfo().getUrl(),
                "bytes=" + downLoadProgree.getStart() + "-" + downLoadProgree.getEnd())
                .compose(ApiSubscribe.<ResponseBody>io_io())
                .subscribe(new Observer<ResponseBody>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(ResponseBody responseBody) {
                        try {
                            copyToFile(responseBody);
                        } catch (IOException e) {
                            fail(e);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        fail(e);
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

    /**
     * Streams the response body into the target file at this segment's offset,
     * publishing a PROGREE event per chunk and a FINISHED or STOP event at the end.
     */
    private void copyToFile(ResponseBody responseBody) throws IOException {
        File file = new File(downLoadProgree.getDownLoadInfo().getFilepath(),
                downLoadProgree.getDownLoadInfo().getFilename());
        File parent = file.getParentFile();
        if (parent != null && !parent.exists()) {
            // If this fails, opening the file below reports the error.
            parent.mkdirs();
        }
        InputStream is = null;
        java.io.RandomAccessFile out = null;
        try {
            is = responseBody.byteStream();
            // Random access instead of a plain output stream: all segments share one
            // file, and each must write at its own offset without truncating the rest.
            out = new java.io.RandomAccessFile(file, "rw");
            out.seek(downLoadProgree.getStart());
            byte[] buffer = new byte[BUFFER_SIZE];
            int len;
            while ((len = is.read(buffer)) != -1) {
                if (stopped) {
                    // The chunk just read is discarded; resuming re-requests it.
                    advanceStart();
                    publish(STOP, 0);
                    return;
                }
                out.write(buffer, 0, len);
                off += len;
                publish(PROGREE, len);
            }
            publish(FINISHED, off);
        } finally {
            closeSilently(is);
            closeSilently(out);
        }
    }

    /** Logs the error, remembers how far the segment got and publishes FAILED. */
    private void fail(Throwable cause) {
        LogUtils.e("下载出错" + cause);
        advanceStart();
        publish(FAILED, 0);
    }

    /**
     * Moves the segment's start past the bytes already written so that a later
     * resume continues where this run ended instead of at the beginning.
     */
    private void advanceStart() {
        downLoadProgree.setStart(downLoadProgree.getStart() + off);
        off = 0;
    }

    /** Persists the given state and size on the segment record and posts it on EventBus. */
    private void publish(int state, int downloadSize) {
        downLoadProgree.setState(state);
        downLoadProgree.setDownloadSize(downloadSize);
        daoUtils.updateDownloadTask(downLoadProgree);
        EventBus.getDefault().post(downLoadProgree);
    }

    /** Closes the resource, ignoring {@code null} and any error raised while closing. */
    private static void closeSilently(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done about a failure to close.
        }
    }

    /**
     * Requests the task to stop. The copy loop checks the flag after each chunk it reads.
     *
     * @param stop {@code true} to stop the download
     */
    public void setStop(boolean stop) {
        stopped = stop;
    }
}
使用示例
// Describe the first track of the response as a download and hand it to the dispatcher.
String fileName = data.getData().get(0).getAuthor() + "-" + data.getData().get(0).getTitle() + ".mp3";
String targetDir = Environment.getExternalStorageDirectory().getPath() + File.separator + "mv";

DownLoadInfo downLoadInfo = new DownLoadInfo();
downLoadInfo.setUrl(data.getData().get(0).getUrl());
downLoadInfo.setFilename(fileName);
downLoadInfo.setFilepath(targetDir);

TaskDispatcher.getInstance().enqueue(downLoadInfo);