当Kotlin遇见RxJava多数据源

code4android.png

温馨提醒

阅读本文最好有Kotlin基础,若没有基础,可参考之前文章Kotlin初探使用Kotlin优雅的开发Android应用,以及RxJava基础(本文基于RxJava2),当然我也会尽可能详细解释让你顺利阅读本文。

源码传送门

写在前面

最近几天回过头,看了之前的总结RxJava操作符系列,感觉对Rxjava多数据源的处理不是很理解,所以在总结学习一波。大家都知道,最近Kotlin语言一直占据热搜榜,褒贬不一,但我想说,不管有什么想法都要抛在脑后,毕竟Google爸爸出手,你不情愿也要跟随它的步伐。鉴于此,本篇对RxJava多数据源的总结是基于Kotlin语言,也让大家明白,使用Kotlin开发应用并不是不能使用Java库,现在有一部分人担心,Kotlin第三方库那么少,如果使用Kotlin开发那不是给自己找罪受,其实你完全错了,当你说这话的时候,我敢断定你都还没有接触Kotlin,因为Koltin有一个最重要的优势就是和Java绝对兼容。

多数据源处理操作符

在RxJava中多数据源处理的操作符很多,但是最经典的就要数merge,contact,zip了。如果对这三个操作符不是很熟悉的话,可以去查看它的使用,当然如果你懒得去看,我也会简单提一下。merge操作符可以处理多个Observable发送的数据,它是一个异步操作,不保证数据发送的顺序,即有可能出现数据交叉,当一个Observable发送了onError后,未执行的Observable不在继续执行,直接执行merge的onError方法。

contact操作符执行时一个同步操作,严格按照contact中传入Observable先后执行,即前面的先执行后面的后执行,并且最终发送的数据也是有序的,即第一个Observable的数据发送完毕再发送第二个,依次类推。

zip操作符和contact和merge有了本质的区别,它会将每个Observable个数据项分布对应返回一个Observable再发送,最终发送的数据量与最小数据长度相同。

使用场景分析

假如现在我们有三种商品,有一个查询商品信息的接口,根据接口可以查询该商品的价格以及出售地点。商品实体类

data class Goods(var id:Int,var price: Int, var address: String)

在Kotlin语言中,实体类创建用data class 关键词,我们不需要和Java一样创建get/set方法,只需一行代码搞定。

创建模拟网络请求

object NetRequest {
    //模拟网络请求
    fun getGoodsObservable(id: Int): Observable<Goods> {
fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }
    }
}

在上面我们创建了一个单例类,在Kotlin中使用object修饰类时即给我们自动创建了一个单例对象。在每一句代码结尾我们不需要再和Java一样写一个分号“;”来结束,什么也不用写。

Observable.create使用的是lambda表达式,在Kotlin语言中是支持lambda表达式的。source 就是ObservableEmitter<Goods>,所以我们可以调用onNext发送数据。为了更准确的模拟网络请求,使用Thread.sleep随机的延迟,模拟网络请求的时间。

  fun getGoodsObservable(id: Int): Observable<Goods> {
        return Observable.create {
            source ->
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            source.onNext(data)
            source.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }

当然由于subscribe只有一个参数,所以我们也可以这样写。也就是省略了source ->,此时it就表示该参数数据。

return Observable.create {
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            it.onNext(data)
            it.onComplete()
            Log.e("getGoodsObservable:", "${id}")
        }

在java中实现如下

  return Observable.create(new ObservableOnSubscribe<Goods>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Goods> e) throws Exception {
             //处理逻辑
            }
        });

merge

准备好了请求操作,开始使用merge看看执行的效果。

  fun executeMerge() {
        Observable.merge(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }

merge中有三个网络请求操作,并通过subscribeOn(Schedulers.newThread())将网络请求切换到线程中执行,数据都请求成功后,再通过observeOn(AndroidSchedulers.mainThread())切换到主线程请求数据。为了三请求都成功后,我们在更新UI,所以通过toList()将请求的数据转换成List一块发送。在上面的subscribe依然使用的lambda表达式,subscribe({},{})中第一个括号是onSuccess回调,里面的it是接收到的List< Goods >数据,第二个括号是onError回调,it表示异常Throwable对象。
subscribe部分Java代码

.subscribe(new Consumer<List<Goods>>() {
                    @Override
                    public void accept(@NonNull List<Goods> goodses) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                });

当然如果你想使用RxJava2中onSubscribe(@NonNull Disposable d) ,你可以这样使用subscribe

.subscribe(object : SingleObserver<List<Goods>> {
                    override fun onSubscribe(d: Disposable?) {
                    }
                    override fun onError(e: Throwable?) {
                    }
                    override fun onSuccess(t: List<Goods>?) {
                    }
                })

为了观察,我们将请求成功的数据显示在界面上,我们创建一个Button,TextView。

class MainActivity : AppCompatActivity(), View.OnClickListener {

    val TAG = "MainActivity"
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        setSupportActionBar(toolbar)
        //加入这句import kotlinx.android.synthetic.main.activity_main.*
        //不用再findViewById,可直接使用
        merge.setOnClickListener(this)

    }
    override fun onClick(v: View) {
        when (v.id) {
            R.id.merge -> {
                executeMerge()
            }
        }
        //when 关键字和Java中的Switch关键词是类似的,
        //只不过它比Java中的Switch强大的多,可以接收任何参数,
        //然后判断使用,也可以如下使用
        when (v) {
            merge -> {
            }
        }
    }
}

contact

我们点击执行几次发现,返回的List的数据并不是按照merge参数的先后顺序执行的,它是并发的,最终的顺序,是由网络请求的快慢决定的,请求返回数据越快也就表示该数据最早发送,即在List中最靠前。那么此时出现一个问题,如果我想返回数据的List顺序严格按照位置的先后顺序呢?那此时使用merge的话,是不太现实了。当然前面我们提到contact可以使用。那么直接将merge更改为contact执行以下试试,

    fun executeContact() {
        Observable.concat(getGoodsObservable(1).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(2).subscribeOn(Schedulers.newThread()),
                getGoodsObservable(3).subscribeOn(Schedulers.newThread()))
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .toList()
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }

的确,发现无论执行多少次List的数据都能按照contact中Observable顺序发送,我们想要的效果可以实现了,不过你会发现,效率太差了,这是同步执行啊,只有第一个请求成功,才会去请求第二个,然后第三个,假如一次请求需要一秒,那三次请求至少三秒啊,不能忍。

zip

鉴于上面两种方式的利弊,如果我们既想如merge一样并发执行,又想和contact一样保证顺序,是不是有点强迫症的意思,当然强大的zip就能实现我们想要的效果。如下实现。.

    fun executeZip() {
        Observable.zip(getGoodsObservable(1),
                getGoodsObservable(2),
                getGoodsObservable(3),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }

既然实现了,那我们运行几次,发现完美的实现了我们想要的效果,即并发的执行了,也保证了我们请求数据的顺序性。

在回调中运用RxJava

在上面我们的单个网络请求是一个同步的请求,如果我们的网络请求封装了,在线程中请求,请求成功后在主线程中回调,那我们又该如何创建呢使用呢?
先来模拟一个子线程请求网络,请求成功回调数据给主线程。

    fun getGoods(ctx:Context,id: Int,callbacks:(goods:Goods)->Unit): Unit {
        ctx.doAsync {
            Thread.sleep(Random().nextInt(1000).toLong())
            var data = Goods(id, Random().nextInt(20), "地址${id}")
            ctx.runOnUiThread {
                callbacks(data)
            }
        }
    }

getGoods传了三个参数,第一个Context对象,第二个是商品ID,第三个参数是一个函数,(goods:Goods)->Unit表示第三个参数的类型是一个参数为Goods类型并且返回Unit的函数。使用doAsync 模拟异步请求,请求成功后runOnUiThread 切换到UI线程。然后callbacks(data)将数据回调。这种使用方式比Java中回调优美好用太多了。
接下来就开始在回调成功后创建Observable

  fun getGoodsCallBack(id: Int): Observable<Goods> {
        var subscrbe: ObservableEmitter<Goods>? = null
        var o = Observable.create<Goods> {
            subscrbe = it
        }
        //Kotlin特性
        getGoods(this@MainActivity, id) {
            subscrbe?.onNext(it)
        }
        return o
    }
    fun executeZipCallBack() {
        Observable.zip(getGoodsCallBack(1).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(2).subscribeOn(Schedulers.newThread()),
                getGoodsCallBack(3).subscribeOn(Schedulers.newThread()),
                Function3<Goods, Goods, Goods, List<Goods>>
                { goods0, goods1, goods2 ->
                    val list = ArrayList<Goods>()
                    list.add(goods0)
                    list.add(goods1)
                    list.add(goods2)
                    list
                }).subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    Log.e(TAG, it.toString())
                }, {
                    Log.e(TAG, it.toString())
                })
    }

ok,到这里回调情况下创建使用RxJava也介绍完毕,到此本篇文章就结束了,有问题欢迎指出,内容杂乱,多多担待,Hava a wonderful day.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,853评论 0 10
  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善阅读 3,551评论 0 0
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,913评论 25 707
  • 很早之前就在想是不是有去中心化分布式的云存储产品,结果发现Sia云这个产品,试用了一下,发现问题多多。 首先,最大...
    Joe_Chen2017阅读 1,077评论 2 0
  • 最近的自己不知道怎么回事,总是容易哭泣。总是容易觉得自己不被在乎,也许真的是自己喜欢上了,所以才会那么无理取闹。 ...
    旅行在路上1993阅读 184评论 1 0