OkGo是一个基于okhttp的标准RESTful风格的网络框架,它除了能实现基本的网络请求功能外,还支持的文件的上传和下载功能,我们主要分析它的下载功能,使用时需要引入以下两个依赖:
implementation 'com.lzy.net:okgo:3.0.4'
implementation 'com.lzy.net:okserver:2.0.5'
全局配置:
OkDownload okDownload = OkDownload.getInstance();
okDownload.setFolder(Environment.getExternalStorageDirectory().getAbsolutePath() + File.separator + "Download" + File.separator); //设置全局下载目录
okDownload.getThreadPool().setCorePoolSize(3); //设置同时下载数量
okDownload.addOnAllTaskEndListener(listener); //设置所有任务监听
基本下载方法:
GetRequest<File> request = OkGo.<File>get(URL); //构建下载请求
DownloadTask task = OkDownload.request(tag, request); //创建下载任务,tag为一个任务的唯一标示
task.register(new com.lzy.okserver.download.DownloadListener(tag) {
@Override
public void onStart(Progress progress) {
}
@Override
public void onProgress(Progress progress) {
}
@Override
public void onError(Progress progress) {
}
@Override
public void onFinish(File file, Progress progress) {
}
@Override
public void onRemove(Progress progress) {
}
}).save();
task.fileName("update.zip"); //设置下载的文件名
task.start(); //开始或继续下载
task.restart(); //重新下载
task.pause(); //暂停下载
task.remove(); //删除下载,只删除记录,不删除文件
task.remove(true); //删除下载,同时删除记录和文件
接下来我们就深入源码看一下它是如何实现的:
OkDownload
OkDownload是一个单例,看一下他的构造方法:
private OkDownload() {
folder = Environment.getExternalStorageDirectory() + File.separator + "download" + File.separator;
IOUtils.createFolder(folder);
threadPool = new DownloadThreadPool();
taskMap = new ConcurrentHashMap<>();
//获取所有未完成的下载任务,并校验数据的有效性,防止下载过程中退出,第二次进入的时候,由于状态没有更新导致的状态错误
List<Progress> taskList = DownloadManager.getInstance().getDownloading();
for (Progress info : taskList) {
if (info.status == Progress.WAITING || info.status == Progress.LOADING || info.status == Progress.PAUSE) {
info.status = Progress.NONE;
}
}
//将任务状态更新至数据库
DownloadManager.getInstance().replace(taskList);
}
Request
GetRequest<File> request = OkGo.<File>get(URL)创建了一个request网络请求对象。
public Request(String url) {
this.url = url;
baseUrl = url;
OkGo go = OkGo.getInstance();
//默认添加 Accept-Language
String acceptLanguage = HttpHeaders.getAcceptLanguage();
if (!TextUtils.isEmpty(acceptLanguage)) headers(HttpHeaders.HEAD_KEY_ACCEPT_LANGUAGE, acceptLanguage);
//默认添加 User-Agent
String userAgent = HttpHeaders.getUserAgent();
if (!TextUtils.isEmpty(userAgent)) headers(HttpHeaders.HEAD_KEY_USER_AGENT, userAgent);
//添加公共请求参数
if (go.getCommonParams() != null) params(go.getCommonParams());
if (go.getCommonHeaders() != null) headers(go.getCommonHeaders());
//添加缓存模式
retryCount = go.getRetryCount();
cacheMode = go.getCacheMode();
cacheTime = go.getCacheTime();
}
DonwloadTask
将Request封装成一个下载任务对象DownloadTask:
public static DownloadTask request(String tag, Request<File, ? extends Request> request) {
Map<String, DownloadTask> taskMap = OkDownload.getInstance().getTaskMap();
//注意:tag是任务的唯一标示,相同的tag不同的url也会被认为是同一个任务
DownloadTask task = taskMap.get(tag);
if (task == null) {
task = new DownloadTask(tag, request);
taskMap.put(tag, task);
}
return task;
}
我们看一下DownloadTask的构造方法:
public DownloadTask(String tag, Request<File, ? extends Request> request) {
HttpUtils.checkNotNull(tag, "tag == null");
progress = new Progress();
progress.tag = tag;
progress.folder = OkDownload.getInstance().getFolder();
progress.url = request.getBaseUrl();
progress.status = Progress.NONE;
progress.totalSize = -1;
progress.request = request;
executor = OkDownload.getInstance().getThreadPool().getExecutor();
listeners = new HashMap<>();
}
这个构造方法主要创建了一个Progress对象,那我们看一下Progress这个类:
public class Progress implements Serializable {
private static final long serialVersionUID = 6353658567594109891L;
public static final int NONE = 0; //无状态
public static final int WAITING = 1; //等待
public static final int LOADING = 2; //下载中
public static final int PAUSE = 3; //暂停
public static final int ERROR = 4; //错误
public static final int FINISH = 5; //完成
public static final String TAG = "tag";
public static final String URL = "url";
public static final String FOLDER = "folder";
public static final String FILE_PATH = "filePath";
public static final String FILE_NAME = "fileName";
public static final String FRACTION = "fraction";
public static final String TOTAL_SIZE = "totalSize";
public static final String CURRENT_SIZE = "currentSize";
public static final String STATUS = "status";
public static final String PRIORITY = "priority";
public static final String DATE = "date";
public static final String REQUEST = "request";
public static final String EXTRA1 = "extra1";
public static final String EXTRA2 = "extra2";
public static final String EXTRA3 = "extra3";
public String tag; //下载的标识键
public String url; //网址
public String folder; //保存文件夹
public String filePath; //保存文件地址
public String fileName; //保存的文件名
public float fraction; //下载的进度,0-1
public long totalSize; //总字节长度, byte
public long currentSize; //本次下载的大小, byte
public transient long speed; //网速,byte/s
public int status; //当前状态
public int priority; //任务优先级
public long date; //创建时间
public Request<?, ? extends Request> request; //网络请求
public Serializable extra1; //额外的数据
public Serializable extra2; //额外的数据
public Serializable extra3; //额外的数据
public Throwable exception; //当前进度出现的异常
private transient long tempSize; //每一小段时间间隔的网络流量
private transient long lastRefreshTime; //最后一次刷新的时间
private transient List<Long> speedBuffer; //网速做平滑的缓存,避免抖动过快
......
可见一个Progress保存了一个下载任务的各种信息,可以看到一个任务总共有六种状态,接下来我们就开始执行下载任务,在执行下载任务前,必须要调用DownloadTask的save方法,将任务信息更新至数据库:
public DownloadTask save() {
if (!TextUtils.isEmpty(progress.folder) && !TextUtils.isEmpty(progress.fileName)) {
progress.filePath = new File(progress.folder, progress.fileName).getAbsolutePath();
}
DownloadManager.getInstance().replace(progress);
return this;
}
/**
* replace 语句有如下行为特点
* 1. replace语句会删除原有的一条记录, 并且插入一条新的记录来替换原记录。
* 2. 一般用replace语句替换一条记录的所有列, 如果在replace语句中没有指定某列, 在replace之后这列的值被置空 。
* 3. replace语句根据主键的值确定被替换的是哪一条记录
* 4. 如果执行replace语句时, 不存在要替换的记录, 那么就会插入一条新的记录。
* 5. replace语句不能根据where子句来定位要被替换的记录
* 6. 如果新插入的或替换的记录中, 有字段和表中的其他记录冲突, 那么会删除那条其他记录。
*/
public boolean replace(T t) {
if (t == null) return false;
long start = System.currentTimeMillis();
lock.lock();
try {
database.beginTransaction();
database.replace(getTableName(), null, getContentValues(t));
database.setTransactionSuccessful();
return true;
} catch (Exception e) {
OkLogger.printStackTrace(e);
} finally {
database.endTransaction();
lock.unlock();
OkLogger.v(TAG, System.currentTimeMillis() - start + " replaceT");
}
return false;
}
然后开始执行下载任务:
public void start() {
if (OkDownload.getInstance().getTask(progress.tag) == null || DownloadManager.getInstance().get(progress.tag) == null) {
throw new IllegalStateException("you must call DownloadTask#save() before DownloadTask#start()!");
}
if (progress.status == Progress.NONE || progress.status == Progress.PAUSE || progress.status == Progress.ERROR) {
postOnStart(progress);
postWaiting(progress);
priorityRunnable = new PriorityRunnable(progress.priority, this);
executor.execute(priorityRunnable);
} else if (progress.status == Progress.FINISH) {
if (progress.filePath == null) {
postOnError(progress, new StorageException("the file of the task with tag:" + progress.tag + " may be invalid or damaged, please call the method restart() to download again!"));
} else {
File file = new File(progress.filePath);
if (file.exists() && file.length() == progress.totalSize) {
postOnFinish(progress, new File(progress.filePath));
} else {
postOnError(progress, new StorageException("the file " + progress.filePath + " may be invalid or damaged, please call the method restart() to download again!"));
}
}
} else {
OkLogger.w("the task with tag " + progress.tag + " is already in the download queue, current task status is " + progress.status);
}
}
任务在无状态、暂停和错误状态下,调用start方法会执行PriorityRunnable,任务已完成状态下给用户相应的提示,在等待和下载中不执行任务操作:
public class PriorityRunnable extends PriorityObject<Runnable> implements Runnable {
public PriorityRunnable(int priority, Runnable obj) {
super(priority, obj);
}
@Override
public void run() {
this.obj.run();
}
}
可以看到它最终执行了DownloadTask的run方法:
@Override
public void run() {
//检查断点信息
long startPosition = progress.currentSize;
if (startPosition < 0) {
postOnError(progress, OkGoException.BREAKPOINT_EXPIRED());
return;
}
if (startPosition > 0) {
if (!TextUtils.isEmpty(progress.filePath)) {
File file = new File(progress.filePath);
if (!file.exists()) {
postOnError(progress, OkGoException.BREAKPOINT_NOT_EXIST());
return;
}
}
}
//请求从断点开始的数据
Response response;
try {
Request<?, ? extends Request> request = progress.request;
request.headers(HttpHeaders.HEAD_KEY_RANGE, "bytes=" + startPosition + "-");
response = request.execute();
} catch (IOException e) {
postOnError(progress, e);
return;
}
//检查状态码
int code = response.code();
if (code == 404 || code >= 500) {
postOnError(progress, HttpException.NET_ERROR());
return;
}
ResponseBody body = response.body();
if (body == null) {
postOnError(progress, new HttpException("response body is null"));
return;
}
if (progress.totalSize == -1) {
progress.totalSize = body.contentLength();
}
//创建下载的文件名
String fileName = progress.fileName;
if (TextUtils.isEmpty(fileName)) {
fileName = HttpUtils.getNetFileName(response, progress.url);
progress.fileName = fileName;
}
if (!IOUtils.createFolder(progress.folder)) {
postOnError(progress, StorageException.NOT_AVAILABLE());
return;
}
//创建文件并检查断点合法性
File file;
if (TextUtils.isEmpty(progress.filePath)) {
file = new File(progress.folder, fileName);
progress.filePath = file.getAbsolutePath();
} else {
file = new File(progress.filePath);
}
if (startPosition > 0 && !file.exists()) {
postOnError(progress, OkGoException.BREAKPOINT_EXPIRED());
return;
}
if (startPosition > progress.totalSize) {
postOnError(progress, OkGoException.BREAKPOINT_EXPIRED());
return;
}
if (startPosition == 0 && file.exists()) {
IOUtils.delFileOrFolder(file);
}
if (startPosition == progress.totalSize && startPosition > 0) {
if (file.exists() && startPosition == file.length()) {
postOnFinish(progress, file);
return;
} else {
postOnError(progress, OkGoException.BREAKPOINT_EXPIRED());
return;
}
}
//开始下载
RandomAccessFile randomAccessFile;
try {
randomAccessFile = new RandomAccessFile(file, "rw");
randomAccessFile.seek(startPosition);
progress.currentSize = startPosition;
} catch (Exception e) {
postOnError(progress, e);
return;
}
try {
DownloadManager.getInstance().replace(progress);
//文件下载操作
download(body.byteStream(), randomAccessFile, progress);
} catch (IOException e) {
postOnError(progress, e);
return;
}
//下载结束检查进度状态
if (progress.status == Progress.PAUSE) {
postPause(progress);
} else if (progress.status == Progress.LOADING) {
if (file.length() == progress.totalSize) {
postOnFinish(progress, file);
} else {
postOnError(progress, OkGoException.BREAKPOINT_EXPIRED());
}
} else {
postOnError(progress, OkGoException.UNKNOWN());
}
}
//执行文件下载
private void download(InputStream input, RandomAccessFile out, Progress progress) throws IOException {
if (input == null || out == null) return;
progress.status = Progress.LOADING;
byte[] buffer = new byte[BUFFER_SIZE];
BufferedInputStream in = new BufferedInputStream(input, BUFFER_SIZE);
int len;
try {
while ((len = in.read(buffer, 0, BUFFER_SIZE)) != -1 && progress.status == Progress.LOADING) {
out.write(buffer, 0, len);
Progress.changeProgress(progress, len, progress.totalSize, new Progress.Action() {
@Override
public void call(Progress progress) {
postLoading(progress);
}
});
}
} finally {
IOUtils.closeQuietly(out);
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(input);
}
}
每次回调下载进度时,它的下载速度计算会取最近10次的速度平均值(不足1次则取全部速度的平均值),这样可以避免网速抖动过大。
/** 平滑网速,避免抖动过大 */
private long bufferSpeed(long speed) {
speedBuffer.add(speed);
if (speedBuffer.size() > 10) {
speedBuffer.remove(0);
}
long sum = 0;
for (float speedTemp : speedBuffer) {
sum += speedTemp;
}
return sum / speedBuffer.size();
}
这样,一个完整的下载流程就结束了。当然,它支持多任务同时下载,可以看它负责下载管理的线程池:
public class DownloadThreadPool {
private static final int MAX_POOL_SIZE = 5; //最大线程池的数量
private static final int KEEP_ALIVE_TIME = 1; //存活的时间
private static final TimeUnit UNIT = TimeUnit.HOURS; //时间单位
private int corePoolSize = 3; //核心线程池的数量,同时能执行的线程数量,默认3个
private XExecutor executor; //线程池执行器
public XExecutor getExecutor() {
if (executor == null) {
synchronized (DownloadThreadPool.class) {
if (executor == null) {
executor = new XExecutor(corePoolSize, MAX_POOL_SIZE, KEEP_ALIVE_TIME, UNIT, //
new PriorityBlockingQueue<Runnable>(), //无限容量的缓冲队列
Executors.defaultThreadFactory(), //线程创建工厂
new ThreadPoolExecutor.AbortPolicy()); //继续超出上限的策略,阻止
}
}
}
return executor;
}
/** 必须在首次执行前设置,否者无效 ,范围1-5之间 */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize <= 0) corePoolSize = 1;
if (corePoolSize > MAX_POOL_SIZE) corePoolSize = MAX_POOL_SIZE;
this.corePoolSize = corePoolSize;
}
/** 执行任务 */
public void execute(Runnable runnable) {
if (runnable != null) {
getExecutor().execute(runnable);
}
}
/** 移除线程 */
public void remove(Runnable runnable) {
if (runnable != null) {
getExecutor().remove(runnable);
}
}
}
该线程池的任务缓冲队列可根据优先级进行插入任务操作:
private void _enqueue(Node<E> node) {
boolean added = false;
Node<E> curr = head;
Node<E> temp = null;
while (curr.next != null) {
temp = curr.next;
if (temp.getPriority() < node.getPriority()) {
curr.next = node;
node.next = temp;
added = true;
break;
}
curr = curr.next;
}
if (!added) {
last = last.next = node;
}
}
总的来说,OkGo的下载还是比较简单的,优点如下:
1.网络请求基于OkHttp进行了封装;
2.使用简单方便,支持断点续传;
3.可以设置任务优先级。
当然,也有不足的地方,如不支持一个文件的多线程下载。