首先要说的是集群和高可用是两码事,集群就是多台服务器同时在工作;高可用是就一台服务器在工作,但崩溃了另一台顶上。对于websocket、nacos、spring boot、spring cloud gateway 等技术不在这做过多讲解。
网络拓扑图
- 所有服务注册到 nacos 中
- 前端 socket 注册会通过网关轮询到一个socket服务器进行socket握手,一旦前端刷新,会开一个新的socket session 注册到 其中一个 socket 服务器。
以上2步其实已经实现了最基本的 socket 集群搭建,最重要的是在于客户端和服务端数据的发送以及状态的管理,如谁注册到哪个 socket 服务器,要能找到,否则消息发给谁?
状态管理
状态管理可以使用 redis 来进行状态的管理,当用户第一次握手(上线)的时候,我们可以把该用户添加到 redis 中,当用户下线的时候,删除 redis 里的用户。
状态管理多用于聊天室业务的用户在线状态,以上流程中是前端发起请求获取状态,这种方式不是最好的,其实我们可以通过 socket 本身的事件,当用户注册session时,socket 有特定的事件能接收,接收到后可以获取该用户的其他好友并通知等。
数据发送
数据发送主要考虑接收消息的用户在哪个 socket 服务注册的 session,找到该注册用户的session有两种方式可以实现:
- A 发送消息给 B,消息往每个socket服务都发送一份,判断哪个socket服务持有B的session信息(广播)
- 通过Hash的方式,该方式要求用户向socket 注册的时候也是以 hash 的方式注册到某个 socket 服务。
第一种方式实现起来最简单,可以使用 Redis 的 pub/sub 方式,socket 服务订阅同一个地址,接收到消息后,判断socket服务器缓存中是否有该用户持有的session即可。
第二种方式实现起来稍微麻烦一些,我们需要重写网关的请求转发,当然我用的是 spring cloud gateway 和 nacos,本身就提供了 hash 转发的方式,但是就要求我的消息发送时的 hash 计算和 nacos 所提供的 hash 转算法一致,否则会出现问题。如果使用了这种方式,就需要通过服务之间的 RPC方式来控制 hash 的算法。也可以使用 rabbitmq,该方式会简单并好处多。
数据发送失败
如果用以上的 Redis 来做数据的传输是做不到数据不丢失的,Redis 的 pub/sub 是没有ACK机制的,不管socket服务发送成功失败都会删除该条数据。
我们可以使用 RabbitMQ ,RabbitMQ 是一款消息中间件,提供了生产 ACK 机制和消费 ACK 机制,能保证数据不丢失,当socket服务器发送失败的时候,数据还会保留在 RabbitMQ 中。
代码
这里只提供 websocket 代码,具体的消息的传输方式请大家自研,思路已经告诉大家了,只要你学过这些东西就一定能搭建出来。
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
SSHSocketConfig
@Configuration
@EnableWebSocket
public class SSHSocketConfig implements WebSocketConfigurer {
@Resource
WebSSHSocketHandler webSSHSocketHandler;
@Resource
WebSocketInterceptor webSocketInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
webSocketHandlerRegistry.addHandler(webSSHSocketHandler, "/webSocket")
.addInterceptors(webSocketInterceptor)
.setAllowedOrigins("*");
}
}
WebSocketInterceptor
@Slf4j
@Component
public class WebSocketInterceptor implements HandshakeInterceptor {
/**
* 握手前
* @param request
* @param response
* @param wsHandler
* @param attributes
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
// 获得请求参数
Map<String, String> paramMap = HttpUtil.decodeParamMap(request.getURI().getQuery(), Charset.forName("utf-8"));
String uid = paramMap.get("token");
if (StrUtil.isNotBlank(uid)) {
attributes.put("token", uid);
return true;
}
log.error("用户登录已失效");
return false;
}
/**
* 握手后
*
* @param request
* @param response
* @param wsHandler
* @param exception
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
log.debug("握手完成");
}
}
WebSSHSocketHandler
@Component
@Slf4j
public class WebSSHSocketHandler extends TextWebSocketHandler {
private ObjectMapper objectMapper = new ObjectMapper();
/**
* socket 建立成功事件
* @param webSocketSession
* @throws Exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
Object token = webSocketSession.getAttributes().get("token");
if (token != null) {
// 用户连接成功,放入在线用户缓存
WsSessionManager.add(token.toString(), webSocketSession);
log.debug("用户 account {} 握手成功!",token.toString());
// WebSocketVO webSocketVO = new WebSocketVO();
// webSocketVO.setTopic(SocketConst.INDEX_WELCOME.getTopic());
// webSocketVO.setData(SocketConst.INDEX_WELCOME.getMsg());
// webSocketSession.sendMessage(new TextMessage(objectMapper.writeValueAsString(webSocketVO)));
} else {
throw new RuntimeException("用户登录已经失效!");
}
}
/**
* 接收消息事件
* @param webSocketSession
* @param webSocketMessage
* @throws Exception
*/
@Override
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage webSocketMessage) throws Exception {
// 获得客户端传来的消息
// 消息内容
String payload = webSocketMessage.getPayload().toString();
// 检查session
Object token = webSocketSession.getAttributes().get("token");
}
/**
* socket 断开连接时
* @param webSocketSession
* @param closeStatus
* @throws Exception
*/
@Override
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
Object token = webSocketSession.getAttributes().get("token");
if (token != null) {
// 用户退出,移除缓存
WsSessionManager.removeAndClose(token.toString());
}
log.debug("{} 用户离开",token.toString());
}
}
WsSessionManager
@Slf4j
public class WsSessionManager {
/**
* 保存连接 session 的地方
*/
private static Map<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();
/**
* 添加 session
* @param key
*/
public static void add(String key, WebSocketSession session) {
// 添加 session
SESSION_POOL.put(key, session);
}
/**
* 删除 session,会返回删除的 session
*
* @param key
* @return
*/
public static WebSocketSession remove(String key) {
// 删除 session
return SESSION_POOL.remove(key);
}
/**
* 删除并同步关闭连接
*
* @param key
*/
public static void removeAndClose(String key) {
WebSocketSession session = remove(key);
if (session != null) {
try {
// 关闭连接
session.close();
} catch (IOException e) {
// todo: 关闭出现异常处理
e.printStackTrace();
}
}
}
/**
* 获得 session
* @param key
* @return
*/
public static WebSocketSession get(String key) {
// 获得 session
return SESSION_POOL.get(key);
}
public static void sendToUser(String id,String json) throws IOException {
WebSocketSession webSocketSession = get(id);
if(!webSocketSession.isOpen()){
return;
}
synchronized (id) {
webSocketSession.sendMessage(new TextMessage(json));
}
}
public static void sendToUserGroup(String id,String json) throws IOException {
String groupKey = id.split("-")[1];
Set<String> set = SESSION_POOL.keySet();
Iterator<String> iterator = set.iterator();
while (iterator.hasNext()){
String key = iterator.next();
if(key.endsWith("-"+groupKey)){
WebSocketSession webSocketSession = get(key);
if(!webSocketSession.isOpen()){
continue;
}
synchronized (key) {
webSocketSession.sendMessage(new TextMessage(json));
}
}
}
}
public static void sendToAll(String id,String json) throws IOException {
Set<String> set = SESSION_POOL.keySet();
Iterator<String> iterator = set.iterator();
while (iterator.hasNext()){
String key = iterator.next();
WebSocketSession webSocketSession = get(key);
if(!webSocketSession.isOpen()){
continue;
}
synchronized (key) {
webSocketSession.sendMessage(new TextMessage(json));
}
}
}
}
前端代码
import { getSocketKey } from '@/utils/auth'
export default{
debug: false,
connection: null,
init(bus){
let protocol = 'ws://';
if (window.location.protocol == 'https:') {
protocol = 'wss://';
}
if (!window.WebSocket) {
//否则报错
this.console("不支持 socket 连接")
return null;
}
let endpoint = protocol+'127.0.0.1:8080/monitor-socket/webSocket?token='+getSocketKey();
this.connection = new WebSocket(endpoint);
// 打开连接
this.connection.onopen = () => {
this.console("连接打开成功")
};
// 接收消息
this.connection.onmessage = (evt) => {
this.console(evt)
let json = JSON.parse(evt.data);
bus.emit(json.topic,json);
};
// 关闭连接
this.connection.onclose = (evt) => {
this.console(evt)
};
// 连接错误
this.connection.onerror = (evt) => {
this.console(evt)
};
},
send(msg){
this.connection.send(JSON.stringify(msg));
},
close(){
this.connection.close()
},
console(msg){
if(this.debug){
console.log(msg)
}
}
}
webSocket 性能测试
测试代码
可以看出来我是给用户组发送消息,并且保证有一个用户连线,否则则是没有意义,我的最主要的是socket往外发的能力。
配置
属性 | 值 |
---|---|
内存 | 16G |
内核 | 8 |
CPU | 16 |
CPU频率 | 2.90 GHz |
单机性能测试结果
测试工具 JMeter
线程 | 时间(s) | 真实运行时间(s) | 循环次数 | 异常率 | 吞吐量 |
---|---|---|---|---|---|
1000 | 10 | 9 | 1 | 0 | 100.1/s |
1000 | 5 | 5 | 1 | 0 | 200.1/s |
1000 | 1 | 1 | 1 | 0 | 1000.0/s |
5000 | 10 | 9 | 1 | 0 | 500.2/s |
5000 | 5 | 5 | 1 | 0 | 1000.0/s |
5000 | 1 | 1 | 1 | 0 | 4071.7/s |
5000 | 1 | 1 | 1 | 25.54% | 3090.2/s |
时间的意思是,固定时间内,运行完这些线程,真实运行时间,就是开辟这么多线程实际使用时间。出现异常率后就不需要测试了,同时并发请求5000以下问题不大。