Java 服务端
将 /ws 路径映射到 YourWebSocketHandler 处理器上
为该路径添加握手拦截器(WebSocketHandshakeInterceptor)
如果项目中配置了安全拦截器/过滤器(例如 Spring Security),需要对 /ws 路径放行
/**
 * Spring configuration that enables WebSocket support and registers the
 * application's single endpoint.
 *
 * <p>The {@code /ws} path is served by {@link YourWebSocketHandler}, with
 * {@link WebSocketHandshakeInterceptor} attached to the handshake.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /** Handler bound to the {@code /ws} endpoint. */
    @Autowired
    private YourWebSocketHandler webSocketHandler;

    /** Interceptor invoked around the HTTP handshake of {@code /ws}. */
    @Autowired
    private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;

    /**
     * Registers the {@code /ws} handler together with its handshake interceptor.
     *
     * @param registry registry supplied by Spring for adding WebSocket handlers
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry
            .addHandler(this.webSocketHandler, "/ws")
            // Allowed origins: "*" accepts handshakes from any origin.
            .setAllowedOrigins("*")
            .addInterceptors(this.webSocketHandshakeInterceptor);
    }
}
#处理器
@Component
@Slf4j
public class YourWebSocketHandler extends TextWebSocketHandler {
@Autowired
private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 连接建立后的逻辑
LoginUser token = webSocketHandshakeInterceptor.getToken(session);
if (token!=null) {
WebSocketUtils.addMapSession(session.getId(),SecurityUtils.getUserId());
WebSocketUtils.addSession(token.getUserId(),session);
log.info("websocket已连接: {}", session.getId());
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 收到消息后的逻辑
log.info("收到消息:{}", message.getPayload());
session.sendMessage(new TextMessage(message.getPayload()));
String payload = message.getPayload();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
// 连接关闭后的逻辑
Long userId = WebSocketUtils.getMapSession(session.getId());
WebSocketUtils.removeSession(userId);
WebSocketUtils.removeMapSession(session.getId());
System.out.println("websocket连接关闭: " + session.getId());
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
// 传输错误处理逻辑
log.error("websocket连接异常",exception);
Long userId = WebSocketUtils.getMapSession(session.getId());
WebSocketUtils.removeSession(userId);
WebSocketUtils.removeMapSession(session.getId());
session.close(CloseStatus.SERVER_ERROR);
}
}
//拦截器
/**
 * Handshake interceptor that authenticates WebSocket connections by token.
 *
 * <p>The token is read from the {@code Authorization} header, falling back to
 * the {@code Sec-WebSocket-Protocol} header when {@code Authorization} is
 * blank. A token that resolves to a user is verified through
 * {@link TokenService} and stored as the current authentication in
 * {@link SecurityContextHolder}.
 */
@Component
@Slf4j
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {

    private static final String AUTHORIZATION_HEADER = "Authorization";
    private static final String PROTOCOL_HEADER = "Sec-WebSocket-Protocol";

    @Autowired
    private TokenService tokenService;

    /**
     * Authenticates the handshake request.
     *
     * @return {@code true} only when a non-blank token is present and resolves
     *         to a logged-in user; {@code false} rejects the handshake
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        String token = request.getHeaders().getFirst(AUTHORIZATION_HEADER);
        if (StringUtils.isBlank(token)) {
            token = request.getHeaders().getFirst(PROTOCOL_HEADER);
        }
        // The token itself is a credential and is deliberately not logged.
        if (StringUtils.isBlank(token)) {
            log.warn("websocket握手被拒绝: 缺少token");
            return false;
        }
        if (authenticate(token, request) == null) {
            log.warn("websocket握手被拒绝: token无效");
            return false;
        }
        return true;
    }

    /**
     * Copies the request's {@code Sec-WebSocket-Protocol} values onto the
     * response, so that a token sent through that header is also present in
     * the handshake response.
     */
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        List<String> protocols = request.getHeaders().get(PROTOCOL_HEADER);
        if (protocols != null) {
            response.getHeaders().put(PROTOCOL_HEADER, protocols);
        }
    }

    /**
     * Resolves the user for {@code token} and, when found, installs it as the
     * current authentication. Does nothing when the token resolves to no user.
     *
     * @param request handshake request; used to build authentication details
     *                when it is a {@code ServletServerHttpRequest}
     * @param token   token taken from the handshake headers
     */
    public void getToken(ServerHttpRequest request, String token) {
        authenticate(token, request);
    }

    /**
     * Resolves the user of an established session from its handshake headers
     * and installs it as the current authentication.
     *
     * @param session the WebSocket session whose handshake headers are read
     * @return the resolved user, or {@code null} when no token is present or
     *         the token resolves to no user
     */
    public LoginUser getToken(WebSocketSession session) {
        String authorization = session.getHandshakeHeaders().getFirst(AUTHORIZATION_HEADER);
        if (StringUtils.isBlank(authorization)) {
            authorization = session.getHandshakeHeaders().getFirst(PROTOCOL_HEADER);
        }
        return authenticate(authorization, null);
    }

    /**
     * Shared authentication routine behind both {@code getToken} overloads.
     *
     * @param token   raw token; blank or {@code null} yields {@code null}
     * @param request optional handshake request used for authentication details
     * @return the resolved user, or {@code null} when the token resolves to no user
     */
    private LoginUser authenticate(String token, ServerHttpRequest request) {
        if (StringUtils.isBlank(token)) {
            return null;
        }
        LoginUser loginUser = tokenService.getLoginUserSocket(token);
        if (loginUser == null) {
            return null;
        }
        tokenService.verifyToken(loginUser);
        UsernamePasswordAuthenticationToken authenticationToken =
                new UsernamePasswordAuthenticationToken(loginUser, null, loginUser.getAuthorities());
        if (request instanceof ServletServerHttpRequest) {
            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
            authenticationToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(servletRequest));
        }
        // NOTE(review): the authentication is stored in the current security
        // context and is not cleared here — confirm that something else resets
        // it before the thread serves an unrelated request.
        SecurityContextHolder.getContext().setAuthentication(authenticationToken);
        // Log the id of the resolved user directly; avoids dereferencing a
        // possibly-null lookup and never uses data as the log format string.
        log.info("{}", loginUser.getUserId());
        return loginUser;
    }
}
#工具类
public class WebSocketUtils {
private static Map<Long, WebSocketSession> map = new HashMap<>();
private static Map<String, Long> mapSession = new HashMap<>();
public static void removeSession(Long id) {
map.remove(id);
}
public static void addSession(Long id,WebSocketSession session) {
map.put(id,session);
}
public static WebSocketSession getSession(Long id) {
if (map.containsKey(id)) {
return map.get(id);
}
return null;
}
public static Long getMapSession(String sessionId) {
if (mapSession.containsKey(sessionId)) {
return mapSession.get(sessionId);
}
return null;
}
public static void addMapSession(String session,Long userId) {
WebSocketUtils.mapSession.put(session,userId);
}
public static void removeMapSession(String session) {
mapSession.remove(session);
}
JS 客户端
import { getToken } from '@/utils/auth'
// Currently active WebSocket, or null when there is none.
let socket = null;
// Delay before the next reconnect attempt, in milliseconds.
let reconnectInterval = 1000;
// Handle of the pending reconnect timer, or null when none is scheduled.
let reconnectTimer = null;
const url = 'ws://127.0.0.1:8080/ws'
// Whether the client should reconnect automatically after a close.
let shouldReconnect = true;

/**
 * Opens the WebSocket connection if a login token is available.
 *
 * Calling this re-enables automatic reconnection, so it can be used again
 * after closeConnection(). Does nothing when no token is available or a
 * connection is already open or being opened.
 */
async function connect() {
  if (!getToken()) {
    return;
  }
  shouldReconnect = true;
  openSocket();
}

/**
 * Creates the socket and wires its event handlers.
 *
 * The token is passed as the WebSocket sub-protocol, which the browser sends
 * in the Sec-WebSocket-Protocol request header.
 */
function openSocket() {
  const token = getToken();
  if (!token) {
    return;
  }
  if (socket) {
    const state = socket.readyState;
    if (state === WebSocket.CONNECTING || state === WebSocket.OPEN) {
      // Already connecting or connected: nothing to do.
      return;
    }
    if (state === WebSocket.CLOSING) {
      // The close event of this socket will fire and schedule the reconnect.
      return;
    }
    // CLOSED: drop the dead socket and open a fresh one below.
    socket = null;
  }

  let ws;
  try {
    ws = new WebSocket(url, [String(token)]);
  } catch (err) {
    console.log(err);
    return;
  }
  socket = ws;

  // Handlers use `ws`, not the module-level `socket`, so that events of an
  // old socket never act on a newer one.
  ws.onopen = () => {
    ws.send("hello");
    reconnectInterval = 1000; // reset the backoff after a successful connect
  };
  ws.onmessage = (event) => {
    console.log('Received from server:', event.data);
  };
  ws.onclose = () => {
    console.log('Disconnected from server');
    if (socket === ws) {
      socket = null;
    }
    if (shouldReconnect) {
      reconnect();
    }
  };
  ws.onerror = (event) => {
    // WebSocket error events carry no `error` property; log the event itself.
    console.log('WebSocket Error:', event);
    if (shouldReconnect) {
      ws.close();
    }
  };
}

/**
 * Schedules one reconnect attempt after the current backoff delay and doubles
 * the delay for the next attempt, capped at 30 seconds. At most one attempt is
 * pending at any time.
 */
function reconnect() {
  if (reconnectTimer !== null) {
    return;
  }
  console.log(`Attempting to reconnect in ${reconnectInterval / 1000} seconds...`);
  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    if (!shouldReconnect) {
      // The connection was closed manually while this attempt was pending.
      return;
    }
    reconnectInterval = Math.min(reconnectInterval * 2, 30000);
    openSocket();
  }, reconnectInterval);
}

/**
 * Closes the connection on purpose: disables automatic reconnection, cancels
 * any pending reconnect attempt and closes the active socket.
 */
function closeConnection() {
  shouldReconnect = false;
  if (reconnectTimer !== null) {
    clearTimeout(reconnectTimer);
    reconnectTimer = null;
  }
  if (socket) {
    socket.close();
  }
  console.log('Connection closed manually');
}
export { connect, closeConnection };