wx-sse-client
遇到的问题:
文字丢失问题
由于是收到的是数据流,前端接收到的数据块,在解析的时候发现有些是不完整的,无法解析。反应到界面上就是发现会丢失一些文字。
解决方案:保存未处理的数据,新的数据来的时候,拼接上新数据,尝试解析,解析后,保留未能解析的数据,给一下段数据使用
/**
* SSE 客户端封装类
* 支持自动重连、错误处理和事件监听
* events: open, message, readystatechange, error, close, success, complete, reconnecting
*/
class SSECustomEvent {
constructor(type, options) {
this.type = type
// this.responseCode = options.responseCode
// this.headers = options.headers
}
}
/**
* SSE 客户端封装类
* 支持自动重连、错误处理和事件监听
* 参考:https://github.com/mpetazzoni/sse.js
*/
class SSEClient {
/** @type {number} */
/**
* -1 初始化中
*/
static INITIALIZING = -1
/** @type {number} */
/**
* 0 连接中
*/
static CONNECTING = 0
/** @type {number} */
/**
* 1 已连接
*/
static OPEN = 1
/** @type {number} */
/**
* 2 已关闭
*/
static CLOSED = 2
constructor(url, options = {}) {
options = options || {}
// 配置项初始化
this.url = url || '' // SSE 服务器地址
this.reconnectInterval = options.reconnectInterval || 10000 // 重连间隔时间(毫秒)
this.maxRetries = options.maxRetries || 5 // 最大重试次数
this.headers = options.headers || {} // 请求头
this.payload = options.payload !== undefined ? options.payload : '' // 请求体
this.method = options.method || (this.payload && 'POST') || 'GET' // 请求方法
// this.withCredentials = !!options.withCredentials
this.debug = !!options.debug // 是否开启调试
// this.debug = true
// 内部状态
this.retryCount = 0 // 当前重试次数
this.requestTask = null // 请求任务对象
this.listeners = {} // 事件监听器集合
/** @type {string} */
this.FIELD_SEPARATOR = ':' // 字段分隔符
/** @type {number} */
this.readyState = SSEClient.INITIALIZING // 连接状态
// /** @type {number} */
// this.progress = 0
/** @type {string} */
this.chunk = ''
/** @type {string} */
this.lastEventId = ''
}
/**
* 建立连接
*/
connect() {
// 如果连接状态为连接中或已连接,则直接返回
if (
this.readyState === SSEClient.CONNECTING ||
this.readyState === SSEClient.OPEN
)
return
this._onConnecting()
this.requestTask = wx.request({
url: this.url,
method: this.method,
header: {
...this.headers,
Accept: 'text/event-stream',
'Cache-Control': 'no-cache'
},
enableChunked: true, // 开启分块传输
success: (res) => {
// 断开的时候回调
if (this.debug) {
console.debug('SSE success', res)
}
this._onSuccess(res)
// // todo 处理响应码非200的错误
// if (res.statusCode !== 200) {
// this._onFailure(new Error(`SSE 连接失败,响应码:${res.statusCode}`))
// return
// }
// this._onOpened()
},
fail: (error) => {
if (this.debug) {
console.debug('SSE fail', error)
}
this._onFailure(error)
this._reconnect()
},
complete: () => {
if (this.debug) {
console.debug('SSE complete')
}
this._onComplete()
}
})
console.log('requestTask:', this.requestTask)
// 监听数据返回
this.requestTask.onChunkReceived((response) => {
try {
this._onOpened()
// if (this.debug) {
// console.debug('SSE onChunkReceived', this.requestTask, response)
// }
// 将 ArrayBuffer 转换为字符串
const data = this._arrayBufferToString(response.data)
// 将数据分隔为多个部分
const parts = (this.chunk + data).split(/(\r\n\r\n|\r\r|\n\n)/g)
/*
* We assume that the last chunk can be incomplete because of buffering or other network effects,
* so we always save the last part to merge it with the next incoming packet
*/
const lastPart = parts.pop()
// 遍历每个部分,解析事件
parts.forEach((part) => {
if (part.trim().length > 0) {
if (this.debug) {
console.debug('SSE onChunkReceived part', part)
}
this._dispatchEvent(this._parseEventChunk(part))
}
})
this.chunk = lastPart
} catch (error) {
console.error('SSE onChunkReceived error', error)
}
})
}
/**
* 关闭连接
*/
close() {
if (this.readyState === SSEClient.CLOSED) {
return
}
if (this.requestTask) {
this.requestTask.abort()
}
this._markClosed()
}
/**
* 添加事件监听器
* @param {string} event 事件名称 (message/open/close/error)
* @param {Function} callback 回调函数
*/
on(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = []
}
const callbacks = this.listeners[event]
if (callbacks.indexOf(callback) === -1) {
callbacks.push(callback)
}
}
/**
* 移除事件监听器
* @param {string} event 事件名称
* @param {Function} callback 回调函数
*/
off(event, callback) {
if (this.listeners[event]) {
if (!callback) {
// 移除所有事件监听器
this.listeners[event] = []
} else {
const callbacks = this.listeners[event]
const newCallbacks = callbacks.filter((cb) => cb !== callback)
// 如果移除后没有监听器,则删除事件
if (newCallbacks.length === 0) {
delete this.listeners[event]
} else {
this.listeners[event] = newCallbacks
}
}
}
}
/**
* 触发事件
* @param {SSECustomEvent} e 事件对象
* @returns {boolean} 是否成功触发事件
* @private
*/
_dispatchEvent(e) {
if (!e) {
return true
}
const type = e.type
// 设置事件源
e.source = this
if (this.listeners[type]) {
// 遍历所有事件监听器,并执行回调函数
this.listeners[type].every((callback) => {
callback(e)
// 如果事件被阻止,则停止遍历
return !e.defaultPrevented
})
}
return true
}
/**
* 标记连接已关闭
* @private
*/
_markClosed() {
// this.chunk = ''
// this.requestTask = null
this._setReadyState(SSEClient.CLOSED)
}
/**
* 设置连接状态
* @param {number} state 连接状态
* @private
*/
_setReadyState(state) {
if (this.readyState === state) {
return
}
if (this.debug) {
console.debug('SSE _setReadyState', state, this._stateToString(state))
}
const event = new SSECustomEvent('readystatechange')
event.readyState = state
this.readyState = state
this._dispatchEvent(event)
}
/**
* 将状态转换为字符串
* @param {number} state 状态
* @returns {string} 状态字符串
*/
_stateToString(state) {
return ['初始化中', '连接中', '已连接', '已关闭'][state + 1]
}
/**
* 标记连接已打开
* @private
*/
_onOpened() {
if (this.readyState === SSEClient.OPEN) {
return
}
this.chunk = ''
this.retryCount = 0
// 触发 open 事件
const event = new SSECustomEvent('open')
event.responseCode = this.requestTask.status
event.headers = this.headers
this._setReadyState(SSEClient.OPEN)
this._dispatchEvent(event)
}
_onConnecting() {
this._setReadyState(SSEClient.CONNECTING)
const event = new SSECustomEvent('connecting')
this._dispatchEvent(event)
}
/**
* 处理失败
* @param {Error} e 错误对象
* @private
*/
_onFailure(e) {
const event = new SSECustomEvent('error')
event.error = e
this._dispatchEvent(event)
this._markClosed()
}
/**
* 处理成功
* @param {Object} e 成功对象
* @private
*/
_onSuccess(e) {
const event = new SSECustomEvent('success')
this._dispatchEvent(event)
this._markClosed()
}
/**
* 处理完成
* @param {*} e 完成对象
* @private
*/
_onComplete(e) {
const event = new SSECustomEvent('complete')
this._dispatchEvent(event)
this._markClosed()
}
/**
* 重连机制
* @private
*/
_reconnect() {
if (this.retryCount >= this.maxRetries) {
const event = new SSECustomEvent('error')
event.error = new Error('最大重试次数已达到')
this._dispatchEvent(event)
return
}
this.retryCount++
setTimeout(() => {
const event = new SSECustomEvent('reconnecting')
event.retryCount = this.retryCount
this._dispatchEvent(event)
this.connect()
}, this.reconnectInterval)
}
/**
* 将 ArrayBuffer 转换为字符串
* @private
*/
_arrayBufferToString(buffer) {
let text = decodeURIComponent(
escape(String.fromCharCode.apply(null, new Uint8Array(buffer)))
)
return text
}
/**
* 解析接收到的 SSE 事件块,构造事件对象
* Parse a received SSE event chunk into a constructed event object.
*
* Reference: https://html.spec.whatwg.org/multipage/server-sent-events.html#dispatchMessage
*/
_parseEventChunk(chunk) {
if (!chunk || chunk.length === 0) {
return null
}
const e = { id: null, retry: null, data: null, event: null }
chunk.split(/\n|\r\n|\r/).forEach((line) => {
const index = line.indexOf(this.FIELD_SEPARATOR)
let field, value
if (index > 0) {
// only first whitespace should be trimmed
const skip = line[index + 1] === ' ' ? 2 : 1
field = line.substring(0, index)
value = line.substring(index + skip)
} else if (index < 0) {
// Interpret the entire line as the field name, and use the empty string as the field value
field = line
value = ''
} else {
// A colon is the first character. This is a comment; ignore it.
return
}
if (!(field in e)) {
return
}
// consecutive 'data' is concatenated with newlines
if (field === 'data' && e[field] !== null) {
e['data'] += '\n' + value
} else {
e[field] = value
}
})
if (e.id !== null) {
this.lastEventId = e.id
}
const event = new SSECustomEvent(e.event || 'message')
event.id = e.id
event.data = e.data || ''
event.lastEventId = this.lastEventId
return event
}
}
if (typeof module !== 'undefined' && module.exports) {
module.exports = SSEClient
}
export default SSEClient