- 设计模式:责任链模式
- 定义:责任链模式是一种对象的行为模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。
-
结构:
- 如上结构图:责任链模式涉及到角色:
* 抽象处理者角色:接口或者抽象类主要两个作用一是行为处理api另一个是持有下一个处理者对象api。
* 具体处理者角色:上面的抽象角色的具体实现类,主要是处理api的实现,在api的实现中除了当前处理者的处理逻辑还有对下一个处理者的引用。 - 模式demo代码参考下面的参考文章。
-
模式执行的流程图如下:
- 模式使用场景:
* 对于行为,有多个对象处理且对象串行处理,比较常见的就是审核系统,审核系统多级审核且下一级通常在上一级批准的基础上,还有就是这次主题okhttp的请求设计模式。
* 对于行为,有多个对象处理但是不确认是谁处理,需要轮询一遍或者多遍看看谁能处理,场景常见也是上面的审核系统,不同等级有不同的权限,针对同一次请求权限不同能处理的对象也不同,此时责任链就很合适,自己能处理就处理处理不了交给下一个去处理,直到找到能处理的对象为止。 - okhttp框架中责任链模式的不同点:
* 具体使用建议先浏览理解参考文章的okhttp责任链实现demo,理解后直接去看下面的生命周期的实现,看完后再回来对证不同点。
* okhttp和普通责任链不同的是:oper和client之间添加了一层链(RealInterceptorChain)封装,单个oper并不像普通责任链模式那样持有下一级对象,而是在这一层链封装中通过list持有了所有的oper处理者及其提供对应的oper方法,在oper方法中利用维护的index遍历下一个oper调用其真正的oper方法去执行,oper的处理api方法不变还是做自己的处理和下一个对象的调用(oper持有连封装对象直接调用其oper方法即可),这样的优点是:
* 解耦,oper对象更多关注自己的实现然后将下一级的调用交给链封装层处理。
* 单个oper对象不需要关心自己的下一级处理对象,对象之间的处理顺序和关系单个对象不需要了解都是封装在链封装对象中,这样即时有改变也只需要在封装曾修改即可不需要修改每一个对象的链接关系。
- 单次请求生命周期
-
okhttp外层调用api如下(同步和异步):
-
由上很明显看到很重要的一个api:newCall,进入源码内可以看到:
由代码可知:接受Request封装参数返回一个RealCall对象,对象api列表如下:
-
同步请求访问:
* 源码execute方法的具体实现:
如图标注:整个api主要进行了四个操作:
* 1:通过锁的方式让新创建的RealCall对象仅执行一次
* 2:接口访问调用前的回调执行(start回调)
* 3:将本次接口执行添加到okhttp接口调度器中去(接口调度更多可参考本文第三小段)
* 4:接口访问的核心实现。
* 由api中的四个流程可知主要逻辑集中在3和4两个点上,由于同步请求3这个点的实现只是把当次请求添加到正在请求的同步队列中,所以主要的核心逻辑集中在4这个点中。
-
同步接口请求的核心实现:
* 看示意图很明显的责任链设计模式实现,即:
* Interceptor:抽象处理封装接口,接口封装了处理api(intercptor)同时又封装一个处理责任链各个链接点调用匿名内部类接口chain。
* BridgeIntercptor等处理器实现了Intercptor接口是真正的单个链接点的实现逻辑类,类中封装了chain实现类对象,在逻辑处理过程中需要传递给下一个链接点的时候直接调用chain的process方法转交给chain去实现。
* RealIntercptorChain是责任链各个链接点逻辑调度的具体实现类,类中持有一个处理器容器列表和遍历index,processapi中通过index可以遍历到每一个处理器调用其intertor方法去执行,需要注意的是下一个处理调用是在当前的Intercptor中发起的。
* client:责任链调用者,在类中组建都是哪些处理器处理接口访问逻辑封装到chain中并发起第一个处理器去处理。
* 当前责任链模式的场景是接口访问的各个逻辑串行执行,理论上可以写一个很庞大的代码块实现,但是使用责任链设计模式可以将其庞大逻辑拆分,每一个链接点只关注自己需要处理得即可
* RealIntercptorChain的优势上面介绍责任链的时候已描述,更好的解耦了各个链接点,各个链接点不持有具体的下一个链接点更好的关注自己的实现即可。
* 责任链模式具有更好框架扩展性,在接口访问的过程中最外层用户需要额外处理逻辑只需要实现自定义处理器然后扔给client即可
* 系统处理器(拦截器)介绍:
* BridgeInterceptor:桥接拦截器,主要处理用户请求参数和网络请求参数转换,接口访问前将用户参数配置成为网络header的格式,访问后相反将接口header网络格式参数再转化回去。
* RetryAndFollowUpInterceptor:接口失败重试和重定向实现,具体功能后续文章分析实现。
* CacheInterceptor:处理接口请求数据的缓存逻辑
* ConnectInterceptor:处理接口访问的连接请求,创建一个针对目标服务器的连接。
* CallServerInterceptor:责任链的最后一个链接点,负责接口的具体访问和数据的获取 。
* interceptors/networkInterceptors:用户外置拦截器,需要注意的是这两个拦截器的插入时机都在CallServerInterceptor以前,具体实现可以参考下面的源码分析。 - 通过源码进行验证上面的逻辑
* client代码,即RealCall中的getResponseWithInterceptorChain()实现:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
* 看代码可以了解到做了三件事:一是创建各个拦截器包括用户的拦截器第二是姜拦截器封装到拦截器调度类RealInterceptorChain中去最后是调用process()方法启动第一个拦截器。
* 需要特别注意的是用户拦截器的配置时机,用户拦截器分为两种:普通和网络,普通在最初时候添加到list中去,但是net是在发起正式前添加到list中去,原因是系统拦截器针对接口header等已做特殊处理,倘若用户需要修改设置需要在系统拦截器后面,不然会被系统重置。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
* IIntercptor的具体实现类,由于系统拦截器过多,不再一一描述,此处列举一个是ConnectInterceptor,另一个是最后拦截器CallServerInterceptor:
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
* 综上可看到普通的链接拦截器持有chain并在合适的时机通过chain的process启动下一个拦截器,而CallServerInterceptor作为最后一个拦截器则是直接发起请求并将响应数据返回到上一个拦截器,上一个拦截器处理后依次返回知道最终返回到client。
* 接口访问异步请求放在下一个章节结合okhttp的异步调度去解释说明。
- 请求异步调度
-
okhttp的异步调度控制
* 异步调度线程池如下:
* 所有线程优先级一样,不存在核心线程
* 线程的数量级很大是整型最大值
* 线程逻辑执行完成空闲60s会被回收
* 超出线程最大值后任务会被放入同步队列中这个由于线程数量级很大结合项目同时请求不会超过这个值所以在okhttp中这个队列暂时无用。
* 异步调度请求数量控制:
-
okhttp的异步接口请求流程
* enqueue()前面的逻辑操作可以参考execute(),下面从enqueue()开始分析。
* enqueue()代码如下:
* 类同execute方法,前面代码逻辑处理了不重复加载和接口回调最后将异步执行代码交给okhttp的调度器(dispatcher)处理。
* 调度器(dispatcher)中 enqueue() 代码如下:
* 代码很简洁做了两件事:校验异步请求是否超标,未超标直接交给线程池去处理,超标放给等待队列等待已请求的释放以后再次发起请求。
* 没有使用线程池最大线程数控制我的理解是异步控制除了最大64请求控制还有针对同一个服务器的最大5个请求限制,很显然线程池单个限制做不到双向控制。
* 调度代码可知很明显异步调用核心代码是AsyncCall对象的实现,即:
备注:很熟悉的代码结构,这段代码主要处理了两个方面,一是调用实现,另一个方面是对响应数据的处理,第一个标注代码和同步请求一致就不再描述。
- 异步调用在Android中属于耗时操作,必须在异步线程处理,通常异步调用可以使用Okhttp内部调度和封装接口外部异步调用处理那么两者的区别:
* 逻辑上没有区别,都是通过线程池实现的,内部虽有64和5的限制但是这两个数字都是可配的,即时外部异步封装也不可能无限制服务同时请求。
* 外部异步封装调用更灵活一些,特别是现在rxjava的使用,首先能够更好的和其他逻辑一样交给同一套异步框架处理,其次接口访问前可能有其他的耗时操作在外部可以封装在同一个线程,使用内部比较受限制,可能需要线程中启动线程。 -
okhttp的同步调度
* 调度器(dispatcher)针对同步调度仅仅是持有一个正在运行的任务容器列表,提供了添加,移除,取消,重新运行(单个/批量任务)即:
参考文章:
责任链模式
彻底理解okhttp的设计思想