1、添加maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
2、新建配置类,开启websocket支持
/**
* WebScoket配置处理器
*/
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter 作用
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
3、新建服务端
@ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端。注解的值将被用于监听用户连接的终端访问URL地址。
@OnOpen和@OnClose这两个注解定义了当一个新用户连接和断开的时候所调用的方法。
@OnMessage这个注解定义了当服务器接收到客户端发送的消息时所调用的方法。
package com.mickey.erp.ticket.websocket;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
@Slf4j
@ServerEndpoint("/webSocket/{clientId}")
@Component
public class WebSocketServer {
//concurrent包的线程安全Set,用来存放每个客户端对应的WebSocketServer对象。
private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
/**
* 发送消息 底层方法
* @param session
* @param message
* @throws IOException
*/
public void sendMessage(Session session, String message) throws IOException {
if (session != null) {
synchronized (session) {
log.info("发送数据:" + message);
session.getBasicRemote().sendText(message);
}
}
}
/**
* 给指定客户端发送信息
* @param clientId
* @param message
*/
public void sendInfo(String clientId, String message) {
Session session = sessionPools.get(clientId);
try {
sendMessage(session, message);
} catch (Exception e) {
log.error("给客户端【" + clientId + "】发送消息:" + message + "异常", e);
}
}
/**
* 群发消息
* @param message
*/
public void broadcast(String message) {
for (Session session : sessionPools.values()) {
try {
sendMessage(session, message);
} catch (Exception e) {
log.error("群发消息:" + message + "异常", e);
continue;
}
}
}
/**
* 建立连接成功调用
* @param session
* @param clientId
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "clientId") String clientId) {
sessionPools.put(clientId, session);
// 广播上线消息
JSONObject msg = new JSONObject();
msg.put("date",new Date());
msg.put("clientId",clientId);
msg.put("to",0);
msg.put("status",1);
broadcast(msg.toJSONString());
}
/**
* 关闭连接时调用
* @param clientId
*/
@OnClose
public void onClose(@PathParam(value = "clientId") String clientId) {
sessionPools.remove(clientId);
// 广播下线消息
JSONObject msg = new JSONObject();
msg.put("date",new Date());
msg.put("clientId",clientId);
msg.put("status",0);
msg.put("to",-2);
broadcast(msg.toJSONString());
}
/**
* 收到客户端信息后,根据接收人的clientId把消息推下去或者群发
* to=-1群发消息
* @param message
* @throws IOException
*/
@OnMessage
public void onMessage(String message) throws IOException {
log.info("server get" + message);
JSONObject jsonObject = JSON.parseObject(message);
jsonObject.put("date",new Date());
if (jsonObject.get("to").equals("-1")) {
broadcast(message);
} else {
sendInfo(jsonObject.get("to").toString(), message);
}
}
/**
* 错误时调用
* @param session
* @param throwable
*/
@OnError
public void onError(Session session, Throwable throwable) {
log.error("发生错误",throwable);
}
public static ConcurrentHashMap<String, Session> getSessionPools() {
return sessionPools;
}
}
4、参考项目
https://gitee.com/shenzhanwang/Spring-websocket.git