系统学习详见OKhttp源码解析详解系列
1 主动关闭
1.1 WebSocket 接口的 close(int code, String reason)
- 通过 WebSocket 接口的 close(int code, String reason) 我们可以关闭一个 WebSocket 连接
- 1.在执行关闭连接动作前,会先检查一下 close code 的有效性在合法范围内。关于不同 close code 的详细说明,可以参考 WebSocket 协议规范
- 2.构造一个 Close 消息放入发送消息队列。
- 3.调度 writerRunnable 执行。
- Close 消息可以带有不超出 123 字节的字符串,以作为 Close message,来说明连接关闭的原因。
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
@Override
public boolean close(int code, String reason) {
return close(code, reason, CANCEL_AFTER_CLOSE_MILLIS);
}
synchronized boolean close(int code, String reason, long cancelAfterCloseMillis) {
//先检查一下 close code 的有效性在合法范围内
validateCloseCode(code);
//构造一个 Close 消息放入发送消息队列
ByteString reasonBytes = null;
if (reason != null) {
reasonBytes = ByteString.encodeUtf8(reason);
if (reasonBytes.size() > CLOSE_MESSAGE_MAX) {
throw new IllegalArgumentException("reason.size() > " + CLOSE_MESSAGE_MAX + ": " + reason);
}
}
if (failed || enqueuedClose) return false;
// Immediately prevent further frames from being enqueued.
enqueuedClose = true;
// Enqueue the close frame.
messageAndCloseQueue.add(new Close(code, reasonBytes, cancelAfterCloseMillis));
//调度 writerRunnable 执行
runWriter();
return true;
}
private void runWriter() {
assert (Thread.holdsLock(this));
if (executor != null) {
executor.execute(writerRunnable);
}
}
}
1.2 writerRunnable 执行的 writeOneFrame() 实际发送 CLOSE 帧
- 发送 CLOSE 帧也分为主动关闭的发送还是被动关闭的发送。
- 已读取关闭帧关闭的发送,在发送完 CLOSE 帧之后,连接被最终关闭,因而,发送 CLOSE 帧之前,这里会停掉发送消息用的 executor。而在发送之后,则会通过 onClosed() 通知用户。
此Web套接字已读取关闭帧,则为-1- 未读取关闭帧关闭的发送,则在发送前会调度 CancelRunnable 的执行,发送后不会通过 onClosed() 通知用户。
此Web套接字尚未读取关闭帧,则为-1
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
public RealWebSocket(Request request, WebSocketListener listener, Random random,
long pingIntervalMillis) {
...
//初始化了 writerRunnable
this.writerRunnable = new Runnable() {
@Override
public void run() {
try {
while (writeOneFrame()) {
}
} catch (IOException e) {
failWebSocket(e, null);
}
}
};
}
boolean writeOneFrame() throws IOException {
...
synchronized (RealWebSocket.this) {
if (failed) {
return false; // websocket连接失败,跳出循环
}
writer = this.writer;
pong = pongQueue.poll();
//判断是否有pong消息
if (pong == null) {
messageOrClose = messageAndCloseQueue.poll();
//判断是否为关闭帧
if (messageOrClose instanceof Close) {
//如果此Web套接字尚未读取关闭帧,则为-1
receivedCloseCode = this.receivedCloseCode;
receivedCloseReason = this.receivedCloseReason;
if (receivedCloseCode != -1) {
//被动关闭的发送-停掉发送消息用的 executor
//已经读取到关闭帧--则再向服务端发一个关闭帧过去
//会通过 onClosed() 通知用户
streamsToClose = this.streams;
this.streams = null;
this.executor.shutdown();
} else {
//主动关闭的发送
//不会通过 onClosed() 通知用户
// When we request a graceful close also schedule a cancel of the websocket.
//当我们要求优雅的关闭时也安排取消websocket
cancelFuture = executor.schedule(new CancelRunnable(),
((Close) messageOrClose).cancelAfterCloseMillis, MILLISECONDS);
}
} else if (messageOrClose == null) {
return false; //消息队列为空,跳出循环
}
}
}
try {
if (pong != null) {
...
} else if (messageOrClose instanceof Message) {
...
} else if (messageOrClose instanceof Close) {
Close close = (Close) messageOrClose;
writer.writeClose(close.code, close.reason);
// 回调onClosed
if (streamsToClose != null) {
listener.onClosed(this, receivedCloseCode, receivedCloseReason);
}
} else {
throw new AssertionError();
}
return true;
} finally {
//释放资源
closeQuietly(streamsToClose);
}
}
final class CancelRunnable implements Runnable {
@Override
public void run() {
cancel();
}
}
final class CancelRunnable implements Runnable {
@Override
public void run() {
cancel();
}
}
@Override
public void cancel() {
call.cancel();
}
}
final class RealCall implements Call {
@Override
public void cancel() {
retryAndFollowUpInterceptor.cancel();
}
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
/**
* Immediately closes the socket connection if it's currently held. Use this to interrupt an
* in-flight request from any thread. It's the caller's responsibility to close the request body
* and response body streams; otherwise resources may be leaked.
* <p>
* <p>This method is safe to be called concurrently, but provides limited guarantees. If a
* transport layer connection has been established (such as a HTTP/2 stream) that is terminated.
* Otherwise if a socket connection is being established, that is terminated.
* <p>
* 如果当前持有,立即关闭套接字连接。
* 使用它来中断来自任何线程的正在进行的请求。
* 关闭请求主体和响应主体流是调用者的责任; 否则资源可能会泄露。
* <p>
* <p>此方法可安全地同时调用,但提供有限的保证。
* 如果传输层连接已建立(例如HTTP / 2流)已终止。
* 否则,如果套接字连接正在建立,则终止。
*/
public void cancel() {
canceled = true;
StreamAllocation streamAllocation = this.streamAllocation;
if (streamAllocation != null) streamAllocation.cancel();
}
}
public final class StreamAllocation {
public void cancel() {
HttpCodec codecToCancel;
RealConnection connectionToCancel;
synchronized (connectionPool) {
canceled = true;
codecToCancel = codec;
connectionToCancel = connection;
}
if (codecToCancel != null) {
codecToCancel.cancel();
} else if (connectionToCancel != null) {
connectionToCancel.cancel();
}
}
}
2 CLOSE 的读取
2.1 readControlFrame
CLOSE 的读取在 WebSocketReader 的 readControlFrame()中:
并最终回调给RealWebSocket的onReadClose处理
final class WebSocketReader {
private void readControlFrame() throws IOException {
if (frameLength > 0) {
source.readFully(controlFrameBuffer, frameLength);
if (!isClient) {
controlFrameBuffer.readAndWriteUnsafe(maskCursor);
maskCursor.seek(0);
toggleMask(maskCursor, maskKey);
maskCursor.close();
}
}
switch (opcode) {
case OPCODE_CONTROL_PING:
frameCallback.onReadPing(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_PONG:
frameCallback.onReadPong(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_CLOSE:
int code = CLOSE_NO_STATUS_CODE;
String reason = "";
long bufferSize = controlFrameBuffer.size();
if (bufferSize == 1) {
throw new ProtocolException("Malformed close payload length of 1.");
} else if (bufferSize != 0) {
code = controlFrameBuffer.readShort();
reason = controlFrameBuffer.readUtf8();
String codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code);
if (codeExceptionMessage != null)
throw new ProtocolException(codeExceptionMessage);
}
frameCallback.onReadClose(code, reason);
closed = true;
break;
default:
throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
}
}
}
2.2 回调给RealWebSocket的onReadClose处理
读到 CLOSE 帧时,WebSocketReader 会将这一事件通知出去:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
@Override
public void onReadClose(int code, String reason) {
if (code == -1) throw new IllegalArgumentException();
Streams toClose = null;
synchronized (this) {
if (receivedCloseCode != -1) throw new IllegalStateException("already closed");
receivedCloseCode = code;
receivedCloseReason = reason;
if (enqueuedClose && messageAndCloseQueue.isEmpty()) {
toClose = this.streams;
this.streams = null;
if (cancelFuture != null) cancelFuture.cancel(false);
this.executor.shutdown();
}
}
try {
listener.onClosing(this, code, reason);
if (toClose != null) {
listener.onClosed(this, code, reason);
}
} finally {
closeQuietly(toClose);
}
}
}