Kotlin+Coroutines(协程)+Retrofit+LiveData+Room安卓全局多文件断点下载

引言

做这个起初的目的是为了学习Kotlin协程,以及JetPack中的相关组件,并且老项目准备重构,于是打算彻彻底底的进行换血.
其实中间写了很多版,包括用RxJava也写过,不过写到一半儿,看了一篇Rxjava已过时的文章直接放弃了,其实我没有在项目中"真正"用过Rxjava,正打算用,就看到了这种言论,不过说得其实也有道理,Rxjava的出现是当时Java环境的需要,但是Kotlin以及kotlin协程的稳定可能真的是Rxjava消亡的契机吧.但是真正让我放弃的是Rxjava的复杂性.Rxjava为了突显链式调用,定义了很多云里雾里的函数(至少让我云里雾里),我觉得很多函数是没必要的或者说不应该做为Rxjava标准库的一部分,它扩展的功能太多了.
官方本身也为我们提供了一个用于管理下载的组件DownloadManager,但是它针对的是所有的Android app,所以它对app自身的切合度太低了,想要实现自定义上的操作,过于复杂了.如果我们要做一个拥有下载管理功能的app,那么这个功能还是应该自己来实现的.
这个Demo本来就是在学习中诞生的,可能代码中很多不足,或者出现了某些错误,还请各位大佬予以指正和提示

预览

按钮没有做状态选择器,看不出来点击步骤,抱歉

Screenrecorder-2020-04-01-10-33-40-139 00_00_00-00_00_30.gif

1 Room 用于存储下载数据

Room想要配合协程使用,必须加入room-ktx依赖,导入room-ktx之后同时也会导入Kotlin协程相关的部分

implementation 'androidx.room:room-runtime:2.2.5'
implementation 'androidx.room:room-ktx:2.2.5'
kapt 'androidx.room:room-compiler:2.2.5'

同时需要加入对kotlin注解编译器的支持

apply plugin: 'kotlin-kapt'

1.1 DownloadInfo用于保存下载数据的数据类

@Entity
@TypeConverters(Converters::class)
data class DownloadInfo(
    @PrimaryKey
    var url: String = "",
    var path: String? = null,
    var data: Serializable? = null,//跟下载相关的数据信息
    var fileName: String? = null,
    var contentLength: Long = -1,
    var currentLength: Long = 0,
    var status: Int = NONE,
    var lastRefreshTime: Long = 0
) {
    companion object Status {
        const val NONE = 0  //无状态
        const val WAITING = 1 //等待中
        const val LOADING = 2 //下载中
        const val PAUSE = 3 //暂停
        const val ERROR = 4 //错误
        const val DONE = 5 //完成
    }

    /**
     * 重置任务
     */
    fun reset() {
        currentLength = 0
        contentLength = -1
        status = NONE
        lastRefreshTime = 0
    }
}

Serializable字段我们需要为其添加类型转换器,详情参考官网

class Converters {

    @TypeConverter
    fun toByteArray(serializable: Serializable?): ByteArray? {
        serializable ?: return null
        var byteArrayOutputStream: ByteArrayOutputStream? = null
        var objectOutputStream: ObjectOutputStream? = null
        try {
            byteArrayOutputStream = ByteArrayOutputStream()
            objectOutputStream = ObjectOutputStream(byteArrayOutputStream)
            objectOutputStream.writeObject(serializable)

            objectOutputStream.flush()
            return byteArrayOutputStream.toByteArray()
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            byteArrayOutputStream?.close()
            objectOutputStream?.close()
        }
        return null
    }

    @TypeConverter
    fun toSerializable(byteArray: ByteArray?): Serializable? {
        byteArray ?: return null
        var byteArrayOutputStream: ByteArrayInputStream? = null
        var objectInputStream: ObjectInputStream? = null
        try {
            byteArrayOutputStream = ByteArrayInputStream(byteArray)
            objectInputStream = ObjectInputStream(byteArrayOutputStream)
            return objectInputStream.readObject() as Serializable
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            byteArrayOutputStream?.close()
            objectInputStream?.close()
        }
        return null
    }
}

1.2 DownloadDao

定义了访问下载数据的方法

@Dao
interface DownloadDao {

    /**
     * 获取所有
     */
    @Query("select * from DownloadInfo")
    suspend fun queryAll(): MutableList<DownloadInfo>

    /**
     * 通过状态查询任务
     */
    @Query("select * from DownloadInfo where status =:status")
    suspend fun queryByStatus(status: Int): MutableList<DownloadInfo>

    /**
     * 查询正在下载的任务
     */
    @Query("select * from DownloadInfo where status != ${DownloadInfo.DONE} and status != ${DownloadInfo.NONE}")
    suspend fun queryLoading(): MutableList<DownloadInfo>

    /**
     * 查询正在下载的任务的url
     */
    @Query("select url from DownloadInfo where status != ${DownloadInfo.DONE}  and status != ${DownloadInfo.NONE}")
    suspend fun queryLoadingUrls(): MutableList<String>

    /**
     * 查询下载完成的任务
     */
    @Query("select * from DownloadInfo where status == ${DownloadInfo.DONE}")
    suspend fun queryDone(): MutableList<DownloadInfo>

    /**
     * 查询下载完成的任务的url
     */
    @Query("select url from DownloadInfo where status == ${DownloadInfo.DONE}")
    suspend fun queryDoneUrls(): MutableList<String>

    /**
     * 通过url查询,每一个任务他们唯一的标志就是url
     */
    @Query("select * from DownloadInfo where url like:url")
    suspend fun queryByUrl(url: String): DownloadInfo?

    /**
     * 插入或替换
     */
    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertOrReplace(vararg downloadData: DownloadInfo): List<Long>

    /**
     * 删除
     */
    @Delete
    suspend fun delete(downloadDao: DownloadInfo)

}

1.3 AppDataBase

数据库持有者

@Database(entities = [DownloadInfo::class], version = 1)
abstract class AppDataBase : RoomDatabase() {

    abstract fun downloadDao(): DownloadDao
}

1.4 RoomClient

构建AppDataBase,请忽略createMigrations方法,这里暂时没有数据库升级的需求

object RoomClient {

    private const val DATA_BASE_NAME = "download.db"

    val dataBase: AppDataBase by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
        Room
            .databaseBuilder(
                App.instance.applicationContext,
                AppDataBase::class.java,
                DATA_BASE_NAME
            )
            .build()
    }

    private fun createMigrations(): Array<Migration> {
        return arrayOf()
    }

}

2 下载逻辑

下载逻辑包含以下几部分

1.DownloadService
2.RetrofitDownload
3.DownloadScope
4.AppDownload

2.1 DownloadService

定义了断点下载的方法,Retrofit2.6之后直接支持配合协程使用

interface DownloadService {

    @Streaming
    @GET
    suspend fun download(@Header("RANGE") start: String? = "0", @Url url: String?): Response<ResponseBody>
}

2.2 RetrofitDownload

构建Retrofit,baseurl的填写满足http://或者https://开头且后面有内容就可以了

object RetrofitDownload {

    val downloadService: DownloadService by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
        val okHttpClient = createOkHttpClient()
        val retrofit = createRetrofit(okHttpClient)
        retrofit.create(DownloadService::class.java)
    }

    private fun createOkHttpClient(): OkHttpClient {
        return OkHttpClient.Builder().build()
    }

    private fun createRetrofit(client: OkHttpClient): Retrofit {
        return Retrofit.Builder()
            .baseUrl("http://download")
            .client(client)
            .build()
    }
}

2.3 DownloadScope[核心]

代表一个下载任务,实际的下载也在里面进行

/**
 * 代表一个下载任务
 * url将做为下载任务的唯一标识
 * 不要直接在外部直接创建此对象,那样就可能无法同一管理下载任务,请通过[AppDownload.request]获取此对象
 */
class DownloadScope(
    var url: String,
    var path: String? = null,
    private val data: Serializable? = null
) : CoroutineScope by CoroutineScope(EmptyCoroutineContext) {

    private var downloadJob: Job? = null
    private val downloadData = MutableLiveData<DownloadInfo>()

    init {
        launch(Dispatchers.Main) {
            val downloadInfoDeferred = async(Dispatchers.IO) {
                RoomClient.dataBase.downloadDao().queryByUrl(url)
            }
            var downloadInfo = downloadInfoDeferred.await()
            //数据库中并没有任务,这是一个新的下载任务
            if (downloadInfo == null)
                downloadInfo = DownloadInfo(url = url, path = path, data = data)
            //将原本正在下载中的任务恢复到暂停状态,防止意外退出出现的状态错误
            if (downloadInfo.status == DownloadInfo.LOADING)
                downloadInfo.status = DownloadInfo.PAUSE
            downloadData.value = downloadInfo
        }
    }

    /**
     * 获取[DownloadInfo]
     */
    fun downloadInfo(): DownloadInfo? {
        return downloadData.value
    }

    /**
     * 添加下载任务观察者
     */
    fun observer(lifecycleOwner: LifecycleOwner, observer: Observer<DownloadInfo>) {
        downloadData.observe(lifecycleOwner, observer)
    }

    /**
     * 开始任务的下载
     * [DownloadInfo]是在协程中进行创建的,它的创建会优先从数据库中获取,但这种操作是异步的,详情请看init代码块
     * 我们需要通过观察者观察[DownloadInfo]来得知它是否已经创建完成,只有当他创建完成且不为空(如果创建完成,它一定不为空)
     * 才可以交由[AppDownload]进行下载任务的启动
     * 任务的开始可能并不是立即的,任务会受到[AppDownload]的管理
     */
    fun start() {
        var observer: Observer<DownloadInfo>? = null
        observer = Observer { downloadInfo ->
            downloadInfo?.let {
                observer?.let { downloadData.removeObserver(it) }
                when (downloadInfo.status) {
                    DownloadInfo.PAUSE, DownloadInfo.ERROR, DownloadInfo.NONE -> {
                        change(DownloadInfo.WAITING)
                        AppDownload.launchScope(this@DownloadScope)
                    }
                }
            }
        }
        downloadData.observeForever(observer)
    }

    /**
     * 启动协程进行下载
     * 请不要尝试在外部调用此方法,那样会脱离[AppDownload]的管理
     */
    fun launch() = launch {
        try {
            download()
            change(DownloadInfo.DONE)
        } catch (e: Throwable) {
            Log.w("DownloadScope", "error:${e.message}")
            when (e) {
                !is CancellationException -> change(DownloadInfo.ERROR)
            }
        } finally {
            AppDownload.launchNext(url)
        }
    }.also { downloadJob = it }

    private suspend fun download() = withContext(context = Dispatchers.IO, block = {
        change(DownloadInfo.LOADING)
        val downloadInfo = downloadData.value
        downloadInfo ?: throw IOException("Download info is null")
        val startPosition = downloadInfo.currentLength
        //验证断点有效性
        if (startPosition < 0) throw IOException("Start position less than zero")
        //下载的文件是否已经被删除
        if (startPosition > 0 && !TextUtils.isEmpty(downloadInfo.path))
            if (!File(downloadInfo.path).exists()) throw IOException("File does not exist")
        val response = RetrofitDownload.downloadService.download(
            start = "bytes=$startPosition-",
            url = downloadInfo.url
        )
        val responseBody = response.body()
        responseBody ?: throw IOException("ResponseBody is null")
        //文件长度
        if (downloadInfo.contentLength < 0)
            downloadInfo.contentLength = responseBody.contentLength()
        //保存的文件名称
        if (TextUtils.isEmpty(downloadInfo.fileName))
            downloadInfo.fileName = UrlUtils.getUrlFileName(downloadInfo.url)
        //创建File,如果已经指定文件path,将会使用指定的path,如果没有指定将会使用默认的下载目录
        val file: File
        if (TextUtils.isEmpty(downloadInfo.path)) {
            file = File(AppDownload.downloadFolder, downloadInfo.fileName)
            downloadInfo.path = file.absolutePath
        } else file = File(downloadInfo.path)
        //再次验证下载的文件是否已经被删除
        if (startPosition > 0 && !file.exists())
            throw IOException("File does not exist")
        //再次验证断点有效性
        if (startPosition > downloadInfo.contentLength)
            throw IOException("Start position greater than content length")
        //验证下载完成的任务与实际文件的匹配度
        if (startPosition == downloadInfo.contentLength && startPosition > 0)
            if (file.exists() && startPosition == file.length()) {
                change(DownloadInfo.DONE)
                return@withContext
            } else throw IOException("The content length is not the same as the file length")
        //写入文件
        val randomAccessFile = RandomAccessFile(file, "rw")
        randomAccessFile.seek(startPosition)
        downloadInfo.currentLength = startPosition
        val inputStream = responseBody.byteStream()
        val bufferSize = 1024 * 8
        val buffer = ByteArray(bufferSize)
        val bufferedInputStream = BufferedInputStream(inputStream, bufferSize)
        var readLength: Int
        try {
            while (bufferedInputStream.read(
                    buffer, 0, bufferSize
                ).also {
                    readLength = it
                } != -1 && downloadInfo.status == DownloadInfo.LOADING && isActive//isActive保证任务能被及时取消
            ) {
                randomAccessFile.write(buffer, 0, readLength)
                downloadInfo.currentLength += readLength
                val currentTime = System.currentTimeMillis()
                if (currentTime - downloadInfo.lastRefreshTime > 300) {
                    change(DownloadInfo.LOADING)
                    downloadInfo.lastRefreshTime = currentTime
                }
            }
        } finally {
            inputStream.close()
            randomAccessFile.close()
            bufferedInputStream.close()
        }
    })

    /**
     * 更新任务
     * @param status [DownloadInfo.Status]
     */
    private fun change(status: Int) = launch(Dispatchers.Main) {
        val downloadInfo = downloadData.value
        downloadInfo ?: return@launch
        downloadInfo.status = status
        withContext(Dispatchers.IO) {
            RoomClient.dataBase.downloadDao().insertOrReplace(downloadInfo)
        }
        downloadData.value = downloadInfo
    }

    /**
     * 暂停任务
     * 只有等待中的任务和正在下载中的任务才可以进行暂停操作
     */
    fun pause() {
        cancel(CancellationException("pause"))
        val downloadInfo = downloadData.value
        downloadInfo?.let {
            if (it.status == DownloadInfo.LOADING || it.status == DownloadInfo.WAITING)
                change(DownloadInfo.PAUSE)
        }
    }

    /**
     * 删除任务,删除任务会同时删除已经在数据库中保存的下载信息
     */
    fun remove() = launch(Dispatchers.Main) {
        this@DownloadScope.cancel(CancellationException("remove"))
        val downloadInfo = downloadData.value
        downloadInfo?.reset()
        downloadData.value = downloadInfo
        withContext(Dispatchers.IO) {
            downloadInfo?.let {
                RoomClient.dataBase.downloadDao().delete(it)
                //同时删除已下载的文件
                it.path?.let { path ->
                    val file = File(path)
                    if (file.exists()) file.delete()
                }
            }
        }
    }

    /**
     * 取消[downloadJob],将会中断正在进行的下载任务
     */
    private fun cancel(cause: CancellationException) {
        downloadJob?.cancel(cause)
    }

    /**
     * 是否是等待任务
     */
    fun isWaiting(): Boolean {
        val downloadInfo = downloadData.value
        downloadInfo ?: return false
        return downloadInfo.status == DownloadInfo.WAITING
    }

    /**
     * 是否是正在下载的任务
     */
    fun isLoading():Boolean {
        val downloadInfo = downloadData.value
        downloadInfo ?: return false
        return downloadInfo.status == DownloadInfo.LOADING
    }
}

2.4 AppDownload[核心]

对DownloadScope的管理,同时也是获取DownloadScope的唯一途径

object AppDownload {

    private const val MAX_SCOPE = 3

    val downloadFolder: String? by lazy {
        Environment.getExternalStorageState()
        App.instance.applicationContext.getExternalFilesDir(Environment.DIRECTORY_DOWNLOADS)
            ?.absolutePath
    }

    private val scopeMap: ConcurrentHashMap<String, DownloadScope> by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
        ConcurrentHashMap<String, DownloadScope>()
    }

    private val taskScopeMap: ConcurrentHashMap<String, DownloadScope> by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) {
        ConcurrentHashMap<String, DownloadScope>()
    }

    /**
     * 请求一个下载任务[DownloadScope]
     * 这是创建[DownloadScope]的唯一途径,请不要通过其他方式创建[DownloadScope]
     * 首次任务调用此方法获取[DownloadScope]并不会在数据库中生成数据
     * 首次任务只有调用了[DownloadScope.start]并且成功进入[DownloadInfo.WAITING]状态才会在数据库中生成数据
     * 首次任务的判断依据为数据库中是否保留有当前的任务数据
     */
    fun request(url: String?, data: Serializable? = null, path: String? = null): DownloadScope? {
        if (TextUtils.isEmpty(url)) return null
        var downloadScope = scopeMap[url]
        if (downloadScope == null) {
            downloadScope = DownloadScope(url = url!!, data = data, path = path)
            scopeMap[url] = downloadScope
        }
        return downloadScope
    }

    /**
     * 通过url恢复任务
     *
     * @param urls 需要恢复任务的连接,url请通过DownloadDao进行获取
     */
    fun restore(urls: List<String>): MutableList<DownloadScope> {
        val downloadScopes = mutableListOf<DownloadScope>()
        for (url in urls) {
            var downloadScope = scopeMap[url]
            if (downloadScope == null) {
                downloadScope = DownloadScope(url = url)
                scopeMap[url] = downloadScope
            }
            downloadScopes.add(downloadScope)
        }
        return downloadScopes
    }

    /**
     * 暂停所有的任务
     * 只有任务的状态为[DownloadInfo.WAITING]和[DownloadInfo.LOADING]才可以被暂停
     * 暂停任务会先暂停[DownloadInfo.WAITING]的任务而后再暂停[DownloadInfo.LOADING]的任务
     */
    fun pauseAll() {
        for (entry in scopeMap) {
            val downloadScope = entry.value
            if (downloadScope.isWaiting())
                downloadScope.pause()
        }
        for (entry in scopeMap) {
            val downloadScope = entry.value
            if (downloadScope.isLoading())
                downloadScope.pause()
        }
    }

    /**
     * 移除所有的任务
     * 移除任务会先移除状态不为[DownloadInfo.LOADING]的任务
     * 而后再移除状态为[DownloadInfo.LOADING]的任务
     */
    fun removeAll() {
        for (entry in scopeMap) {
            val downloadScope = entry.value
            if (!downloadScope.isLoading())
                downloadScope.remove()
        }
        for (entry in scopeMap) {
            val downloadScope = entry.value
            if (downloadScope.isLoading())
                downloadScope.pause()
        }
    }

    /**
     * 启动下载任务
     * 请不要直接使用此方法启动下载任务,它是交由[DownloadScope]进行调用
     */
    fun launchScope(scope: DownloadScope) {
        if (taskScopeMap.size >= MAX_SCOPE) return
        if (taskScopeMap.contains(scope.url)) return
        taskScopeMap[scope.url] = scope
        scope.launch()
    }

    /**
     * 启动下一个任务,如果有正在等待中的任务的话
     * 请不要直接使用此方法启动下载任务,它是交由[DownloadScope]进行调用
     * @param previousUrl 上一个下载任务的下载连接
     */
    fun launchNext(previousUrl: String) {
        taskScopeMap.remove(previousUrl)
        for (entrySet in scopeMap) {
            val downloadScope = entrySet.value
            if (downloadScope.isWaiting()) {
                launchScope(downloadScope)
                break
            }
        }
    }
}

全部代码

https://gitee.com/tomato_wl/CoroutineDownlaod.git

总结

1.由于没有新建一个module,所以很多不应该暴露出来的方法都暴露出来了,不过不应该暴露出来的都已经注释注明了.
2.pauseAll和removeAll这两个方法并没有经过实际的测试
3.部分逻辑参考了开源项目OkGo,感谢!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351