Retrofit 2.1 + Rxjava 源码解析(一)

1.创建Retrofit对象


OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
                .client(okHttpClient.build())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(base_url)
                .build();

这里是普通的 Retrofit 对象创建过程,传入一些必要的参数:okHttpClientconverterFactorycallAdapterFactory(不搭配 Rxjava 的时候使用 Retrofit 默认的 callAdapterFactory,什么都不做),baseUrl 。

这里特别要注意的是传入了 RxJavaCallAdapterFactory.create() 这个RxjavaCallAdapter 对象,这个对象将彻底改变 Retrofit 的使用。使得 Retrofit 搭配 Rxjava 变成可能,不得不佩服 Retrofit 作者的编程功底,开放 CallAdapterFactory 这个接口,使 Retrofit 的灵活性更高。

2.创建接口的动态代理对象

给出实验的接口

public interface NetApiService {

    //post请求
    @FormUrlEncoded
    @POST("{url}")
    Observable<ResponseBody> executePost(
            @Path("url") String url,
            @Field("params") String params,
            @Field("signature") String signature
    );

}
netApiService = retrofit.create(NetApiService .class);  //返回一个动态代理对象

这里也是 Retrofit 神奇的地方,传入一个接口,就可以生成实现了这个接口的对象,当然这个只是 Java 代码生成的动态代理对象。下面我们进
create() 方法看看。

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);  //验证外部传进的“服务”接口是否合法
    if (validateEagerly) {
      eagerlyValidateMethods(service);  //根据validateEagerly判断是否对接口中的全部方法进行缓存
    }
    //使用Proxy工厂类返回一个泛型动态代理实例。
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        });
  }

在这里我们首先对传入的接口进行检验是否是接口,然后根据 validateEagerly 判断是否对接口中的全部方法进行缓存,最后我们用 java.lang.reflect.Proxy; 创建一个泛型的动态代理对象,返回这个对象。(不懂 java动态代理技术 的同学别着急,我会在文末给出参考资料)

3.创建Observable

Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);

调用动态代理对象的接口方法,这时候会调用

new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        }

InvocationHandlerinvoke() 方法,在这里我们可以看到有三个参数:
proxy 表示通过 Proxy.newProxyInstance() 生成的代理类对象。
method 表示代理对象被调用的函数。
args 表示代理对象被调用的函数的参数。
调用代理对象的每个函数实际最终都是调用了 InvocationHandlerinvoke 函数。

由于这个是接口的方法,所以不会进第一个 if ,因为也不是默认方法,所以也不会进第二个 if 。这样就可以看到我们的代理对象在调用了接口的方法后实际上是 new 了一个 okHttpCall<> 对象,然后将这个对象作为参数传进了 callAdapter.adapt(); 方法中

由于我们之前传入的是 RxJavaCallAdapterFactory.create() ,所以我们深入 RxJavaCallAdapterFactory.java 看看构造 Observable 的方法,在adapt()可以看到:

static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    ResponseCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
      Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }

在这里我们看到,这里将传入的 okHttp 对象 作为参数,构造了 CallOnSubscribe 对象
CallOnSubscribe 是何方神圣???按照 Rxjava 构造 Observable 方法来说,这个 CallOnSubscribe 应该是一个实现了Observable.OnSubscribe<T> 接口的对象。

我们看看源码,果然如此。

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }

看到这里,相信你大概也懂了为什么调用生成的动态代理对象的接口方法不像只使用 Retrofit 那样返回一个 okHttpCall<> 对象,而是返回一个 Observable<ResponseBody> 对象。其实这就是
RxJavaCallAdapterFactory 做的转换。

仔细看 CallOnSubscribe<T>call() 方法,我们发现这里的 subscriber (其实是调用 subscribe() 方法时传进来的 subscriber ,就是外部的观察者)添加了一个 requestArbiter 对象。这个对象很重要,在 subscriber.setProducer(requestArbiter); 时,它会控制
okHttpCall对象 直接联网获取数据,然后回调给观察者 subscriber

4.observable.subscribe(subscriber);订阅

这里的代码不多,就一行 observable.subscribe(subscriber); 。我们仔细看看在 subscribe 方法里面发生了什么神奇的事?(提前剧透一下,Observable.OnSubscribe<T>对象 很棒,它就相当于桥梁,将 Observable 和 Observer 连接起来)

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
    //判断传进来的参数,即是观察者对象,被观察者对象,是否为空。
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
       //重要的操作,可以在订阅之前做一些准备工作
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

在这段代码中,我们看到 subscribe() 过程中,会先调用 onStart() ,一般这个方法在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart()就不适用了(因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
方法)(先记住这个知识点,请保留,现在先不管线程切换)

高能来了!!!
我们重点看看这句代码:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
 public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }

其实 onSubscribeStart() 方法直接返回了 onSubscribe 对象,然后直接调用 onSubscribecall(subscriber) 方法。记得我们刚刚分析,这里其实是调用了 CallOnSubscribe<T>对象call() 方法。也就是在这里进行了联网获取数据,然后回调 Subscriber 观察者的方法。(具体的代码就是 call() 方法的subscriber.setProducer(requestArbiter);)。

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

最后会调用 producer.request(toRequest); 方法。
这个 request() 方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);request()

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
    private final Call<T> call;
    private final Subscriber<? super Response<T>> subscriber;

    RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
      this.call = call;
      this.subscriber = subscriber;
    }

    @Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(t);
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }

    @Override public void unsubscribe() {
      call.cancel();
    }

    @Override public boolean isUnsubscribed() {
      return call.isCanceled();
    }
  }

Response<T> response = call.execute(); 这里进行了网络请求;
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } 这里进行回调。

接下来的 onError()onCompleted() 方法的回调一样的,就不分析了。

至此,我们就完整地了解了 Retrofit + Rxjava 中从创建 Observable 和 Observer 到 Observable 订阅 Observer 的流程,以及中间隐藏的联网和回调的过程。

参考资料

Retrofit 2.1 源码分析
Java 动态代理技术
Rxjava 源码分析一

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容