LLM 流式通信之 SSE

写在前面

SSE是LLM进行流式通信常用的技术方案, 下图是 kimi 的示例

kimi回答时使用SSE

SSE 简介

Server-Sent Events(SSE)是一种允许服务器向客户端实时推送数据的技术。它基于HTTP协议,允许服务器通过一个持久的HTTP连接向客户端发送事件流。以下是SSE的一些关键点:

  1. SSE的本质:SSE利用HTTP协议的流信息(streaming)特性,实现服务器向客户端的单向通信。客户端保持连接打开,等待服务器发送新的数据流。

  2. SSE的特点

    • 使用HTTP协议,现有的服务器软件都支持。
    • 轻量级,使用简单,与WebSocket相比,协议相对简单。
    • 默认支持断线重连,而WebSocket需要自己实现。
    • 一般只用来传送文本数据,二进制数据需要编码后传送。
    • 支持自定义发送的消息类型。
  3. 客户端API

    • EventSource对象用于创建与服务器的连接并接收事件。
    • 通过监听message事件接收服务器发送的消息。
    • 可以监听自定义事件,不仅限于message事件。
  4. 服务器端发送事件

    • 服务器端脚本需要使用text/event-streamMIME类型响应内容。
    • 每个通知以文本块形式发送,并以一对换行符结尾。
    • 消息由字段组成,包括eventdataidretry等。
  5. 事件流格式

    • 事件流是一个简单的文本数据流,使用UTF-8编码。
    • 消息由一对换行符分开,以冒号开头的行为注释行,会被忽略。
    • 每条消息由一行或多行文字组成,列出该消息的字段。
  6. 浏览器兼容性

    • SSE在现代浏览器中得到了广泛支持,除了IE/Edge外,其他浏览器如Firefox、Chrome、Safari等都支持SSE。

SSE适用于需要服务器向客户端单向实时推送数据的场景,如实时通知、股票行情、新闻推送等。它是一种有效降低服务器负载和网络资源消耗的技术,通过服务器主动向客户端发送更新事件,实现实时通信。

py 中使用 SSE

  • py 中异步: async + await
  • py 中流式接收 SSE: httpx
  • py 中流式返回 SSE: from fastapi.responses import StreamingResponse as FastapiStreamingResponse
  • 路由定义
@router.post("/stream", tags=["chat"])
async def streaming_chat(
    params: QuestionParams, current_user: TokenData = Depends(get_current_user)
):
    if not params.user_id:
        params.user_id = current_user.uid
    async_generator = RetrievalController().stream_answer(params)
    return StreamingResponse(async_generator)
  • 流式输出定义
from typing import Mapping

from fastapi.responses import StreamingResponse as FastapiStreamingResponse
from starlette.background import BackgroundTask
from starlette.responses import ContentStream


class StreamingResponse(FastapiStreamingResponse):
    def __init__(
        self,
        content: ContentStream,
        status_code: int = 200,
        headers: Mapping[str, str] | None = None,
        media_type: str | None = None,
        background: BackgroundTask | None = None,
    ) -> None:
        default_headers = {"Content-Type": "text/event-stream", "Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
        default_headers.update(headers or {})
        super().__init__(content, status_code, default_headers, media_type, background)
  • 流式接收并流式返回
    @LogDecorate(
        func_name="retrieval_controller::process_stream_answer", raise_exc=True
    )
    async def stream_answer(self, params: QuestionParams, model: int = 1):
        """
        :param model: 1-8B 2-32B
        """
        session_id = params.session_id
        if params.new_session:
            session_id = str(uuid.uuid1()).replace("-", "")
        request_body = dict(
            messages=msgs,
            user_id=params.user_id,
        )
        stream_answer_api = f"{AI_DOMAIN}{STREAM_ANSWER_API}"

        answer = ""
        # 流式接收
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                stream_answer_api,
                json=request_body,
                timeout=60,
                headers=dict(trace_id=get_req_ctx("trace_id")),
            ) as response:
                async for chunk in response.aiter_text():
                    answer += chunk
                    yield self.get_yield_data(
                        {"content": chunk, "create_at": int(time.time() * 1000)}
                    )

        yield self.get_yield_data("[DONE]")
        yield self.get_yield_data({"session_id": session_id})
        yield self.get_yield_data("[END]")

        # 落库
        await user_qa_dao.save_user_qa(params.q, answer, session_id, params.user_id)

Go中使用SSE

使用 https://github.com/hertz-contrib/sse

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/cloudwego/hertz/pkg/app"
    "github.com/cloudwego/hertz/pkg/common/hlog"
    "github.com/google/uuid"
    "github.com/hertz-contrib/sse"
    "github.com/spf13/cast"
)

func ChatStream(ctx context.Context, c *app.RequestContext) {
    u := ctl.CtxUser(c)

    var req struct {
        Query string `form:"query" json:"query"`
        Model int    `form:"model" json:"model"`
        Sid   string `form:"sid" json:"sid"` // session id
    }
    if err := c.BindAndValidate(&req); err != nil {
        utils.RespErr(c, err)
        return
    }

    // 聊天消息支持多轮对话
    var sid string
    if req.Sid != "" {
        sid = req.Sid
    } else {
        sid = uuid.New().String()
    }
    msg := chat.SaveUserMsg(ctx, sid, req.Query)
    content := &chat.Content{
        Messages: msg,
        UserId:   cast.ToString(u.ID),
        UserName: u.Name,
    }
    b, _ := json.Marshal(content)

    // https://github.com/hertz-contrib/sse/blob/main/examples/client/quickstart/main.go
    cli := sse.NewClient(conf.GetConf().Dev.AIDomain + "xxx")
    cli.SetMethod("POST")
    cli.SetHeaders(map[string]string{"Content-Type": "application/json", "trace_id": httpx.TraceId()})
    cli.SetBody(b)

    var ans, allAns string // AI 返回内容
    var flag bool          // reply正文标识
    events := make(chan *sse.Event)
    errChan := make(chan error)
    s := sse.NewStream(c)
    go func() {
        cErr := cli.Subscribe(func(msg *sse.Event) {
            if msg != nil && msg.Data != nil {
                events <- msg
                return
            }
        })
        errChan <- cErr
    }()
    for {
        select {
        case e := <-events:
            m := map[string]any{}
            _ = json.Unmarshal(e.Data, &m)
            if v, ok := m["content"]; ok {
                allAns += v.(string)
                if flag {
                    ans += v.(string)
                }
                if v == "__REPLY_START__" {
                    flag = true
                }
                da := map[string]any{
                    "content":   v,
                    "create_at": time.Now().Unix(),
                }
                jsonData, _ := json.Marshal(da)
                hlog.Info("publish event data = %s", string(jsonData))
                _ = s.Publish(&sse.Event{Data: jsonData})
            } else {
                hlog.Info("invalid event data = %s", string(e.Data))
            }
        case err := <-errChan:
            if err != nil {
                hlog.CtxErrorf(context.Background(), "err = %s", err.Error())
            }
            chat.SaveAssistantMsg(ctx, sid, ans, msg)
            chat.SaveQA(u.ID, sid, req.Query, allAns)
            _ = s.Publish(&sse.Event{Data: []byte("[DONE]")})
            _ = s.Publish(&sse.Event{Data: []byte(fmt.Sprintf(`{"session_id": "%s"}`, sid))})
            _ = s.Publish(&sse.Event{Data: []byte("[END]")})
            hlog.Info("cli get all event")
            return
        }
    }
}

写在最后

需要注意的点

  • py 使用 httpx 接收 SSE 流式数据, 对数据结构没有要求, 比如 SSE event 常见的 data: xxx, 可以不带 data 标识返回
  • go 中使用 https://github.com/hertz-contrib/sse 接收 SSE 流式数据
    • 底层会解析 SSE 数据格式, 需要判断 data 标识, 如果没有, 会导致解析失败
    • 如果数据包含 \n换行, 也会导致数据解析失败, 比较简单的做法 data: json 格式数据
// go 中对应 SSE 库数据解析源码
func (c *Client) processEvent(msg []byte) (event *Event, err error) {
    var e Event

    if len(msg) < 1 {
        return nil, fmt.Errorf("event message was empty")
    }

    // Normalize the crlf to lf to make it easier to split the lines.
    // Split the line by "\n" or "\r", per the spec.
    for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) {
        switch {
        case bytes.HasPrefix(line, headerID):
            e.ID = string(append([]byte(nil), trimHeader(len(headerID), line)...))
        case bytes.HasPrefix(line, headerData):
            // The spec allows for multiple data fields per event, concatenated them with "\n".
            e.Data = append(e.Data[:], append(trimHeader(len(headerData), line), byte('\n'))...)
        // The spec says that a line that simply contains the string "data" should be treated as a data field with an empty body.
        case bytes.Equal(line, bytes.TrimSuffix(headerData, []byte(":"))):
            e.Data = append(e.Data, byte('\n'))
        case bytes.HasPrefix(line, headerEvent):
            e.Event = string(append([]byte(nil), trimHeader(len(headerEvent), line)...))
        case bytes.HasPrefix(line, headerRetry):
            e.Retry, err = strconv.ParseUint(b2s(append([]byte(nil), trimHeader(len(headerRetry), line)...)), 10, 64)
            if err != nil {
                return nil, fmt.Errorf("process message `retry` failed, err is %s", err)
            }
        default:
            // Ignore any garbage that doesn't match what we're looking for.
        }
    }

    // Trim the last "\n" per the spec.
    e.Data = bytes.TrimSuffix(e.Data, []byte("\n"))

    if c.encodingBase64 {
        buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data)))

        n, err := base64.StdEncoding.Decode(buf, e.Data)
        if err != nil {
            err = fmt.Errorf("failed to decode event message: %s", err)
            return &e, err
        }
        e.Data = buf[:n]
    }
    return &e, err
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,490评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,581评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,830评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,957评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,974评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,754评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,464评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,847评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,995评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,137评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,819评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,482评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,023评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,149评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,409评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,086评论 2 355

推荐阅读更多精彩内容