期待很久的大神的第三篇文章终于出来了,现在翻译一下作为记录供以后学习使用。
原文地址:https://proandroiddev.com/modern-android-development-with-kotlin-part-3-8721fb843d1b
现在有了一些新版本的依赖
- Gradle wrapper 4.1. 现在是稳定版
- 最新的kotlin版本是:1.2.20
- Android体系结构组件现在是稳定的1.0.0,现在已弃用LifecycleActivity。我们现在应该使用AppCompatActivity。
- 现在我们使用targetSdkVersion 27 , buildToolsVersion 27.0.2
Android官方文档中将会同时提供Kotlin和Java的示例。
RxJava是什么
这不只是关于RxJava,这个概念更加广泛。 RxJava只是用于异步编程的API的Java实现 与可观察的流,reactivex
。实际上,它是由三个概念组成的:Observer模式,Iterator模式和函数式编程。有其他编程语言的库:RxSwift,RxNet,RxJs ...
是否真的需要
不,RxJava只是您可以在Android开发中使用的一个库。kotlin不是必须的,databinding也不是必须的,他们都只是像其他库一样帮助你开发而已。
学习RxJava2之前需要学习RxJava1吗
你可以从RxJava2开始。不过,作为一名Android开发人员,要了解这两个的原因是,您可以参与维护其他人的RxJava1代码。
我发现了RxAndroid,我应该使用RxAndroid还是RxJava
Rxjava可以被用在任何的java开发中,而不只是Android开发,RxJava可以和Spring框架一起用于后端开发,RxAndroid是一个包含在Android中使用RxJava所需的库。如果你想在Android开发中使用RxJava,你必须添加一个库RxAndroid。稍后,我将解释RxAndroid在RxJava的基础上添加的内容。
我们在使用Kotlin开发,为什么不用RxKotlin
不需要写额外的库,因为java的所有代码都被kotlin支持,但是的确存在 RxKotlin库,但是这个库只是在RxJava的基础上编写了一些功能,所以你可以使用RxJava和kotlin,而不必要使用RxKotlin。
添加RxJava2到项目中
dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.1.8"
implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
...
}
RxJava都包含什么
我喜欢将RxJava分成三个部分
- 包含观察者模式和数据流的类:Observables and Observers
- Schedulers ( concurrency )
- 数据流的操作符
Observables and Observers
我们已经解释了这种模式,你可以把Observable当做数据源,把Observer当做获取数据的。
有很多种方式可以用来创建observables,其中最简单的一种就是Observable.just()用来获取一个item并创建Observables来发送item。
让我们修改GitRepoRemoteDataSource中的getRepositories方法返回Observable:
class GitRepoRemoteDataSource {
fun getRepositories() : Observable<ArrayList<Repository>> {
var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First from remote", "Owner 1", 100, false))
arrayList.add(Repository("Second from remote", "Owner 2", 30, true))
arrayList.add(Repository("Third from remote", "Owner 3", 430, false))
return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)
}
}
Observable<ArrayList<Repository>>意味着Observable发送Repository对象的数组,如果你想发送Repository对象的话,使用Observable.from(arrayList).
.delay(2,TimeUnit.SECONDS)表示发送数据之前等待两秒。
但是我们并没有说Observable什么时候开始发送数据,一旦Observer开始订阅了Observable就开始发送数据。
现在我们就不需要下面的接口了:
interface OnRepoRemoteReadyCallback {
fun onRemoteDataReady(data: ArrayList<Repository>)
}
让我们对GitRepoLocalDataSource做同样的更改
class GitRepoLocalDataSource {
fun getRepositories() : Observable<ArrayList<Repository>> {
var arrayList = ArrayList<Repository>()
arrayList.add(Repository("First From Local", "Owner 1", 100, false))
arrayList.add(Repository("Second From Local", "Owner 2", 30, true))
arrayList.add(Repository("Third From Local", "Owner 3", 430, false))
return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)
}
fun saveRepositories(arrayList: ArrayList<Repository>) {
//todo save repositories in DB
}
}
现在 我们需要修改我们的repository返回Observable
class GitRepoRepository(private val netManager: NetManager) {
private val localDataSource = GitRepoLocalDataSource()
private val remoteDataSource = GitRepoRemoteDataSource()
fun getRepositories(): Observable<ArrayList<Repository>> {
netManager.isConnectedToInternet?.let {
if (it) {
//todo save those data to local data store
return remoteDataSource.getRepositories()
}
}
return localDataSource.getRepositories()
}
}
如果有网络连接,我们从远程数据源返回Observable,否则我们从本地数据源返回它。而且,当然,也不需要OnRepositoryReadyCallback接口。
我们需要修改在MainViewModel中获取数据的方式。现在我们应该从gitRepoRepository获取Observable并订阅它。一旦我们订阅Observer的观察者,Observable将开始发射数据:
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
fun loadRepositories() {
isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
override fun onSubscribe(d: Disposable) {
//todo
}
override fun onError(e: Throwable) {
//todo
}
override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}
override fun onComplete() {
isLoading.set(false)
}
})
}
}
当Observable发送数据的时候,onNext方法会被调用,一旦Observable完成发射,onComplete会被调用,之后Observable会被终止。
一旦出现错误,onError()会被调用,然后终止发射,这意味着不会再发送数据,onNext和onComplete都不会被调用
如果你尝试订阅已经终止的Observable,就会得到IllegalStateException。
RxJava能帮助我们什么?
- 首先,我们可以摆脱那些接口
- 如果我们使用接口,并且如果我们的数据层发生错误,我们的应用程序可能会崩溃。使用RxJava错误将在onError()方法中返回,以便我们可以向用户显示相应的错误消息。
- 使我们的数据层看起来更清晰
- 以前的做法可能导致内存泄漏。
使用RxJava2和ViewModel如何避免内存溢出
[图片上传失败...(image-e4dc58-1517130055266)]
一旦Activity被销毁,VideModel的onCleared方法将会被调用,在onCleared方法中我们可以dispose所有的Disposables。
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
lateinit var disposable: Disposable
fun loadRepositories() {
isLoading.set(true)
gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
override fun onSubscribe(d: Disposable) {
disposable = d
}
override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}
override fun onComplete() {
isLoading.set(false)
}
})
}
override fun onCleared() {
super.onCleared()
if(!disposable.isDisposed){
disposable.dispose()
}
}
}
我们可以改进这些代码
首先,我们可以使用实现Disposable的DisposableObserver而不是Observer,并使用dispose()方法。所以我们不需要onSubscribe()方法,因为我们可以直接在DisposableObserver实例上调用dispose()。
其次,相较于.subscribe方法返回的Void,我们使用.subscribeWith()方法返回Observer
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
lateinit var disposable: Disposable
fun loadRepositories() {
isLoading.set(true)
disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {
override fun onError(e: Throwable) {
// todo
}
override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}
override fun onComplete() {
isLoading.set(false)
}
})
}
override fun onCleared() {
super.onCleared()
if(!disposable.isDisposed){
disposable.dispose()
}
}
}
我们可以接着改进这部分代码:
我们保存Disposable实例,一旦onCleared被调用的时候可以直接dispose,但是如果我们有很多的数据层逻辑,每个都需要做相同的操作。
现在有一个更聪明的方法,我们可以将他们都存储到一起,一旦onCleared方法调用,我们可以使用CompositeDisposable来同时disopose。
CompositeDisposable是Disposable的容器,一次可以存储多个Disposable
每次我们只要创建的Disposable的时候就将其添加CompositeDisposable
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
private val compositeDisposable = CompositeDisposable()
fun loadRepositories() {
isLoading.set(true)
compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {
override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}
override fun onComplete() {
isLoading.set(false)
}
}))
}
override fun onCleared() {
super.onCleared()
if(!compositeDisposable.isDisposed){
compositeDisposable.dispose()
}
}
}
多谢Kotlin的扩展方法可以让我们我们改进的更多
让我们创建一个新的extensions包,然后创建RxExtensions.kt
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
add(disposable)
}
现在我们将Disposable添加到CompositeDisposable使用+=符号。
class MainViewModel(application: Application) : AndroidViewModel(application) {
...
private val compositeDisposable = CompositeDisposable()
fun loadRepositories() {
isLoading.set(true)
compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
override fun onError(e: Throwable) {
//if some error happens in our data layer our app will not crash, we will
// get error here
}
override fun onNext(data: ArrayList<Repository>) {
repositories.value = data
}
override fun onComplete() {
isLoading.set(false)
}
})
}
override fun onCleared() {
super.onCleared()
if (!compositeDisposable.isDisposed) {
compositeDisposable.dispose()
}
}
现在我们来尝试运行该应用程序。一旦你点击加载数据按钮,应用程序将在两秒钟内崩溃。然后,如果你去看日志,你会看到,在onNext方法内发生的错误和异常的原因是:
java.lang.IllegalStateException: Cannot invoke setValue on a background thread
为什么会这样
Schedulers ( concurrency )
RxJava带有调度程序,可以让我们选择执行哪个线程代码。更准确地说,我们可以使用subscribeOn()方法选择哪个线程执行可观察操作,哪个线程执行Observer操作将使用observeOn()方法。通常,所有来自数据层的代码都应该在后台线程上运行。例如,如果我们使用Schedulers.newThread(),调度程序将在每次调用它时提供新的线程。为了简单起见,还有其他来自调度程序的方法,我不会在这个博客文章中介绍。
你已经知道所有的UI代码都需要在Android主线程中执行,Rxjava是Java库所以他不知道Android的主线程,这就是我们为什么要使用RxAndroid,RxAndroid使我们有可能选择Android主线程作为我们的代码将被执行的线程。显然,我们的Observer应该在Android主线程上运行。
接下来进一步修改:
...
fun loadRepositories() {
isLoading.set(true)
compositeDisposable += gitRepoRepository
.getRepositories()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
...
})
}
...
接下来再运行程序,一切OK
更多的observables类型
- Single<T>:Observable只发出一条数据或者是错误
- Maybe<T>:不发射数据,发射一条数据或者错误
- Completable:发射onSuccess事件或者错误
- Flowable<T>:和Observable<T>相似,发送n条数据,没有数据或者是error,Observable不支持背压,但是Flowable支持
什么是背压
记住一些概念,我喜欢用现实生活来举例:
[站外图片上传中...(image-c18c5c-1517130055266)]
我认为这是一个漏斗。如果你想填充的更多,那么你的瓶颈会首先被填满,同样的事情在这里。有时你的观察者不能处理它正在接收的事件的数量,所以需要放慢速度。
你可以在RxJava文档中找到更多的内容。
操作符
关于RxJava的很酷的事情是它的操作符。在RxJava中只需要一行代码就可以解决一些需要10行以上的常见问题。操作符可以帮助我们。
- 合并observables
- 过滤
- 作条件判断
- 转换observables到另外一种类型
我将举一个例子,让我们完成在GitRepoLocalDataSource中保存数据。因为我们正在保存数据,所以我们需要Completable来模拟它。假设我们也想模拟1秒的延迟。真正天真的做法是:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
return Completable.complete().delay(1,TimeUnit.SECONDS)
}
为什么天真呢?
Completable.complete()返回一个在订阅时立即完成的Completable实例:
一旦你的Completable完成,它将被终止。所以,任何操作符(延迟是其中的一个操作符)将不会被执行。在这种情况下,我们的Completable将不会有任何延迟。让我们找到办法:
fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()
}
Single.just(1)将会创建一个Single并且发送一个1,因为我们使用了delay(1,TimeUnit.SECONDS),所以发送回延迟一秒。
toCompletable()返回一个Completable,放弃Single的结果,并在此源Single调用onSuccess时调用onComplete。
所以,这段代码将返回Completable,它会在1秒后调用onComplete。
现在我们需要修改GitRepoRepository,我们检查网络连接。如果有网络连接,我们必须从远程数据源获取数据,将其保存在本地数据源中并返回数据。否则,我们只从本地数据源获取数据。看一看:
fun getRepositories(): Observable<ArrayList<Repository>> {
netManager.isConnectedToInternet?.let {
if (it) {
return remoteDataSource.getRepositories().flatMap {
return@flatMap localDataSource.saveRepositories(it)
.toSingleDefault(it)
.toObservable()
}
}
}
return localDataSource.getRepositories()
}
使用.flatMap,一旦remoteDataSource.getRepositories()发射数据,该数据将被映射到发出相同数据的新Observable。我们从Completable创建的新的Observable将数据存储到本地并且返回发射同样数据的Single,但是我们需要将Single转换成Observable。