Android开源库Retrofit中RxJava的工作流程分析

写这么一篇文章主要是为了解惑,我们都知道Retrofit可以配合RxJava一起使用,而且那种链式的调用简直了,但是一直有个疑惑:

getObservable().subscribe(new Observer<String>() {
       @Override
       public void onNext(String value) {
           // ... 得到数据
       }
   })

看上面那段伪代码之后我们都知道Observable是需要subscribe才会真正执行的,那么Retrofit是怎么实现这个流程的呢?不然老是能得到数据却不懂的怎么来的,所以为了解读这一脸的懵逼只能从源码中去寻找答案。

简单使用

val mRetrofit: Retrofit = Retrofit.Builder()
        .baseUrl(HttpUrl.parse(Constant.URL))
        .client(okHttpBuilder.build())
        .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
        .addConverterFactory(GsonConverterFactory.create(GsonBuilder().create()))
        .build()

嗯嗯,只要加了RxJava2CallAdapterFactory.createWithScheduler之后就能愉快的结合RxJava一起使用了:

GankHttpRequest.instance.mApiService.getAndroidData()
        .compose(Constant.transformer())
        .subscribe(object : Consumer<MutableList<AndroidResult>> {
            override fun accept(t: MutableList<AndroidResult>?) {
                callback?.onHttpSuccess(t)
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
            }
        })

更多例子源码查看:https://github.com/Neacy/GankKotlin
总是很佩服Square开源的项目,因为解决了很多难题,以上就是RetrofitRxJava的简单效果。

开始分析

我们直接奔主题进入口开始分析mRetrofit.create(ApiService::class.java)也就是Retrofit中的create方法:

  public <T> T create(final Class<T> service) {
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
          private final Platform platform = Platform.get();
          @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
              throws Throwable {
            ServiceMethod<Object, Object> serviceMethod =
                (ServiceMethod<Object, Object>) loadServiceMethod(method);
            OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        });
  }

这里重点分析loadServiceMethod方法,点进源码可以看到主要执行new ServiceMethod.Builder<>(this, method).build()经过一系列折腾最后回到Retrofit中的nextCallAdapter方法:

public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType,
    Annotation[] annotations) {
  checkNotNull(returnType, "returnType == null");
  checkNotNull(annotations, "annotations == null");

  int start = adapterFactories.indexOf(skipPast) + 1;
  for (int i = start, count = adapterFactories.size(); i < count; i++) {
    CallAdapter<?, ?> adapter = adapterFactories.get(i).get(returnType, annotations, this);
    if (adapter != null) {
      return adapter;
    }
  }
}

这里主要是调用adapterFactories.get(returnType, annotations, this)这里的adapterFactories就是我们初始化传进来的RxJava2CallAdapterFactory类所以很自然get方法执行之后返回的是RxJava2CallAdapter,很好;终于看到跟主题相关的Rx开头的类了。

执行完ServiceMethod的初始化后代码继续走:

OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);

首先我们明白一点serviceMethod.callAdapter也就是我们前面返回的RxJava2CallAdapter对象,所以自然进入该类中的adapt方法:

  @Override public Object adapt(Call<R> call) {
    Observable<Response<R>> responseObservable = isAsync
        ? new CallEnqueueObservable<>(call)
        : new CallExecuteObservable<>(call);

    Observable<?> observable;
    if (isResult) {
      observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
      observable = new BodyObservable<>(responseObservable);
    } else {
      observable = responseObservable;
    }

    if (scheduler != null) {
      observable = observable.subscribeOn(scheduler);
    }

    if (isFlowable) {
      return observable.toFlowable(BackpressureStrategy.LATEST);
    }
    if (isSingle) {
      return observable.singleOrError();
    }
    if (isMaybe) {
      return observable.singleElement();
    }
    if (isCompletable) {
      return observable.ignoreElements();
    }
    return observable;
  }

Observable好刺眼的词,这不就是RxJava的类嘛,有点慌仿佛就要结束了。
这里简单点我们只挑CallExecuteObservable来分析,这个类的代码不长直接贴上来看看:

final class CallExecuteObservable<T> extends Observable<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteObservable(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
    Call<T> call = originalCall.clone();
    observer.onSubscribe(new CallDisposable(call));

    boolean terminated = false;
    try {
      Response<T> response = call.execute();
      if (!call.isCanceled()) {
        observer.onNext(response);
      }
      if (!call.isCanceled()) {
        terminated = true;
        observer.onComplete();
      }
    } catch (Throwable t) {
      Exceptions.throwIfFatal(t);
      if (terminated) {
        RxJavaPlugins.onError(t);
      } else if (!call.isCanceled()) {
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }
  }

  private static final class CallDisposable implements Disposable {
    private final Call<?> call;

    CallDisposable(Call<?> call) {
      this.call = call;
    }

    @Override public void dispose() {
      call.cancel();
    }

    @Override public boolean isDisposed() {
      return call.isCanceled();
    }
  }
}

那什么时候开始执行呢?
这时候我们需要回头看一下Retrofit.create这里用到了动态代理所以再invokeserviceMethod.callAdapter.adapt(okHttpCall)就是把RxJava2CallAdapter中的Observable返回回去,所以:

当我们代码中调用subscribe的时候会执行Observable.subscribeActual,回头看看这方法中做了什么:

Response<T> response = call.execute();// 使用OkHttp执行接口请求
if (!call.isCanceled()) {
  observer.onNext(response);
}
if (!call.isCanceled()) {
  terminated = true;
  observer.onComplete();
}

很轻松地我们界面就得到了Retrofit中的Observable发射出来的数据了然后我们就可以做任何处理了。
我们再回头看看一下RxJava2CallAdapter.adapt方法:

Observable<?> observable;
if (isResult) {
  observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
  observable = new BodyObservable<>(responseObservable);
} else {
  observable = responseObservable;
}

if (scheduler != null) {
  observable = observable.subscribeOn(scheduler);
}

看到observable被各种条件进行赋值,不过我们知道了CallExecuteObservable这个是怎么发射数据了现在再回头看已经很清晰了。

综上:
整个流程其实就是RxJava2CallAdapterFactory -> RxJava2CallAdapter -> xxxxxxObservable -> onNext

例子的源码地址: https://github.com/Neacy/GankKotlin

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,183评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,850评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,766评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,854评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,871评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,457评论 1 311
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,999评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,914评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,465评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,543评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,675评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,354评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,029评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,514评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,616评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,091评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,685评论 2 360

推荐阅读更多精彩内容