Transformer 在RxJava中的使用

Transformer.jpeg

Transformer 用途

Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer,在2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。由于 RxJava2 将Observable拆分成 Observable 和 Flowable,所以多了一个FlowableTransformer。同时,Maybe是 RxJava2 新增的一个类型,所以多了MaybeTransformer。

Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。

举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。

public static <String> ObservableTransformer<Integer, java.lang.String> transformer() {
        return new ObservableTransformer<Integer, java.lang.String>() {
            @Override
            public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) {
                return upstream.map(new Function<Integer, java.lang.String>() {
                    @Override
                    public java.lang.String apply(@NonNull Integer integer) throws Exception {
                        return java.lang.String.valueOf(integer);
                    }
                });
            }
        };
    }

接下来是使用transformer()方法,通过标准的RxJava的操作。

Observable.just(123,456)
       .compose(transformer())
       .subscribe(new Consumer<String>() {
              @Override
               public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                   System.out.println("s="+s);
                    }
                });

最后打印了二次,分别是

s=123
s=456

通过这个例子,可以简单和直观地了解到Transformer的作用。

其实,在大名鼎鼎的图片加载框架 Glide 以及 Picasso 中也有类似的transform概念,能够将图形进行变换。

跟compose操作符相结合

compose操作于整个数据流中,能够从数据流中得到原始的Observable<T>/Flowable<T>...
当创建Observable/Flowable...时,compose操作符会立即执行,而不像其他的操作符需要在onNext()调用后才执行。

关于compose操作符,老外的这篇文章不错Don't break the chain: use RxJava's compose() operator
国内也有相应的翻译【译】避免打断链式结构:使用.compose( )操作符

常用的场景

1. 切换到主线程

对于网络请求,我们经常会做如下的操作来切换线程。

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

于是,我做了一个简单的封装。

import io.reactivex.FlowableTransformer
import io.reactivex.ObservableTransformer
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers

/**
 * Created by Tony Shen on 2017/7/13.
 */
object RxJavaUtils {

    @JvmStatic
    fun <T> observableToMain():ObservableTransformer<T, T> {

        return ObservableTransformer{
            upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }

    @JvmStatic
    fun <T> flowableToMain(): FlowableTransformer<T, T> {

        return FlowableTransformer{
            upstream ->
            upstream.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
        }
    }
}

上面这段代码是Kotlin写的,为啥不用Java?个人习惯把一些工具类来用Kotlin来编写,而且使用lambda表达式也更为直观。

对于Flowable切换到主线程的操作,可以这样使用

.compose(RxJavaUtils.flowableToMain())

2. RxLifecycle中的LifecycleTransformer

trello出品的RxLifecycle能够配合Android的生命周期,防止App内存泄漏,其中就使用了LifecycleTransformer。
知乎也做了一个类似的RxLifecycle,能够做同样的事情。

在我的项目中也使用了知乎的RxLifecycle,根据个人的习惯和爱好,我对LifecycleTransformer稍微做了一些修改,将五个Transformer合并成了一个。

import org.reactivestreams.Publisher;

import io.reactivex.Completable;
import io.reactivex.CompletableSource;
import io.reactivex.CompletableTransformer;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.ObservableTransformer;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.SingleTransformer;
import io.reactivex.annotations.NonNull;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.processors.BehaviorProcessor;

/**
 * Created by Tony Shen on 2017/5/25.
 */

public class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
        FlowableTransformer<T, T>,
        SingleTransformer<T, T>,
        MaybeTransformer<T, T>,
        CompletableTransformer {

    private final BehaviorProcessor<Integer> lifecycleBehavior;

    private LifecycleTransformer() throws IllegalAccessException {
        throw new IllegalAccessException();
    }

    public LifecycleTransformer(@NonNull BehaviorProcessor<Integer> lifecycleBehavior) {
        this.lifecycleBehavior = lifecycleBehavior;
    }

    @Override
    public CompletableSource apply(Completable upstream) {
        return upstream.ambWith(
                lifecycleBehavior.filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event == LifecyclePublisher.ON_DESTROY_VIEW ||
                                event == LifecyclePublisher.ON_DESTROY ||
                                event == LifecyclePublisher.ON_DETACH;
                    }
                }).take(1).flatMapCompletable(new Function<Integer, Completable>() {
                    @Override
                    public Completable apply(Integer flowable) throws Exception {
                        return Completable.complete();
                    }
                })
        );
    }

    @Override
    public Publisher<T> apply(final Flowable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public MaybeSource<T> apply(Maybe<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }

    @Override
    public ObservableSource<T> apply(Observable<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                }).toObservable()
        );
    }

    @Override
    public SingleSource<T> apply(Single<T> upstream) {
        return upstream.takeUntil(
                lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                        return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                event != LifecyclePublisher.ON_DESTROY &&
                                event != LifecyclePublisher.ON_DETACH;
                    }
                })
        );
    }
}

3. 缓存的使用

对于缓存,我们大致都会这样写

cache.put(key,value);

更优雅一点的做法是使用AOP,大致会这样写

@Cacheable(key = "...")
getValue() {
    ....
}

如果你想在RxJava的链式调用中也使用缓存,还可以考虑使用transformer的方式,下面我写了一个简单的方法

/**
 * Created by Tony Shen on 2017/7/13.
 */

public class RxCache {

    public static <T> FlowableTransformer<T, T> transformer(final String key, final Cache cache) {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(@NonNull Flowable<T> upstream) {

                return upstream.map(new Function<T, T>() {
                    @Override
                    public T apply(@NonNull T t) throws Exception {
                        cache.put(key,(Serializable) t);
                        return t;
                    }
                });

            }
        };
    }
}

结合上述三种使用场景,封装了一个方法用于获取内容,在这里网络框架使用Retrofit。虽然Retrofit本身支持通过Interceptor的方式来添加Cache,但是可能某些业务场景下还是想用自己的Cache,那么可以采用下面类似的封装。

    /**
     * 获取内容
     * @param fragment
     * @param param
     * @param cacheKey
     * @return
     */
    public Flowable<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) {

        return apiService.loadVideoContent(param)
                .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
                .compose(RxJavaUtils.<ContentModel>flowableToMain())
                .compose(RxCache.<ContentModel>transformer(cacheKey,App.getInstance().cache));
    }

4. 追踪RxJava的使用

初学者可能会对RxJava内部的数据流向会感到困惑,所以我写了一个类用于追踪RxJava的使用,对于调试代码还蛮有帮助的。

先来看一个简单的例子

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

下图显示了上面代码中的数据流向。


第一次做Trace.png

然后,再刚才代码的基础上加一个map操作符,把小写的字符串都转换成大写。

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .map(new Function<String, String>() {


                    @Override
                    public String apply(@io.reactivex.annotations.NonNull String s) throws
                            Exception {
                        return s.toUpperCase();
                    }
                })
                .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

看看这一次数据是怎样流向的,由于显示器不够大,其实截图还少了一点内容:(,但是能够看明白日志的展示。


第二次做Trace.png

最后,加上监测onComlete和OnTerminate

Observable.just("tony","cafei","aaron")
                .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                .map(new Function<String, String>() {


                    @Override
                    public String apply(@io.reactivex.annotations.NonNull String s) throws
                            Exception {
                        return s.toUpperCase();
                    }
                })
                .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                .compose(RxJavaUtils.<String>observableToMain())
                .compose(RxTrace.<String>logObservable("third",RxTrace.LOG_COMPLETE|RxTrace.LOG_TERMINATE))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                        System.out.println("s="+s);
                    }
                });

上面已经展示过的截图就不显示了,就展示最后的onComlete和OnTerminate。


第三次做Trace.png

最后,我已经把RxTrace的代码放到https://github.com/fengzhizi715/SAF-Kotlin-log
为何不单独开一个repository呢?它只有一个类,我就懒得创建了:(

总结

compose操作符和Transformer结合使用,一方面让代码看起来更加简洁化,另一方面能够提高代码的复用性。RxJava提倡链式调用,compose能够防止链式被打破。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容