SSE流 webclient

在 Java 中使用WebClient + SSE(Server-Sent Events)来消费服务端流式响应是 Spring WebFlux 中推荐的方式。下面是完整的示例结构,展示如何使用 WebClient 接收SSE 数据流,适用于对接 OpenAI、LangChain、Spring SSE 服务等流式响应。

 一、服务端返回 SSE(text/event-stream)

// 示例 Controller - 模拟返回 SSE 流数据

@RestController

@RequestMapping("/sse")

public class SseServerController {

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

    public Flux<String> streamEvents() {

        return Flux.interval(Duration.ofMillis(500))

                  .map(i -> "服务器消息: " + i)

                  .take(10);

    }

}

php

364 Bytes

© 菜鸟-创作你的创作

 二、客户端 WebClient 接收 SSE 流(推荐方式)

你可以使用WebClient来发起 SSE 请求,并以 Flux 方式处理:

@Component

public class SseClient {

    private final WebClient webClient = WebClient.create("http://localhost:8080");

    public void consumeSseStream() {

        webClient.get()

                .uri("/sse/stream")

                .accept(MediaType.TEXT_EVENT_STREAM)

                .retrieve()

                .bodyToFlux(String.class)

                .doOnNext(msg -> System.out.println("收到消息: " + msg))

                .blockLast(); // 阻塞直到流完成(可改为 subscribe 异步)

    }

}

php

476 Bytes

© 菜鸟-创作你的创作

 或者异步监听(非阻塞):

public void asyncSseListen() {

    webClient.get()

            .uri("/sse/stream")

            .accept(MediaType.TEXT_EVENT_STREAM)

            .retrieve()

            .bodyToFlux(String.class)

            .subscribe(msg -> {

                System.out.println("接收到: " + msg);

            }, error -> {

                System.err.println("错误: " + error.getMessage());

            }, () -> {

                System.out.println("接收结束");

            });

}

php

463 Bytes

© 菜鸟-创作你的创作

 三、使用 WebClient 对接 OpenAI 流式接口(SSE)

如果你使用的是 OpenAI Chat Completion 接口并启用stream=true,格式类似 SSE:

public void callOpenAIStream(String prompt) {

    WebClient client = WebClient.builder()

        .baseUrl("https://api.openai.com/v1")

        .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer YOUR_API_KEY")

        .build();

    Map<String, Object> requestBody = Map.of(

        "model", "gpt-3.5-turbo",

        "stream", true,

        "messages", List.of(Map.of("role", "user", "content", prompt))

    );

    client.post()

        .uri("/chat/completions")

        .contentType(MediaType.APPLICATION_JSON)

        .accept(MediaType.TEXT_EVENT_STREAM)

        .bodyValue(requestBody)

        .retrieve()

        .bodyToFlux(String.class) // 返回每一行 SSE 数据

        .takeUntil(msg -> msg.contains("[DONE]")) // OpenAI 特有结束标志

        .map(msg -> parseOpenAIChunk(msg)) // 你可在此解析 JSON 结构

        .doOnNext(System.out::println)

        .blockLast();

}

php

848 Bytes

© 菜鸟-创作你的创作

 四、注意事项

bodyToFlux(String.class)获取每个 SSE 数据块;

OpenAI 返回的是伪 SSE(非标准 JSON,每行开头为data:),需解析;

如果你使用 Spring Boot 3.x + WebFlux,无需任何特殊依赖;

对接 GPT 时需设置 header 和解析data: {json...}行。

 五、前端配合(原生 JS)

<script>

  const source = new EventSource("/sse/stream");

  source.onmessage = function(e) {

    console.log("前端收到数据:", e.data);

  };

</script>

php

143 Bytes

https://www.52runoob.com/archives/4332

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容