封装WebSocket:
import Starscream
import SwiftyJSON
// MARK: - WebSocketManagerDelegate
protocol WebSocketManagerDelegate: class {
/// 已建立连接,包括正常连接和自动重新连接两种情况。
func webSocketManagerDidConnect(manager: WebSocketManager)
/// 已断开连接,包括正常和非正常断开连接两种情况。参数 `isReconnecting` 表示是否处于等待重新连接状态。
func webSocketManagerDidDisconnect(manager: WebSocketManager, error: Error?, isReconnecting: Bool)
}
// MARK: - WebSocketManager
class WebSocketManager {
/// 收到新的打印异常
var hasNewPrintException = false
/// 尚未收到过打印异常
var hasNotReceivedPrintException = true
weak var delegate: WebSocketManagerDelegate?
var enableLog = false
private var isHeartbeatTimeout = false
private var heartbeatInterval: TimeInterval = 5
private var heartbeatTimeout: TimeInterval = 10
private var heartbeatTimestamp: TimeInterval = 0 // 毫秒
private var reconnectRetryCount = 0
private var maxReconnectRetryCount = 3
private var reconnectOperation: DispatchWorkItem?
private var reconnectRetryInterval: TimeInterval = 5
private var error: Error?
private(set) var state = State.closed {
didSet {
stateChanged(oldState: oldValue)
}
}
private var isHeartbeatTimerSuspended = true
private lazy var heartbeatTimer: DispatchSourceTimer = {
let timer = DispatchSource.makeTimerSource(queue: .main)
timer.schedule(deadline: .now(), repeating: heartbeatInterval, leeway: .milliseconds(100))
timer.setEventHandler { [weak self] in
guard let `self` = self else { return }
if !self.isHeartbeatTimerSuspended {
self.sendHeartbeat()
}
}
return timer
}()
private let url: URL
private lazy var socket: WebSocket = {
let socket = WebSocket(url: url)
socket.delegate = self
return socket
}()
deinit {
disconnect()
destroyHeartbeatTimer()
}
init(url: URL) {
self.url = url
}
}
// MARK: - 连接相关
extension WebSocketManager {
enum State {
/// 等待重连中
case reconnecting
/// 正在连接中
case connecting
/// 已连接
case connected
/// 断开连接中
case closing
/// 已断开
case closed
}
/// 只在 `state` 值为 `closed` 时有效果。
func connect() {
if state == .closed {
state = .connecting
}
}
/// 只在 `state` 值为 `reconnecting` 或 `connecting` 或 `connected` 时有效果。
func disconnect() {
switch state {
case .reconnecting, .connecting, .connected:
state = .closing
case .closing, .closed:
break
}
}
private func reconnect() {
guard state == .reconnecting else { return }
guard reconnectRetryCount < maxReconnectRetryCount else {
state = .closed
return
}
delegate?.webSocketManagerDidDisconnect(manager: self, error: error, isReconnecting: true)
// webSocketManagerDidDisconnect 方法中可能会进行 disconnect 操作
guard state == .reconnecting else { return }
reconnectOperation = DispatchWorkItem { [weak self] in
guard let `self` = self else { return }
self.increaseReconnectRetryCount()
self.clearReconnectOperation()
self.state = .connecting
}
if reconnectRetryCount == 0 {
reconnectOperation?.perform()
} else {
DispatchQueue.main.asyncAfter(deadline: .now() + reconnectRetryInterval, execute: reconnectOperation!)
}
}
private func clearReconnectOperation() {
reconnectOperation = nil
}
private func cancelAndClearReconnectOperation() {
reconnectOperation?.cancel()
clearReconnectOperation()
}
private func increaseReconnectRetryCount() {
reconnectRetryCount = min(reconnectRetryCount + 1, maxReconnectRetryCount)
}
private func resetReconnectRetryCount() {
reconnectRetryCount = 0
}
private func stateChanged(oldState: State) {
switch state {
case .connecting:
handleConnectingState()
case .connected:
handleConnectedState()
case .reconnecting:
handleReconnectingState()
case .closing:
handleClosingState(oldState: oldState)
case .closed:
handleClosedState(oldState: oldState)
}
}
private func handleConnectingState() {
socket.connect()
}
private func handleConnectedState() {
resumeHeartbeatTimer()
resetReconnectRetryCount()
delegate?.webSocketManagerDidConnect(manager: self)
}
private func handleReconnectingState() {
suspendHeartbeatTimer()
reconnect()
}
private func handleClosingState(oldState: State) {
switch oldState {
case .connecting, .connected:
suspendHeartbeatTimer()
socket.disconnect()
case .reconnecting:
state = .closed
case .closing, .closed:
fatalError()
}
}
private func handleClosedState(oldState: State) {
switch oldState {
case .closing: // 主动断开
resetReconnectRetryCount()
cancelAndClearReconnectOperation()
if isHeartbeatTimeout {
isHeartbeatTimeout = false
state = .reconnecting // 心跳超时重连
} else {
delegate?.webSocketManagerDidDisconnect(
manager: self,
error: error,
isReconnecting: false)
}
case .reconnecting: // 重连次数上限
resetReconnectRetryCount()
delegate?.webSocketManagerDidDisconnect(
manager: self,
error: error,
isReconnecting: false)
case .connected, .connecting, .closed:
fatalError()
}
}
}
// MARK: - 心跳相关
private extension WebSocketManager {
func sendHeartbeat() {
if socket.isConnected {
updateHeartbeatTimestamp()
socket.write(string: "\(heartbeatTimestamp)")
}
}
func updateHeartbeatTimestamp() {
heartbeatTimestamp = Date().timeIntervalSince1970 * 1_000
}
private func handleHeartbeatTimeout() {
isHeartbeatTimeout = true
disconnect()
}
func resumeHeartbeatTimer() {
if isHeartbeatTimerSuspended {
isHeartbeatTimerSuspended = false
heartbeatTimer.resume()
}
}
func suspendHeartbeatTimer() {
if !isHeartbeatTimerSuspended {
isHeartbeatTimerSuspended = true
heartbeatTimer.suspend()
}
}
func destroyHeartbeatTimer() {
heartbeatTimer.cancel()
resumeHeartbeatTimer()
}
}
// MARK: - 消息处理
extension WebSocketManager {
enum MessageType: String {
case beat
case print
fileprivate func messageHandler(_ manager: WebSocketManager) -> (JSON) -> Void {
switch self {
case .beat:
return WebSocketManager.handleBeatMessage(manager)
case .print:
return WebSocketManager.handlePrintMessage(manager)
}
}
}
private func handleBeatMessage(_ msg: JSON) {
// {"msgType":"beat","ts":"1536134255660.87"}
guard let ts = Double(msg["ts"].stringValue) else {
return
}
// 超时重连
if ts - heartbeatTimestamp > heartbeatTimeout {
handleHeartbeatTimeout()
}
}
private func handlePrintMessage(_ msg: JSON) {
hasNewPrintException = true
hasNotReceivedPrintException = false
NotificationCenter.default.post(name: .PrintExceptionDidReceive, object: nil)
}
}
// MARK: - WebSocketDelegate
extension WebSocketManager: WebSocketDelegate {
func websocketDidConnect(socket: WebSocketClient) {
log("websocketDidConnect")
state = .connected
}
func websocketDidDisconnect(socket: WebSocketClient, error: Error?) {
log("websocketDidDisconnect error: \(error as Any)")
self.error = error
switch state {
case .connecting, .connected:
state = .reconnecting // 断线重连
case .closing:
state = .closed // 主动关闭
case .closed, .reconnecting:
fatalError()
}
}
func websocketDidReceiveMessage(socket: WebSocketClient, text: String) {
log("websocketDidReceiveMessage text: \(text)")
if state == .connected {
let message = JSON(parseJSON: text)
MessageType(rawValue: message["msgType"].stringValue)?.messageHandler(self)(message)
}
}
func websocketDidReceiveData(socket: WebSocketClient, data: Data) {
log("websocketDidReceiveData \n \(String(data: data, encoding: .utf8)!)")
}
}
extension WebSocketManager {
func log(
_ log: @autoclosure () -> String = "",
file: String = #file,
line: Int = #line,
function: String = #function)
{
if enableLog {
print("\(function) at \((file as NSString).lastPathComponent)[\(line)]", log())
}
}
}