Retrofit源码分析

Retrofit

Retrofit发起一个简单地网络请求

导入retrofit

compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile ('com.squareup.retrofit2:adapter-rxjava:2.0.0'){
    exclude group: 'io.reactivex', module: 'rxjava'
}//支持rxjava
compile ('io.reactivex:rxandroid:1.2.1'){
    exclude group: 'io.reactivex', module: 'rxjava'
}

新建一个api

public interface Api {
    @GET("api/v1/abc/def")
    Observable<ResponseBody> getTotal();

    @GET("api/v1/mnl/xyz")
    Observable<ResponseBody> getMoneyTotal();
}

初始化Retrofit和Okhttp

初始化Okhttp

private OkHttpClient initOkHttpClient(){
    OkHttpClient client = new OkHttpClient.Builder()
            .addInterceptor(new Interceptor() {
                @Override
                public Response intercept(Chain chain) throws IOException {
                    Request r = chain.request().newBuilder()
                            .addHeader("x-device-info","HUAWEI/7.0/").build();
                    return chain.proceed(r);
                }
            }).build();
    return client;
}

初始化Retrofit

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("http://****.com/")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .client(initOkHttpClient())
        .build();

RxJavaCallAdapterFactory.createWithScheduler 接受Scheduler
可以在发起请求的时候指定Schedulers.io();

发起网络请求

Api api = retrofit.create(Api.class);
Observable<ResponseBody> observable = api.getTotal();
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<ResponseBody>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onNext(ResponseBody responseBody) {
        try {
            Log.e(TAG, "call: "+ new String(responseBody.bytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

看完上面代码产生以下疑问

  • API 接口怎么映射到对应的请求retrofit.create(Api.class)
  • ConverterFactoryConverterFactory怎么工作的
  • Retrofit怎么支持Rxjava的

解决问题最粗暴的方法断点跟踪,大致跟下代码。
Copy源代码到工程里面,导入对应的包

compile 'com.squareup.okhttp3:okhttp:3.2.0'
compile 'com.google.code.gson:gson:2.6.1'
compile 'io.reactivex:rxjava:1.2.1'
compile 'com.google.code.findbugs:jsr305:3.0.0'

流程图

  1. 先来看看create(Api.class)的代码
public <T> T create(final Class<T> service) {
 
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
      new InvocationHandler() {
        private final Platform platform = Platform.get();

        @Override public Object invoke(Object proxy, Method method, @Nullable Object[] args)
            throws Throwable {
          Log.e(TAG, "invoke: "+ method.getName() );
          // If the method is a method from Object then defer to normal invocation.
              if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
              ServiceMethod<Object, Object> serviceMethod =
              (ServiceMethod<Object, Object>) loadServiceMethod(method);
          OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
          return serviceMethod.callAdapter.adapt(okHttpCall);
        }
      });
}

这里用到了动态代理模式Proxy.newProxyInstance
每次调用到Api中的方法都会回调InvocationHandler中的invoke打印出来的log如下


image-55fe26-1514379291279)]
我们发现有toString方法也打印出来了,如果是Object的方法就直接返回当前方法自己

  1. 下来是loadMethod(Method method) 返回一个
    Adapts an invocation of an interface method into an HTTP call
    映射一个接口的调用到网络请求
    看到这个基本就能想到loadMethod是通过method初始化一个ServiceMethod

    CallAdapter<R, T>
    Adapts a {@link Call} with response type {@code R} into the type of {@code T}. Instances are
    created by {@linkplain Factory a factory} which is
    {@linkplain Retrofit.Builder#addCallAdapterFactory(Factory) installed} into the {@link Retrofit}
    instance.

    //Retrofit.loadMethod
    result = new ServiceMethod.Builder<>(this, method).build();// this是retrofit
    
    // ServiceMthod.Builder
    Builder(Retrofit retrofit, Method method) {
       this.retrofit = retrofit;
       this.method = method;
       this.methodAnnotations = method.getAnnotations();
       //获取方法的参数类型
       this.parameterTypes = method.getGenericParameterTypes();
       //获取参数的注解
       this.parameterAnnotationsArray = method.getParameterAnnotations();
    }
    

    build中主要完成以下功能

    • 通过createCallAdapter()初始化callAdapter
    • 通过createResponseConverter初始化responseConverter
    • 解析方法上面的注解parseMethodAnnotation
      • httpMethod @GET @POST
      • relativeUrl
      • headers
    • 解析参数的注解parseParameter
  2. 新建一个OkHttpCall extens Call
    Call
    An invocation of a Retrofit method that sends a request to a webserver and returns a response.
    Each call yields its own HTTP request and response pair. Use {@link #clone} to make multiple
    calls with the same parameters to the same webserver; this may be used to implement polling or
    to retry a failed call.
    <p>Calls may be executed synchronously with {@link #execute}, or asynchronously with {@link
    enqueue}. In either case the call can be canceled at any time with {@link cancel}. A call that
    is busy writing its request or reading its response may receive a {@link IOException}; this is
    working as designed.
    @param <T> Successful response body type.

public interface Call<T> extends Cloneable {
  /**
   * Synchronously send the request and return its response.
   *
   * @throws IOException if a problem occurred talking to the server.
   * @throws RuntimeException (and subclasses) if an unexpected error occurs creating the request
   * or decoding the response.
   */
  Response<T> execute() throws IOException;

  /**
   * Asynchronously send the request and notify {@code callback} of its response or if an error
   * occurred talking to the server, creating the request, or processing the response.
   */
  void enqueue(Callback<T> callback);

  /**
   * Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
   * #enqueue(Callback) enqueued}. It is an error to execute or enqueue a call more than once.
   */
  boolean isExecuted();

  /**
   * Cancel this call. An attempt will be made to cancel in-flight calls, and if the call has not
   * yet been executed it never will be.
   */
  void cancel();

  /** True if {@link #cancel()} was called. */
  boolean isCanceled();

  /**
   * Create a new, identical call to this one which can be enqueued or executed even if this call
   * has already been.
   */
  Call<T> clone();

  /** The original HTTP request. */
  Request request();
}

重点注意execute这个真正发起请求的方法(代码在下面),而request仅仅是原生http请求时调用

OkHttpCall<Object> okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);

通过RxJavaCallAdapter.adapt

@Override public Object adapt(Call<R> call) {
  OnSubscribe<Response<R>> callFunc = isAsync
      ? new CallEnqueueOnSubscribe<>(call)
      : new CallExecuteOnSubscribe<>(call);

  OnSubscribe<?> func;
  if (isResult) {
    func = new ResultOnSubscribe<>(callFunc);
  } else if (isBody) {
    func = new BodyOnSubscribe<>(callFunc);
  } else {
    func = callFunc;
  }
  Observable<?> observable = Observable.create(func);

  if (scheduler != null) {
    observable = observable.subscribeOn(scheduler);
  }

  if (isSingle) {
    return observable.toSingle();
  }
  if (isCompletable) {
    return observable.toCompletable();
  }
  return observable;
}

定义一个CallExecuteOnSubscribe,当Observeable被订阅,回调call方法发起网络请求
同时这也可以为Observale指定线程类型

final class CallExecuteOnSubscribe<T> implements OnSubscribe<Response<T>> {
  private final Call<T> originalCall;

  CallExecuteOnSubscribe(Call<T> originalCall) {
    this.originalCall = originalCall;
  }

  @Override public void call(Subscriber<? super Response<T>> subscriber) {
    // Since Call is a one-shot type, clone it for each new subscriber.
    Call<T> call = originalCall.clone();
    CallArbiter<T> arbiter = new CallArbiter<>(call, subscriber);
    subscriber.add(arbiter);
    subscriber.setProducer(arbiter);

    Response<T> response;
    try {
      response = call.execute();
    } catch (Throwable t) {
      Exceptions.throwIfFatal(t);
      arbiter.emitError(t);
      return;
    }
    arbiter.emitResponse(response);
  }
}
/**OKHttpCall**/
@Override public Response<T> execute() throws IOException {
  okhttp3.Call call;
   synchronized (this) {
    if (executed) throw new IllegalStateException("Already executed.");
    executed = true;
     .......
    call = rawCall;
    if (call == null) {
       call = rawCall = createRawCall();
    }
  }
  if (canceled) {
    call.cancel();
  }
  return parseResponse(call.execute());
}

private okhttp3.Call createRawCall() throws IOException {
  Request request = serviceMethod.toRequest(args);
  okhttp3.Call call = serviceMethod.callFactory.newCall(request);
  return call;
}

这里的callFactory就是我们初始化的OkhttpClient

  • response = call.execute() 是阻塞的,这个是在那个线程里面跑,这个线程什么时候开启的?
  • CallArbiter deliverResponse中会分别调用 subscriber.onNext(response)和subscriber.onComplete()。按我的理解他应该直接回调MainActivity中的Subscriber.onNext
    顺序如下
CallArbiter----deliverResponse---onNext
MainActivity---Subscriber--------onNext
CallArbiter----deliverResponse---onComplete
MainActivity---Subscriber--------onComplete

加上线程可以看出因为不在一个线程里面。

MainActivity------------------开始调用API--------------- ---------------------main
Retrofit.InvocationHandler----invoke 调用;getTotal---------------------------main
ServiceMethod.Builder---------createCallAdapter------------------------------main
ServiceMethod.Builder---------createResponseConverter------------------------main
ServiceMethod.Builder---------parseMethodAnnotation -------------------------main
RxJavaCallAdapter-------------adapt -----------------------------------------main
MainActivity------------------调用API结束-------------------------------------main
MainActivity------------------已经添加订阅者-----------------------------------main
CallExecuteOnSubscribe--------call -------------------------------RxIoScheduler-2
CallArbiter-------------------request---(state=0)-----------------RxIoScheduler-2
OkHttpCall--------------------execute ----------------------------RxIoScheduler-2
CallArbiter-------------------emitResponse----(state=1)-----------RxIoScheduler-2
CallArbiter-------------------request.deliverResponse.onNext -----RxIoScheduler-2
CallArbiter-------------------equest.deliverResponse.onCompleted---RxIoScheduler-2
MainActivity.Subscriber-------onNext-----------------------------------------main
MainActivity.Subscriber-------onCompleted------------------------------------main

看到这个log基本上面那个问题:什么时候切换的线程就清楚了,当Observable添加到订阅者后,切换线程。

  • CallArbiter是干嘛的
final class CallArbiter<T> extends AtomicInteger implements Subscription, Producer {
   
    CallArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
        super(STATE_WAITING);
        this.call = call;
        this.subscriber = subscriber;
    }
 
    @Override
    public void request(long amount) {
        if (amount == 0) {
            return;
        }
        while (true) {
            int state = get();
            switch (state) {
                case STATE_WAITING:
                    if (compareAndSet(STATE_WAITING, STATE_REQUESTED)) {
                        return;
                    }
                    break; // State transition failed. Try again.

                case STATE_HAS_RESPONSE:
                    if (compareAndSet(STATE_HAS_RESPONSE, STATE_TERMINATED)) {
                        deliverResponse(response);
                        return;
                    }
                    break; // State transition failed. Try again.
                            ......
            }
        }
    }

    void emitResponse(Response<T> response) {
        while (true) {
            int state = get();
            switch (state) {
                case STATE_WAITING:
                    this.response = response;
                    if (compareAndSet(STATE_WAITING, STATE_HAS_RESPONSE)) {
                        return;
                    }
                    break; // State transition failed. Try again.

                case STATE_REQUESTED:
                    if (compareAndSet(STATE_REQUESTED, STATE_TERMINATED)) {
                        deliverResponse(response);
                        return;
                    }
                    break; // State transition failed. Try again.
                        .......
            }
        }
    }

    private void deliverResponse(Response<T> response) {
        try {
            if (!isUnsubscribed()) {
                subscriber.onNext(response);
            }
        }  
        ......
        try {
            if (!isUnsubscribed()) {
                subscriber.onCompleted();
            }
        } .......
    }

}

这个类继承Producer,查看下这个接口的定义

Interface that establishes a request-channel between an Observable and a Subscriber and allows
the Subscriber to request a certain amount of items from the Observable (otherwise known as
backpressure).
大致意思是在Observable和Subscriber之间建立一个请求通道,允许订阅者从可观察对象处请求大量项目
当Observable注册到Subscriber上时,会调用request.

public interface Producer {

    /**
     * Request a certain maximum number of items from this Producer. This is a way of requesting backpressure.
     * To disable backpressure, pass {@code Long.MAX_VALUE} to this method.
     * <p>
     * Requests are additive but if a sequence of requests totals more than {@code Long.MAX_VALUE} then
     * {@code Long.MAX_VALUE} requests will be actioned and the extras <i>may</i> be ignored. Arriving at
     * {@code Long.MAX_VALUE} by addition of requests cannot be assumed to disable backpressure. For example,
     * the code below may result in {@code Long.MAX_VALUE} requests being actioned only.**/
    void request(long n);
}//rxjava 中 Observeable是怎么注册到Subscriber上的

通过request把当前状态设置为STATE_REQUESTED
在手动调用emitResponse时,执行deliverResonse,完成回调

继承Subscription是为了把Subscriber的取消映射到Call中
继承AtomicInteger为了线程安全,按照Java编程思想里面说的,如果不是JVM的大神请不要使用这个类。这里就知道干嘛就好了
基本先这样吧,还有几个地方不太清楚,以后再补吧。
1.网络错误怎么处理的
2.超时了,怎么发起请求

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,656评论 18 139
  • # Node.js学习笔记 ## 简介 - 编写高性能网络服务器的JavaScript工具包 - 单线程、异步、事...
    LYX_Rain阅读 287评论 0 0
  • 简介 刚接触Retrofit的时候,就写了一篇简单的使用介绍:Retrofit 2.0基本使用方法,算是对Retr...
    Whyn阅读 2,844评论 4 24
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,127评论 25 707
  • ​忙了一天,你是否想要心疼下自己, 让身心脱离工作,给自己放个假? 但一直处于高强度状态中 一时间又不知道怎样去放...
    哆咗阅读 1,893评论 2 7