零、前戏
最近在“玩”demo的过程中,就一直好奇。插件信息等元数据,soul是怎么从soul-admin同步到soul-bootstrap中的。
本着大胆瞎猜,小心求证的心态,就有今天的关于websocket这篇文章。
一、服务端--soul-admin
- 首先从jar入手,soul-admin中的只有一个spring-boot-starter-websocket官方jar包。
猜测:服务端实现是在写在soul-admin项目内部
- 通过全文搜索“websocket”,找到了DataSyncConfiguration
涉及websocket的配置初始化代码如下:
与yml中的配置项对应
- 进入WebsocketCollector中一探究竟
@ServerEndpoint("/websocket")科普帖子:https://blog.csdn.net/weixin_39793752/article/details/80949128
@ServerEndpoint("/websocket")
public class WebsocketCollector {
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static final String SESSION_KEY = "sessionKey";
/**
* On open.
*
* @param session the session
*/
@OnOpen
public void onOpen(final Session session) {
log.info("websocket on open successful....");
SESSION_SET.add(session);
}
/**
* On message.
*
* @param message the message
* @param session the session
*/
@OnMessage
public void onMessage(final String message, final Session session) {
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
try {
ThreadLocalUtil.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtil.clear();
}
}
}
/**
* On close.
*
* @param session the session
*/
@OnClose
public void onClose(final Session session) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
}
/**
* On error.
*
* @param session the session
* @param error the error
*/
@OnError
public void onError(final Session session, final Throwable error) {
SESSION_SET.remove(session);
ThreadLocalUtil.clear();
log.error("websocket collection error: ", error);
}
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
if (DataEventTypeEnum.MYSELF == type) {
try {
Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
if (session != null) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
}
@OnOpen:连接建立成功调用的方法
@OnClose:连接关闭调用方法
@OnMessage:收到客户端消息后调用的方法
@OnError:发送错误时调用
- 当服务端收到一条MYSELF类型的消息,将同步全量元数据信息
- admin 会推送一次全量数据,后续如果配置数据发生变更则将增量数据通过 websocket 主动推送给 soul-web
详细机制此处暂留坑,后文再续
二、客户端--soul-bootstrap
从pom文件入手,找到了soul-spring-boot-starter-sync-data-websocket中的WebsocketSyncDataConfiguration类
- 深入WebsocketSyncDataService
- 按照配置soul-admin的个数,创建线程池大小
- 通过线程池的定时任务,实现心跳机制----保持心跳
- /使用调度线程池进行断线重连,30秒进行一次。如果连接断开,将client.connectBlocking(3000, TimeUnit.MILLISECONDS);重连,重新发送MYSELF类型信息,进行全量更新。
线程池科普贴:https://blog.csdn.net/weixin_35756522/article/details/81707276
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
command:执行线程
initialDelay:初始化延时
period:两次开始执行最小间隔时间
unit:计时单位
- SoulWebsocketClient 是具体处理websocket类
- 在与服务端建立连接时,会发送MYSELF类型的消息。****如上文所述,服务端接收****MYSELF类****型消息后将发送全量消息用于同步。
小结:
- soul-admin与soul-bootstrap基本通信链路,基本捋顺。相关数据处理的细节,将在后续文章中浅析。
- 日拱一卒,每天进步一点点