原生微信小程序,封装sse处理文字丢失问题

wx-sse-client
遇到的问题:

文字丢失问题

由于是收到的是数据流,前端接收到的数据块,在解析的时候发现有些是不完整的,无法解析。反应到界面上就是发现会丢失一些文字。

解决方案:保存未处理的数据,新的数据来的时候,拼接上新数据,尝试解析,解析后,保留未能解析的数据,给一下段数据使用

/**
 * SSE 客户端封装类
 * 支持自动重连、错误处理和事件监听
 * events: open, message, readystatechange, error, close, success, complete, reconnecting
 */

class SSECustomEvent {
  constructor(type, options) {
    this.type = type
    // this.responseCode = options.responseCode
    // this.headers = options.headers
  }
}

/**
 * SSE 客户端封装类
 * 支持自动重连、错误处理和事件监听
 * 参考:https://github.com/mpetazzoni/sse.js
 */
class SSEClient {
  /** @type {number} */
  /**
   * -1 初始化中
   */
  static INITIALIZING = -1
  /** @type {number} */
  /**
   * 0 连接中
   */
  static CONNECTING = 0
  /** @type {number} */
  /**
   * 1 已连接
   */
  static OPEN = 1
  /** @type {number} */
  /**
   * 2 已关闭
   */
  static CLOSED = 2

  constructor(url, options = {}) {
    options = options || {}

    // 配置项初始化
    this.url = url || '' // SSE 服务器地址
    this.reconnectInterval = options.reconnectInterval || 10000 // 重连间隔时间(毫秒)
    this.maxRetries = options.maxRetries || 5 // 最大重试次数

    this.headers = options.headers || {} // 请求头
    this.payload = options.payload !== undefined ? options.payload : '' // 请求体
    this.method = options.method || (this.payload && 'POST') || 'GET' // 请求方法
    // this.withCredentials = !!options.withCredentials
    this.debug = !!options.debug // 是否开启调试
    // this.debug = true
    // 内部状态
    this.retryCount = 0 // 当前重试次数
    this.requestTask = null // 请求任务对象
    this.listeners = {} // 事件监听器集合

    /** @type {string} */
    this.FIELD_SEPARATOR = ':' // 字段分隔符

    /** @type {number} */
    this.readyState = SSEClient.INITIALIZING // 连接状态
    // /** @type {number} */
    // this.progress = 0
    /** @type {string} */
    this.chunk = ''
    /** @type {string} */
    this.lastEventId = ''
  }

  /**
   * 建立连接
   */
  connect() {
    // 如果连接状态为连接中或已连接,则直接返回
    if (
      this.readyState === SSEClient.CONNECTING ||
      this.readyState === SSEClient.OPEN
    )
      return

    this._onConnecting()

    this.requestTask = wx.request({
      url: this.url,
      method: this.method,
      header: {
        ...this.headers,
        Accept: 'text/event-stream',
        'Cache-Control': 'no-cache'
      },
      enableChunked: true, // 开启分块传输
      success: (res) => {
        // 断开的时候回调
        if (this.debug) {
          console.debug('SSE success', res)
        }
        this._onSuccess(res)
        // // todo 处理响应码非200的错误
        // if (res.statusCode !== 200) {
        //   this._onFailure(new Error(`SSE 连接失败,响应码:${res.statusCode}`))
        //   return
        // }

        // this._onOpened()
      },
      fail: (error) => {
        if (this.debug) {
          console.debug('SSE fail', error)
        }
        this._onFailure(error)
        this._reconnect()
      },
      complete: () => {
        if (this.debug) {
          console.debug('SSE complete')
        }
        this._onComplete()
      }
    })

    console.log('requestTask:', this.requestTask)

    // 监听数据返回
    this.requestTask.onChunkReceived((response) => {
      try {
        this._onOpened()
        // if (this.debug) {
        //   console.debug('SSE onChunkReceived', this.requestTask, response)
        // }
        // 将 ArrayBuffer 转换为字符串
        const data = this._arrayBufferToString(response.data)
        // 将数据分隔为多个部分
        const parts = (this.chunk + data).split(/(\r\n\r\n|\r\r|\n\n)/g)

        /*
         * We assume that the last chunk can be incomplete because of buffering or other network effects,
         * so we always save the last part to merge it with the next incoming packet
         */
        const lastPart = parts.pop()
        // 遍历每个部分,解析事件
        parts.forEach((part) => {
          if (part.trim().length > 0) {
            if (this.debug) {
              console.debug('SSE onChunkReceived part', part)
            }
            this._dispatchEvent(this._parseEventChunk(part))
          }
        })
        this.chunk = lastPart
      } catch (error) {
        console.error('SSE onChunkReceived error', error)
      }
    })
  }

  /**
   * 关闭连接
   */
  close() {
    if (this.readyState === SSEClient.CLOSED) {
      return
    }
    if (this.requestTask) {
      this.requestTask.abort()
    }
    this._markClosed()
  }

  /**
   * 添加事件监听器
   * @param {string} event 事件名称 (message/open/close/error)
   * @param {Function} callback 回调函数
   */
  on(event, callback) {
    if (!this.listeners[event]) {
      this.listeners[event] = []
    }
    const callbacks = this.listeners[event]
    if (callbacks.indexOf(callback) === -1) {
      callbacks.push(callback)
    }
  }

  /**
   * 移除事件监听器
   * @param {string} event 事件名称
   * @param {Function} callback 回调函数
   */
  off(event, callback) {
    if (this.listeners[event]) {
      if (!callback) {
        // 移除所有事件监听器
        this.listeners[event] = []
      } else {
        const callbacks = this.listeners[event]
        const newCallbacks = callbacks.filter((cb) => cb !== callback)
        // 如果移除后没有监听器,则删除事件
        if (newCallbacks.length === 0) {
          delete this.listeners[event]
        } else {
          this.listeners[event] = newCallbacks
        }
      }
    }
  }

  /**
   * 触发事件
   * @param {SSECustomEvent} e 事件对象
   * @returns {boolean} 是否成功触发事件
   * @private
   */
  _dispatchEvent(e) {
    if (!e) {
      return true
    }

    const type = e.type
    // 设置事件源
    e.source = this

    if (this.listeners[type]) {
      // 遍历所有事件监听器,并执行回调函数
      this.listeners[type].every((callback) => {
        callback(e)
        // 如果事件被阻止,则停止遍历
        return !e.defaultPrevented
      })
    }
    return true
  }

  /**
   * 标记连接已关闭
   * @private
   */
  _markClosed() {
    // this.chunk = ''
    // this.requestTask = null
    this._setReadyState(SSEClient.CLOSED)
  }

  /**
   * 设置连接状态
   * @param {number} state 连接状态
   * @private
   */
  _setReadyState(state) {
    if (this.readyState === state) {
      return
    }
    if (this.debug) {
      console.debug('SSE _setReadyState', state, this._stateToString(state))
    }
    const event = new SSECustomEvent('readystatechange')
    event.readyState = state
    this.readyState = state
    this._dispatchEvent(event)
  }

  /**
   * 将状态转换为字符串
   * @param {number} state 状态
   * @returns {string} 状态字符串
   */
  _stateToString(state) {
    return ['初始化中', '连接中', '已连接', '已关闭'][state + 1]
  }

  /**
   * 标记连接已打开
   * @private
   */
  _onOpened() {
    if (this.readyState === SSEClient.OPEN) {
      return
    }
    this.chunk = ''
    this.retryCount = 0
    // 触发 open 事件
    const event = new SSECustomEvent('open')
    event.responseCode = this.requestTask.status
    event.headers = this.headers
    this._setReadyState(SSEClient.OPEN)
    this._dispatchEvent(event)
  }

  _onConnecting() {
    this._setReadyState(SSEClient.CONNECTING)
    const event = new SSECustomEvent('connecting')
    this._dispatchEvent(event)
  }

  /**
   * 处理失败
   * @param {Error} e 错误对象
   * @private
   */
  _onFailure(e) {
    const event = new SSECustomEvent('error')
    event.error = e
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 处理成功
   * @param {Object} e 成功对象
   * @private
   */
  _onSuccess(e) {
    const event = new SSECustomEvent('success')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 处理完成
   * @param {*} e 完成对象
   * @private
   */
  _onComplete(e) {
    const event = new SSECustomEvent('complete')
    this._dispatchEvent(event)
    this._markClosed()
  }

  /**
   * 重连机制
   * @private
   */
  _reconnect() {
    if (this.retryCount >= this.maxRetries) {
      const event = new SSECustomEvent('error')
      event.error = new Error('最大重试次数已达到')
      this._dispatchEvent(event)
      return
    }

    this.retryCount++
    setTimeout(() => {
      const event = new SSECustomEvent('reconnecting')
      event.retryCount = this.retryCount
      this._dispatchEvent(event)
      this.connect()
    }, this.reconnectInterval)
  }

  /**
   * 将 ArrayBuffer 转换为字符串
   * @private
   */
  _arrayBufferToString(buffer) {
    let text = decodeURIComponent(
      escape(String.fromCharCode.apply(null, new Uint8Array(buffer)))
    )
    return text
  }

  /**
   * 解析接收到的 SSE 事件块,构造事件对象
   * Parse a received SSE event chunk into a constructed event object.
   *
   * Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
   */
  _parseEventChunk(chunk) {
    if (!chunk || chunk.length === 0) {
      return null
    }

    const e = { id: null, retry: null, data: null, event: null }
    chunk.split(/\n|\r\n|\r/).forEach((line) => {
      const index = line.indexOf(this.FIELD_SEPARATOR)
      let field, value
      if (index > 0) {
        // only first whitespace should be trimmed
        const skip = line[index + 1] === ' ' ? 2 : 1
        field = line.substring(0, index)
        value = line.substring(index + skip)
      } else if (index < 0) {
        // Interpret the entire line as the field name, and use the empty string as the field value
        field = line
        value = ''
      } else {
        // A colon is the first character. This is a comment; ignore it.
        return
      }

      if (!(field in e)) {
        return
      }

      // consecutive 'data' is concatenated with newlines
      if (field === 'data' && e[field] !== null) {
        e['data'] += '\n' + value
      } else {
        e[field] = value
      }
    })

    if (e.id !== null) {
      this.lastEventId = e.id
    }

    const event = new SSECustomEvent(e.event || 'message')
    event.id = e.id
    event.data = e.data || ''
    event.lastEventId = this.lastEventId
    return event
  }
}

if (typeof module !== 'undefined' && module.exports) {
  module.exports = SSEClient
}

export default SSEClient

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容