原生微信小程序,封装sse处理文字丢失问题

wx-sse-client
遇到的问题:

文字丢失问题

由于是收到的是数据流,前端接收到的数据块,在解析的时候发现有些是不完整的,无法解析。反应到界面上就是发现会丢失一些文字。

解决方案:保存未处理的数据,新的数据来的时候,拼接上新数据,尝试解析,解析后,保留未能解析的数据,给一下段数据使用

/**
 * SSE 客户端封装类
 * 支持自动重连、错误处理和事件监听
 * events: open, message, readystatechange, error, close, success, complete, reconnecting
 */

class SSECustomEvent {
  constructor(type, options) {
    this.type = type
    // this.responseCode = options.responseCode
    // this.headers = options.headers
  }
}

/**
 * SSE 客户端封装类
 * 支持自动重连、错误处理和事件监听
 * 参考:https://github.com/mpetazzoni/sse.js
 */
class SSEClient {
  /** @type {number} */
  /**
   * -1 初始化中
   */
  static INITIALIZING = -1
  /** @type {number} */
  /**
   * 0 连接中
   */
  static CONNECTING = 0
  /** @type {number} */
  /**
   * 1 已连接
   */
  static OPEN = 1
  /** @type {number} */
  /**
   * 2 已关闭
   */
  static CLOSED = 2

  constructor(url, options = {}) {
    options = options || {}

    // 配置项初始化
    this.url = url || '' // SSE 服务器地址
    this.reconnectInterval = options.reconnectInterval || 10000 // 重连间隔时间(毫秒)
    this.maxRetries = options.maxRetries || 5 // 最大重试次数

    this.headers = options.headers || {} // 请求头
    this.payload = options.payload !== undefined ? options.payload : '' // 请求体
    this.method = options.method || (this.payload && 'POST') || 'GET' // 请求方法
    // this.withCredentials = !!options.withCredentials
    this.debug = !!options.debug // 是否开启调试
    // this.debug = true
    // 内部状态
    this.retryCount = 0 // 当前重试次数
    this.requestTask = null // 请求任务对象
    this.listeners = {} // 事件监听器集合

    /** @type {string} */
    this.FIELD_SEPARATOR = ':' // 字段分隔符

    /** @type {number} */
    this.readyState = SSEClient.INITIALIZING // 连接状态
    // /** @type {number} */
    // this.progress = 0
    /** @type {string} */
    this.chunk = ''
    /** @type {string} */
    this.lastEventId = ''
  }

  /**
   * 建立连接
   */
  connect() {
    // 如果连接状态为连接中或已连接,则直接返回
    if (
      this.readyState === SSEClient.CONNECTING ||
      this.readyState === SSEClient.OPEN
    )
      return

    this._onConnecting()

    this.requestTask = wx.request({
      url: this.url,
      method: this.method,
      header: {
        ...this.headers,
        Accept: 'text/event-stream',
        'Cache-Control': 'no-cache'
      },
      enableChunked: true, // 开启分块传输
      success: (res) => {
        // 断开的时候回调
        if (this.debug) {
          console.debug('SSE success', res)
        }
        this._onSuccess(res)
        // // todo 处理响应码非200的错误
        // if (res.statusCode !== 200) {
        //   this._onFailure(new Error(`SSE 连接失败,响应码:${res.statusCode}`))
        //   return
        // }

        // this._onOpened()
      },
      fail: (error) => {
        if (this.debug) {
          console.debug('SSE fail', error)
        }
        this._onFailure(error)
        this._reconnect()
      },
      complete: () => {
        if (this.debug) {
          console.debug('SSE complete')
        }
        this._onComplete()
      }
    })

    console.log('requestTask:', this.requestTask)

    // 监听数据返回
    this.requestTask.onChunkReceived((response) => {
      try {
        this._onOpened()
        // if (this.debug) {
        //   console.debug('SSE onChunkReceived', this.requestTask, response)
        // }
        // 将 ArrayBuffer 转换为字符串
        const data = this._arrayBufferToString(response.data)
        // 将数据分隔为多个部分
        const parts = (this.chunk + data).split(/(\r\n\r\n|\r\r|\n\n)/g)

        /*
         * We assume that the last chunk can be incomplete because of buffering or other network effects,
         * so we always save the last part to merge it with the next incoming packet
         */
        const lastPart = parts.pop()
        // 遍历每个部分,解析事件
        parts.forEach((part) => {
          if (part.trim().length > 0) {
            if (this.debug) {
              console.debug('SSE onChunkReceived part', part)
            }
            this._dispatchEvent(this._parseEventChunk(part))
          }
        })
        this.chunk = lastPart
      } catch (error) {
        console.error('SSE onChunkReceived error', error)
      }
    })
  }

  /**
   * 关闭连接
   */
  close() {
    if (this.readyState === SSEClient.CLOSED) {
      return
    }
    if (this.requestTask) {
      this.requestTask.abort()
    }
    this._markClosed()
  }

  /**
   * 添加事件监听器
   * @param {string} event 事件名称 (message/open/close/error)
   * @param {Function} callback 回调函数
   */
  on(event, callback) {
    if (!this.listeners[event]) {
      this.listeners[event] = []
    }
    const callbacks = this.listeners[event]
    if (callbacks.indexOf(callback) === -1) {
      callbacks.push(callback)
    }
  }

  /**
   * 移除事件监听器
   * @param {string} event 事件名称
   * @param {Function} callback 回调函数
   */
  off(event, callback) {
    if (this.listeners[event]) {
      if (!callback) {
        // 移除所有事件监听器
        this.listeners[event] = []
      } else {
        const callbacks = this.listeners[event]
        const newCallbacks = callbacks.filter((cb) => cb !== callback)
        // 如果移除后没有监听器,则删除事件
        if (newCallbacks.length === 0) {
          delete this.listeners[event]
        } else {
          this.listeners[event] = newCallbacks
        }
      }
    }
  }

  /**
   * 触发事件
   * @param {SSECustomEvent} e 事件对象
   * @returns {boolean} 是否成功触发事件
   * @private
   */
  _dispatchEvent(e) {
    if (!e) {
      return true
    }

    const type = e.type
    // 设置事件源
    e.source = this

    if (this.listeners[type]) {
      // 遍历所有事件监听器,并执行回调函数
      this.listeners[type].every((callback) => {
        callback(e)
        // 如果事件被阻止,则停止遍历
        return !e.defaultPrevented
      })
    }
    return true
  }

  /**
   * 标记连接已关闭
   * @private
   */
  _markClosed() {
    // this.chunk = ''
    // this.requestTask = null
    this._setReadyState(SSEClient.CLOSED)
  }

  /**
   * 设置连接状态
   * @param {number} state 连接状态
   * @private
   */
  _setReadyState(state) {
    if (this.readyState === state) {
      return
    }
    if (this.debug) {
      console.debug('SSE _setReadyState', state, this._stateToString(state))
    }
    const event = new SSECustomEvent('readystatechange')
    event.readyState = state
    this.readyState = state
    this._dispatchEvent(event)
  }

  /**
   * 将状态转换为字符串
   * @param {number} state 状态
   * @returns {string} 状态字符串
   */
  _stateToString(state) {
    return ['初始化中', '连接中', '已连接', '已关闭'][state + 1]
  }

  /**
   * 标记连接已打开
   * @private
   */
  _onOpened() {
    if (this.readyState === SSEClient.OPEN) {
      return
    }
    this.chunk = ''
    this.retryCount = 0
    // 触发 open 事件
    const event = new SSECustomEvent('open')
    event.responseCode = this.requestTask.status
    event.headers = this.headers
    this._setReadyState(SSEClient.OPEN)
    this._dispatchEvent(event)
  }

  _onConnecting() {
    this._setReadyState(SSEClient.CONNECTING)
    const event = new SSECustomEvent('connecting')
    this._dispatchEvent(event)
  }

  /**
   * 处理失败
   * @param {Error} e 错误对象
   * @private
   */
  _onFailure(e) {
    const event = new SSECustomEvent('error')
    event.error = e
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 处理成功
   * @param {Object} e 成功对象
   * @private
   */
  _onSuccess(e) {
    const event = new SSECustomEvent('success')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 处理完成
   * @param {*} e 完成对象
   * @private
   */
  _onComplete(e) {
    const event = new SSECustomEvent('complete')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 重连机制
   * @private
   */
  _reconnect() {
    if (this.retryCount >= this.maxRetries) {
      const event = new SSECustomEvent('error')
      event.error = new Error('最大重试次数已达到')
      this._dispatchEvent(event)
      return
    }

    this.retryCount++
    setTimeout(() => {
      const event = new SSECustomEvent('reconnecting')
      event.retryCount = this.retryCount
      this._dispatchEvent(event)
      this.connect()
    }, this.reconnectInterval)
  }

  /**
   * 将 ArrayBuffer 转换为字符串
   * @private
   */
  _arrayBufferToString(buffer) {
    let text = decodeURIComponent(
      escape(String.fromCharCode.apply(null, new Uint8Array(buffer)))
    )
    return text
  }

  /**
   * 解析接收到的 SSE 事件块,构造事件对象
   * Parse a received SSE event chunk into a constructed event object.
   *
   * Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
   */
  _parseEventChunk(chunk) {
    if (!chunk || chunk.length === 0) {
      return null
    }

    const e = { id: null, retry: null, data: null, event: null }
    chunk.split(/\n|\r\n|\r/).forEach((line) => {
      const index = line.indexOf(this.FIELD_SEPARATOR)
      let field, value
      if (index > 0) {
        // only first whitespace should be trimmed
        const skip = line[index + 1] === ' ' ? 2 : 1
        field = line.substring(0, index)
        value = line.substring(index + skip)
      } else if (index < 0) {
        // Interpret the entire line as the field name, and use the empty string as the field value
        field = line
        value = ''
      } else {
        // A colon is the first character. This is a comment; ignore it.
        return
      }

      if (!(field in e)) {
        return
      }

      // consecutive 'data' is concatenated with newlines
      if (field === 'data' && e[field] !== null) {
        e['data'] += '\n' + value
      } else {
        e[field] = value
      }
    })

    if (e.id !== null) {
      this.lastEventId = e.id
    }

    const event = new SSECustomEvent(e.event || 'message')
    event.id = e.id
    event.data = e.data || ''
    event.lastEventId = this.lastEventId
    return event
  }
}

if (typeof module !== 'undefined' && module.exports) {
  module.exports = SSEClient
}

export default SSEClient

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容