springboot2使用sse实现服务端推送

sse是一种服务端向客户端推送(单向推荐)的技术,基于http的协议

如果需要 websocket

注意:这里特别说明一下,如果部署到nginx服务器,那么一定不能忘记关闭缓冲,否则前端连接不上,这里是我的nginx反向代理

# /etc/nginx/conf.d目录下新建 xxxx.xiaoer.cn.conf配置文件
# /etc/nginx/conf.d/xxxx.xiaoer.cn.conf
 
server {
    listen       80;
    server_name  xxxx.xiaoer.cn;
 
    location / {
        proxy_pass http://xxx.xx.xx.xx:7060;
        proxy_buffering off; # 关闭缓冲,特别重要
        proxy_cache off; # 关闭缓存,特别重要
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

引包

<!-- web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

首先创建工具类 SseEmitterUtil

package com.zyq.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/** sse推送
 * author xiaochi
 * date 2021/6/11 15:04
 */
@Slf4j
public class SseEmitterUtil {

    private static final ConcurrentHashMap<String, SseEmitter> container = new ConcurrentHashMap<>(0);

    /**
     * 创建用户连接并返回 SseEmitter
     * @param clientId
     * @return
     */
    public static SseEmitter connect(String clientId){
        if (container.containsKey(clientId)){
            remove(clientId);
        }
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(() ->{
            remove(clientId);
            log.info("结束连接:{}", clientId);
        });
        sseEmitter.onTimeout(() ->{
            remove(clientId);
            log.info("连接超时:{}", clientId);
        });
        sseEmitter.onError(e -> {
            remove(clientId);
            log.info("连接异常:{},error:", clientId,e);
        });
        container.put(clientId,sseEmitter);
        log.info("创建sse连接,当前用户:{}", clientId);
        return sseEmitter;
    }

    /**
     * 移除用户连接
     * @param clientId
     */
    public static void remove(String clientId){
        container.remove(clientId);
        log.info("移除用户:{}", clientId);
    }

    /**
     * 推送消息到指定用户
     * @param clientId
     * @param message
     */
    public static void send(String clientId,String message){
        if (container.containsKey(clientId)){
            try {
                container.get(clientId).send(message);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", clientId, e.getMessage());
                remove(clientId);
            }
        }
    }

    /**
     * 群发消息
     * @param clientIds
     * @param message
     */
    public static void send(List<String> clientIds,String message) {
        clientIds.forEach(clientId -> send(clientId, message));
    }

    /**
     * 群发消息所有人
     * @param message
     */
    public static void send(String message) {
        container.forEach((clientId,v) -> {
            try {
                v.send(message);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", clientId, e.getMessage());
                remove(clientId);
            }
        });
    }
}

创建接口

/**
 * 用于创建连接
 */
@ApiAuthorize(token = false)
@GetMapping("/sse/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId) {
    return SseEmitterUtil.connect(clientId);
}

/**
 * 推送给所有人
 *
 * @param message
 * @return
 */
@ApiAuthorize(token = false)
@GetMapping("/sse/push/{message}")
public R<String> push(@PathVariable(name = "message") String message) {
    SseEmitterUtil.send(message);
    return R.ok("发送成功!");
}
/**
 * 关闭连接
 */
@ApiAuthorize(token = false)
@GetMapping("/sse/close/{clientId}")
public R<String> close(@PathVariable String clientId) {
    SseEmitterUtil.remove(clientId);
    return R.ok("连接关闭");
}

到此就算完成,接下来就是前端页面进行链接访问,新增sse.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>消息推送</title>
</head>
<body>
    <button onclick="closeSse()">关闭连接</button>
    <div id="message"></div>
</body>
<script>
    let source = null;

    // 用时间戳模拟登录用户
    const userId = new Date().getTime();

    if (window.EventSource) {
        // 建立连接
        source = new EventSource('http://localhost:6020/zyq/api/sse/connect/' + userId);

        /**
         * 连接一旦建立,就会触发open事件
         * 另一种写法:source.onopen = function (event) {}
         */
        source.addEventListener('open', function (e) {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        /**
         * 客户端收到服务器发来的数据
         * 另一种写法:source.onmessage = function (event) {}
         */
        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
        });

        /**
         * 如果发生通信错误(比如连接中断),就会触发error事件
         * 或者:
         * 另一种写法:source.onerror = function (event) {}
         */
        source.addEventListener('error', function (e) {
            if (e.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);

    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    // 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
    window.onbeforeunload = function () {
        closeSse();
    };

    // 关闭Sse连接
    function closeSse() {
        source.close();
        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', 'http://localhost:6020/zyq/api/sse/close/' + userId, true);
        httpRequest.send();
        console.log("close");
    }

    // 将消息显示在网页上
    function setMessageInnerHTML(innerHTML) {
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
</script>
</html>

原生js就是这样的,如果是vue请看下面

vue

安装包(event-source-polyfill支持设置请求头)

npm i -S event-source-polyfill

封装函数

function eventSource(url,header,openCallback,successCallback,errorCallback){
    let eventSource = new EventSourcePolyfill(url, {
        heartbeatTimeout:300000, //超时时间,毫秒为单位,以5分钟为例
        withCredentials: true,
        headers: header
    });
    eventSource.onopen = function (e) {
        console.log(url+'连接成功',e);
        openCallback && openCallback(e);
    };
    eventSource.onmessage = function (e) {
        console.log(url+'收到消息', e);
        successCallback && successCallback(e)
    };
    eventSource.onerror = function (e) {
        console.log(url+'连接失败', e);
        eventSource.close(); // 关闭连接
        errorCallback && errorCallback(e);
    };
    return eventSource;
}

到此完成,感谢阅读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容