sse是一种服务端向客户端推送(单向推荐)的技术,基于http的协议
如果需要 websocket
注意:这里特别说明一下,如果部署到nginx服务器,那么一定不能忘记关闭缓冲,否则前端连接不上
,这里是我的nginx反向代理
# /etc/nginx/conf.d目录下新建 xxxx.xiaoer.cn.conf配置文件
# /etc/nginx/conf.d/xxxx.xiaoer.cn.conf
server {
listen 80;
server_name xxxx.xiaoer.cn;
location / {
proxy_pass http://xxx.xx.xx.xx:7060;
proxy_buffering off; # 关闭缓冲,特别重要
proxy_cache off; # 关闭缓存,特别重要
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
引包
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
首先创建工具类 SseEmitterUtil
package com.zyq.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/** sse推送
* author xiaochi
* date 2021/6/11 15:04
*/
@Slf4j
public class SseEmitterUtil {
private static final ConcurrentHashMap<String, SseEmitter> container = new ConcurrentHashMap<>(0);
/**
* 创建用户连接并返回 SseEmitter
* @param clientId
* @return
*/
public static SseEmitter connect(String clientId){
if (container.containsKey(clientId)){
remove(clientId);
}
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(() ->{
remove(clientId);
log.info("结束连接:{}", clientId);
});
sseEmitter.onTimeout(() ->{
remove(clientId);
log.info("连接超时:{}", clientId);
});
sseEmitter.onError(e -> {
remove(clientId);
log.info("连接异常:{},error:", clientId,e);
});
container.put(clientId,sseEmitter);
log.info("创建sse连接,当前用户:{}", clientId);
return sseEmitter;
}
/**
* 移除用户连接
* @param clientId
*/
public static void remove(String clientId){
container.remove(clientId);
log.info("移除用户:{}", clientId);
}
/**
* 推送消息到指定用户
* @param clientId
* @param message
*/
public static void send(String clientId,String message){
if (container.containsKey(clientId)){
try {
container.get(clientId).send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", clientId, e.getMessage());
remove(clientId);
}
}
}
/**
* 群发消息
* @param clientIds
* @param message
*/
public static void send(List<String> clientIds,String message) {
clientIds.forEach(clientId -> send(clientId, message));
}
/**
* 群发消息所有人
* @param message
*/
public static void send(String message) {
container.forEach((clientId,v) -> {
try {
v.send(message);
} catch (IOException e) {
log.error("用户[{}]推送异常:{}", clientId, e.getMessage());
remove(clientId);
}
});
}
}
创建接口
/**
* 用于创建连接
*/
@ApiAuthorize(token = false)
@GetMapping("/sse/connect/{clientId}")
public SseEmitter connect(@PathVariable String clientId) {
return SseEmitterUtil.connect(clientId);
}
/**
* 推送给所有人
*
* @param message
* @return
*/
@ApiAuthorize(token = false)
@GetMapping("/sse/push/{message}")
public R<String> push(@PathVariable(name = "message") String message) {
SseEmitterUtil.send(message);
return R.ok("发送成功!");
}
/**
* 关闭连接
*/
@ApiAuthorize(token = false)
@GetMapping("/sse/close/{clientId}")
public R<String> close(@PathVariable String clientId) {
SseEmitterUtil.remove(clientId);
return R.ok("连接关闭");
}
到此就算完成,接下来就是前端页面进行链接访问,新增sse.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>消息推送</title>
</head>
<body>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
let source = null;
// 用时间戳模拟登录用户
const userId = new Date().getTime();
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:6020/zyq/api/sse/connect/' + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果发生通信错误(比如连接中断),就会触发error事件
* 或者:
* 另一种写法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
closeSse();
};
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:6020/zyq/api/sse/close/' + userId, true);
httpRequest.send();
console.log("close");
}
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
</html>
原生js就是这样的,如果是vue请看下面
vue
安装包(event-source-polyfill支持设置请求头
)
npm i -S event-source-polyfill
封装函数
function eventSource(url,header,openCallback,successCallback,errorCallback){
let eventSource = new EventSourcePolyfill(url, {
heartbeatTimeout:300000, //超时时间,毫秒为单位,以5分钟为例
withCredentials: true,
headers: header
});
eventSource.onopen = function (e) {
console.log(url+'连接成功',e);
openCallback && openCallback(e);
};
eventSource.onmessage = function (e) {
console.log(url+'收到消息', e);
successCallback && successCallback(e)
};
eventSource.onerror = function (e) {
console.log(url+'连接失败', e);
eventSource.close(); // 关闭连接
errorCallback && errorCallback(e);
};
return eventSource;
}
到此完成,感谢阅读。