okhttp之旅(十五)--websocket连接的关闭

系统学习详见OKhttp源码解析详解系列

1 主动关闭

1.1 WebSocket 接口的 close(int code, String reason)

  • 通过 WebSocket 接口的 close(int code, String reason) 我们可以关闭一个 WebSocket 连接
  • 1.在执行关闭连接动作前,会先检查一下 close code 的有效性在合法范围内。关于不同 close code 的详细说明,可以参考 WebSocket 协议规范
  • 2.构造一个 Close 消息放入发送消息队列。
  • 3.调度 writerRunnable 执行。
  • Close 消息可以带有不超出 123 字节的字符串,以作为 Close message,来说明连接关闭的原因。
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
    @Override
    public boolean close(int code, String reason) {
        return close(code, reason, CANCEL_AFTER_CLOSE_MILLIS);
    }
    synchronized boolean close(int code, String reason, long cancelAfterCloseMillis) {
        //先检查一下 close code 的有效性在合法范围内
        validateCloseCode(code);
        //构造一个 Close 消息放入发送消息队列
        ByteString reasonBytes = null;
        if (reason != null) {
            reasonBytes = ByteString.encodeUtf8(reason);
            if (reasonBytes.size() > CLOSE_MESSAGE_MAX) {
                throw new IllegalArgumentException("reason.size() > " + CLOSE_MESSAGE_MAX + ": " + reason);
            }
        }

        if (failed || enqueuedClose) return false;

        // Immediately prevent further frames from being enqueued.
        enqueuedClose = true;

        // Enqueue the close frame.
        messageAndCloseQueue.add(new Close(code, reasonBytes, cancelAfterCloseMillis));
        //调度 writerRunnable 执行
        runWriter();
        return true;
    }
    private void runWriter() {
        assert (Thread.holdsLock(this));

        if (executor != null) {
            executor.execute(writerRunnable);
        }
    }
}

1.2 writerRunnable 执行的 writeOneFrame() 实际发送 CLOSE 帧

  • 发送 CLOSE 帧也分为主动关闭的发送还是被动关闭的发送
  • 已读取关闭帧关闭的发送,在发送完 CLOSE 帧之后,连接被最终关闭,因而,发送 CLOSE 帧之前,这里会停掉发送消息用的 executor。而在发送之后,则会通过 onClosed() 通知用户。
    此Web套接字已读取关闭帧,则为-1
  • 未读取关闭帧关闭的发送,则在发送前会调度 CancelRunnable 的执行,发送后不会通过 onClosed() 通知用户。
    此Web套接字尚未读取关闭帧,则为-1
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
    public RealWebSocket(Request request, WebSocketListener listener, Random random,
                         long pingIntervalMillis) {
        ...
        //初始化了 writerRunnable
        this.writerRunnable = new Runnable() {
            @Override
            public void run() {
                try {
                    while (writeOneFrame()) {
                    }
                } catch (IOException e) {
                    failWebSocket(e, null);
                }
            }
        };
    }
    boolean writeOneFrame() throws IOException {
          ...
        synchronized (RealWebSocket.this) {
            if (failed) {
                return false; // websocket连接失败,跳出循环
            }

            writer = this.writer;
            pong = pongQueue.poll();
            //判断是否有pong消息
            if (pong == null) {
                messageOrClose = messageAndCloseQueue.poll();
                //判断是否为关闭帧
                if (messageOrClose instanceof Close) {
                    //如果此Web套接字尚未读取关闭帧,则为-1
                    receivedCloseCode = this.receivedCloseCode;
                    receivedCloseReason = this.receivedCloseReason;
                    if (receivedCloseCode != -1) {
                        //被动关闭的发送-停掉发送消息用的 executor
                        //已经读取到关闭帧--则再向服务端发一个关闭帧过去
                        //会通过 onClosed() 通知用户
                        streamsToClose = this.streams;
                        this.streams = null;
                        this.executor.shutdown();
                    } else {
                        //主动关闭的发送
                        //不会通过 onClosed() 通知用户
                        // When we request a graceful close also schedule a cancel of the websocket.
                        //当我们要求优雅的关闭时也安排取消websocket
                        cancelFuture = executor.schedule(new CancelRunnable(),
                                ((Close) messageOrClose).cancelAfterCloseMillis, MILLISECONDS);
                    }
                } else if (messageOrClose == null) {
                    return false; //消息队列为空,跳出循环
                }
            }
        }

        try {
            if (pong != null) {
          ...

            } else if (messageOrClose instanceof Message) {
          ...

            } else if (messageOrClose instanceof Close) {
                Close close = (Close) messageOrClose;
                writer.writeClose(close.code, close.reason);

                // 回调onClosed
                if (streamsToClose != null) {
                    listener.onClosed(this, receivedCloseCode, receivedCloseReason);
                }

            } else {
                throw new AssertionError();
            }

            return true;
        } finally {
            //释放资源
            closeQuietly(streamsToClose);
        }
    }
    final class CancelRunnable implements Runnable {
        @Override
        public void run() {
            cancel();
        }
    }
    final class CancelRunnable implements Runnable {
        @Override
        public void run() {
            cancel();
        }
    }
    @Override
    public void cancel() {
        call.cancel();
    }
}
final class RealCall implements Call {
    @Override
    public void cancel() {
        retryAndFollowUpInterceptor.cancel();
    }
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
    /**
     * Immediately closes the socket connection if it's currently held. Use this to interrupt an
     * in-flight request from any thread. It's the caller's responsibility to close the request body
     * and response body streams; otherwise resources may be leaked.
     * <p>
     * <p>This method is safe to be called concurrently, but provides limited guarantees. If a
     * transport layer connection has been established (such as a HTTP/2 stream) that is terminated.
     * Otherwise if a socket connection is being established, that is terminated.
     * <p>
     * 如果当前持有,立即关闭套接字连接。 
     * 使用它来中断来自任何线程的正在进行的请求。 
     * 关闭请求主体和响应主体流是调用者的责任; 否则资源可能会泄露。
     * <p>
     * <p>此方法可安全地同时调用,但提供有限的保证。
     * 如果传输层连接已建立(例如HTTP / 2流)已终止。
     * 否则,如果套接字连接正在建立,则终止。
     */
    public void cancel() {
        canceled = true;
        StreamAllocation streamAllocation = this.streamAllocation;
        if (streamAllocation != null) streamAllocation.cancel();
    }
}
public final class StreamAllocation {
    public void cancel() {
        HttpCodec codecToCancel;
        RealConnection connectionToCancel;
        synchronized (connectionPool) {
            canceled = true;
            codecToCancel = codec;
            connectionToCancel = connection;
        }
        if (codecToCancel != null) {
            codecToCancel.cancel();
        } else if (connectionToCancel != null) {
            connectionToCancel.cancel();
        }
    }
}

2 CLOSE 的读取

2.1 readControlFrame

CLOSE 的读取在 WebSocketReader 的 readControlFrame()中:
并最终回调给RealWebSocket的onReadClose处理

final class WebSocketReader {
    private void readControlFrame() throws IOException {
        if (frameLength > 0) {
            source.readFully(controlFrameBuffer, frameLength);

            if (!isClient) {
                controlFrameBuffer.readAndWriteUnsafe(maskCursor);
                maskCursor.seek(0);
                toggleMask(maskCursor, maskKey);
                maskCursor.close();
            }
        }

        switch (opcode) {
            case OPCODE_CONTROL_PING:
                frameCallback.onReadPing(controlFrameBuffer.readByteString());
                break;
            case OPCODE_CONTROL_PONG:
                frameCallback.onReadPong(controlFrameBuffer.readByteString());
                break;
            case OPCODE_CONTROL_CLOSE:
                int code = CLOSE_NO_STATUS_CODE;
                String reason = "";
                long bufferSize = controlFrameBuffer.size();
                if (bufferSize == 1) {
                    throw new ProtocolException("Malformed close payload length of 1.");
                } else if (bufferSize != 0) {
                    code = controlFrameBuffer.readShort();
                    reason = controlFrameBuffer.readUtf8();
                    String codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code);
                    if (codeExceptionMessage != null)
                        throw new ProtocolException(codeExceptionMessage);
                }
                frameCallback.onReadClose(code, reason);
                closed = true;
                break;
            default:
                throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
        }
    }
}

2.2 回调给RealWebSocket的onReadClose处理

读到 CLOSE 帧时,WebSocketReader 会将这一事件通知出去:

public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
    @Override
    public void onReadClose(int code, String reason) {
        if (code == -1) throw new IllegalArgumentException();

        Streams toClose = null;
        synchronized (this) {
            if (receivedCloseCode != -1) throw new IllegalStateException("already closed");
            receivedCloseCode = code;
            receivedCloseReason = reason;
            if (enqueuedClose && messageAndCloseQueue.isEmpty()) {
                toClose = this.streams;
                this.streams = null;
                if (cancelFuture != null) cancelFuture.cancel(false);
                this.executor.shutdown();
            }
        }

        try {
            listener.onClosing(this, code, reason);

            if (toClose != null) {
                listener.onClosed(this, code, reason);
            }
        } finally {
            closeQuietly(toClose);
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容