本文以 Retrofit 整合 RxJava 为例,介绍 Retrofit 的工作原理,使用 Retrofit 2.7.1 版本,不同版本的源码有所差异,但思路是类似的。
Retrofit 相关依赖如下:
implementation 'com.squareup.retrofit2:retrofit:2.7.1'
implementation 'com.squareup.retrofit2:converter-gson:2.7.1'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'
RxJava 相关依赖如下:
implementation 'io.reactivex.rxjava2:rxjava:2.2.17'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
一、Retrofit 整合 RxJava 基本使用
基本的使用步骤如下:
// 创建描述网络请求的接口
interface ApiService {
@GET("banner/json")
fun banner(): Observable<BaseResponse<List<BannerBean>>>
}
// 创建 retrofit 对象
val retrofit : Retrofit = Retrofit.Builder()
// 设置数据适配器工厂
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
// 设置数据解析器工厂
.addConverterFactory(GsonConverterFactory.create())
.baseUrl(Const.WAN_ANDROID_URL)
.build()
// 创建service对象
val service : ApiService = retrofit.create(ApiService::class.java)
// 创建0bservable对象
val observable : Observable<BaseResponse<List<BannerBean>>> = service.banner()
// 创建Observer对象
val observer = object : Observer<BaseResponse<List<BannerBean>>> {
override fun onSubscribe(d: Disposable) {}
override fun onNext(t: BaseResponse<List<BannerBean>>) {}
override fun onComplete() {}
override fun onError(e: Throwable) {}
}
// 订阅、请求结果处理
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
通过简单的配置就可以进行网络请求了,可能好奇的你并不满足与此,更关心背后的原理,已经有了些许疑问:
-
retrofit.create()
,是如何使用接口来创建 service 对象的 -
service.banner()
的背后发生了什么 -
Retrofit
是如何与OkHttp
协作的 - 数据适配器工厂
RxJava2CallAdapterFactory
、数据解析器工厂GsonConverterFactory
的作用是什么,如何与Retrofit
协作 - ......
二、原理分析
工作原理的分析结合上边的例子,但是不同的用法、配置可能会导致流程的分析有所差别。
1、生成代理对象
我们以retrofit.create()
作为分析的入口,它把网络请求配置的接口转换成一个实现了该接口的对象。里边核心的技术就是JDK动态代理:
// Retrofit.java
public <T> T create(final Class<T> service) {
// 检查接口的合法性
validateServiceInterface(service);
// 返回代理对象
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
// 获取平台信息,这里得到的是 Android 平台
private final Platform platform = Platform.get();
// 定义空的参数数组
private final Object[] emptyArgs = new Object[0];
// 每次调用接口中的方法都会走该方法,相当于就是接口方法的实现,例如service.banner()
// proxy:代理对象,method:当前调用的接口方法,args:方法的参数
@Override public @Nullable Object invoke(Object proxy, Method method,
@Nullable Object[] args) throws Throwable {
// 如果是 Object 类中的方法则不做额外处理
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
// 如果是 Java8 接口的默认实现方法,同样不做额外处理
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
// 这才是我们需要重点关心的地方,网络请求的具体实现、处理就在里边了
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
}
});
}
到这里我们需要明白两点:
- Retrofit 通过JDK动态代理技术,使用ApiService接口生成了一个实现该接口的代理对象。
- 调用ApiService接口中的方法,会执行上边
InvocationHandler
的invoke()
方法,相当于是对原接口中方法的具体实现,返回的是一个bservable
对象
create()
方法还是比较简单的,其实重要的内容在这里:
loadServiceMethod(method).invoke(args != null ? args : emptyArgs)
它最终返回一个0bservable
对象,这个过程会组织网络请求、解析响应结果、将响应结果发送给订阅0bservable
的0bserver
这里我们需要拆成两步来看:loadServiceMethod()
、invoke()
分别做了什么。
2、网络请求的准备阶段
loadServiceMethod()
完成的主要任务就是解析网络请求接口方法上的注解信息,得到数据适配器、数据解析器对象,最终封装一个CallAdapted
继承了ServiceMethod
的对象返回来,我们来一步一步看:
// Retrofit.java
ServiceMethod<?> loadServiceMethod(Method method) {
// 先从缓存中找
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;
// 没找到
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
// 创建ServiceMethod对象
result = ServiceMethod.parseAnnotations(this, method);
// 放入缓存
serviceMethodCache.put(method, result);
}
}
return result;
}
很显然,重点是parseAnnotations()
:
// ServiceMethod.java
abstract class ServiceMethod<T> {
static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
// 解析接口中调用的方法
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
......
return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}
// 第二步要分析的invoke()方法就是这个
abstract @Nullable T invoke(Object[] args);
}
RequestFactory.parseAnnotations(retrofit, method)
是重要的一步,它会去解析ApiService
中对应方法上的注解信息、参数上的注解信息,准备网络请求的必要信息。
接下来又调用了HttpServiceMethod
的parseAnnotations()
,HttpServiceMethod
继承了ServiceMethod
:
// HttpServiceMethod.java
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
Retrofit retrofit, Method method, RequestFactory requestFactory) {
boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
......
Annotation[] annotations = method.getAnnotations();
......
// 得到接口方法的返回值信息
adapterType = method.getGenericReturnType();
......
// 得到数据适配器对象
// 还记得创建retrofit对象时设置的数据适配器工厂吗,最终就是通过工厂创建的
CallAdapter<ResponseT, ReturnT> callAdapter =
createCallAdapter(retrofit, method, adapterType, annotations);
......
// 得到数据解析器对象,最终就是通过数据解析器工厂创建的
Converter<ResponseBody, ResponseT> responseConverter =
createResponseConverter(retrofit, method, responseType);
okhttp3.Call.Factory callFactory = retrofit.callFactory;
if (!isKotlinSuspendFunction) {
// 返回一个CallAdapted对象
return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
} else if (continuationWantsResponse) {
......
} else {
......
}
}
之前例子中创建retrofit
对象时设置了RxJava数据适配器、Gson解析器工厂,现在要从工厂中得到对应的数据适配器、解析器,最后将这些数据封装到CallAdapted
里。
HttpServiceMethod
还重写了ServiceMethod
的invoke()
方法:
// HttpServiceMethod.java
@Override final @Nullable ReturnT invoke(Object[] args) {
// 创建一个OkHttpCall对象
Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
// adapt()是HttpServiceMethod中的一个抽象方法
return adapt(call, args);
}
OkHttpCall
是个关键的类,网络请求的具体实现就在里边了,后边具体分析。
CallAdapted
是HttpServiceMethod
的静态内部类,又继承了HttpServiceMethod
,并实现了HttpServiceMethod
的adapt()
方法:
// HttpServiceMethod.java
static final class CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT> {
private final CallAdapter<ResponseT, ReturnT> callAdapter;
CallAdapted(RequestFactory requestFactory, okhttp3.Call.Factory callFactory,
Converter<ResponseBody, ResponseT> responseConverter,
CallAdapter<ResponseT, ReturnT> callAdapter) {
super(requestFactory, callFactory, responseConverter);
this.callAdapter = callAdapter;
}
@Override protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
// 这里调用到了之前得到的数据适配器的adapt()方法,注意这里的参数call是OkHttpCall类型的
return callAdapter.adapt(call);
}
}
它的adapt
方法最终调用了RxJava数据适配器的adapt()
方法
至此,loadServiceMethod()
方法的执行流程就结束了,最终返回的就是一个继承了HttpServiceMethod
的CallAdapted
对象。同时准备好了 OkHttpCall、数据适配器、以及数据解析器。
3、数据适配器
结合之前的分析,调用ServiceMethod
的invoke()
方法最终就是调用RxJava数据适配器的adapt()
方法,参数就是OkHttpCall
。
数据适配器对象是通过数据适配器工厂RxJava2CallAdapterFactory
创建的:
// RxJava2CallAdapterFactory.java
@Override public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
......
return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
isSingle, isMaybe, false);
}
RxJava2CallAdapter
的adapt()
方法都做了些什么呢:
// RxJava2CallAdapter.java
@Override public Object adapt(Call<R> call) {
// isAsync为false
// call对象类型是OkHttpCall
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
// isBody为true,会执行这里
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
......
// 调用关联的hook函数。大致的意思就是,
// 如果之前调用过RxJavaPlugins.setOnObservableAssembly()设置了一个observable
// 则会用设置的observable替换掉onAssembly()方法传入的observable
// 当然这里并不会,还是会返回上边创建的observable对象
return RxJavaPlugins.onAssembly(observable);
}
所以adapt()
方法会返回一个Observable
对象,但还是太过粗略了,我们需要进一步探究该对象的奥秘。
按照上边的分析,应该先执行new CallExecuteObservable<>(call)
,得到responseObservable
对象,再执行new BodyObservable<>(responseObservable)
,对其进一步封装,所以我们先看CallExecuteObservable
类是如何实现的:
final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}
// Observable订阅Observer时会执行该方法
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
Call<T> call = originalCall.clone();
// 将call对象封装成CallDisposable,以具有取消请求的功能
CallDisposable disposable = new CallDisposable(call);
// 产生了订阅关系
observer.onSubscribe(disposable);
if (disposable.isDisposed()) {
return;
}
boolean terminated = false;
try {
// call对象类型是OkHttpCall,execute()方法会执行OkHttp的同步网络请求
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
// 将请求结果发送给observer
observer.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
// 通知observer,请求结果发送结束
observer.onComplete();
}
} catch (Throwable t) {
......
try {
// 通知observer,在网络请求、发送结果过程中出现了异常
observer.onError(t);
} catch (Throwable inner) {
......
}
}
}
}
private static final class CallDisposable implements Disposable {
private final Call<?> call;
private volatile boolean disposed;
CallDisposable(Call<?> call) {
this.call = call;
}
@Override public void dispose() {
disposed = true;
call.cancel();
}
@Override public boolean isDisposed() {
return disposed;
}
}
}
CallExecuteObservable
类的核心就是subscribeActual()
方法,当Observer
订阅Observable
时会执行该方法,即调用subscribe()
方法时。
在subscribeActual()
方法里会进行网络请求,并将结果传递给Observer
。
那为什么BodyObservable
类对responseObservable
要进一步封装呢?去里边找找答案:
final class BodyObservable<T> extends Observable<T> {
private final Observable<Response<T>> upstream;
BodyObservable(Observable<Response<T>> upstream) {
this.upstream = upstream;
}
// Observable订阅Observer时会执行该方法
@Override protected void subscribeActual(Observer<? super T> observer) {
// new BodyObserver<T>(observer),将RxJava订阅时,我们自定义的Observer重新封装
upstream.subscribe(new BodyObserver<T>(observer));
}
private static class BodyObserver<R> implements Observer<Response<R>> {
private final Observer<? super R> observer;
private boolean terminated;
BodyObserver(Observer<? super R> observer) {
this.observer = observer;
}
@Override public void onSubscribe(Disposable disposable) {
observer.onSubscribe(disposable);
}
// 将CallExecuteObservable中,observer.onNext(response)这一步拿到的请求结果进一步处理
@Override public void onNext(Response<R> response) {
// 请求成功
if (response.isSuccessful()) {
// 只将响应体发送给我们自定义的observer
observer.onNext(response.body());
} else {
terminated = true;
Throwable t = new HttpException(response);
try {
// 将异常信息发送给我们自定义的Observer
observer.onError(t);
} catch (Throwable inner) {
......
}
}
}
@Override public void onComplete() {
if (!terminated) {
observer.onComplete();
}
}
@Override public void onError(Throwable throwable) {
if (!terminated) {
observer.onError(throwable);
} else {
......
}
}
}
}
可以看到BodyObservable
类的主要作用就是通过BodyObserver
类增强我们自定义的observer
功能,使其可以对CallExecuteObservable
中observer.onNext(response)
拿到的响应结果进一步处理,只取出响应体的数据发送给我们自定义的observer
。
到这里数据适配器的主要任务就结束了,就是为整合RxJava做准备,调用Retrofit封装的OkHttpCall
执行网络请求,并将结果发送给observer
。
但还有些事情我们没搞清楚,那就是OkHttpCall
,在CallExecuteObservable
中执行它的call.execute()
方法时,内部做了些什么,内部是如何通过OkHttp进行网络请求的。
4、OkHttp网络请求
OkHttpCall
的execute()
方法还是比较简单的:
// OkHttpCall.java
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
......
call = rawCall;
if (call == null) {
try {
// 创建OkHttp的Call对象
call = rawCall = createRawCall();
} catch (IOException | RuntimeException | Error e) {
throwIfFatal(e); // Do not assign a fatal error to creationFailure.
creationFailure = e;
throw e;
}
}
}
if (canceled) {
call.cancel();
}
// call.execute(),调用OkHttp的同步网络请求方法
// parseResponse(),解析请求结果
return parseResponse(call.execute());
}
所以OkHttpCall
的execute()
方法内部会构建OkHttp
的Call
对象,并来发起网络请求,但为什么是同步的请求呢?因为我们已经使用RxJava切换到子线程。这样Retrofit
就和OkHttp
关联上了。
5、数据解析器
OkHttpCall
的parseResponse()
的作用就是使用Gson将响应体的JSON字符串转换成指定对象。
// OkHttpCall.java
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
......
}
if (code == 204 || code == 205) {
......
}
ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
try {
// 通过指定的数据解析器来解析响应体
T body = responseConverter.convert(catchingBody);
// 返回请求结果
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
......
}
}
关键的就是responseConverter.convert(catchingBody)
,responseConverter
就是通过配置Retrofit时设置的GsonConverterFactory
数据解析器工厂得到的,对应的数据解析器就是GsonRequestBodyConverter
:
final class GsonRequestBodyConverter<T> implements Converter<T, RequestBody> {
......
@Override public RequestBody convert(T value) throws IOException {
Buffer buffer = new Buffer();
Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
JsonWriter jsonWriter = gson.newJsonWriter(writer);
adapter.write(jsonWriter, value);
jsonWriter.close();
return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
}
}
就是通过Gson解析响应体,转换成指定类型的对象。如果响应体是加密的,可以在这里先做解密工作,再做Gson解析。
三、最后
到这里Retrofit
结合RxJava
的工作原理基本就结束了,Retrofit
就是让开发者通过接口注解的形式描述网络请求,然后来解析接口,将网络请求封装到OkHttpCall
里;数据适配器就是用Retrofit
提供的OkHttpCall
来组织网络请求,想怎么玩数据适配器自己决定就好了;数据解析器就相对简单了,把网络请求的结果转换成需要的格式即可。Retrofit
、数据适配器、数据解析器,它们之间的职责划分还是很明确的,几乎没有业务上的耦合,完全是可插拔式的。