架构图
测试代码搭建
pom依赖
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- webSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
开启Websocket配置
@Configuration
@EnableWebSocket
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter(){
return new ServerEndpointExporter();
}
}
WebsocketPool类
package com.chainter.rmblc.messaging.net;
import lombok.extern.java.Log;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.websocket.Session;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author ningbin 2019/12/18 13:45
* @Description:
*/
@Log
public class WebSocketPool {
// 当前在线人数
private static final AtomicInteger atomicNumber = new AtomicInteger(0);
// 当前Websocket session连接
private static Map<String, Session> onlineSession = new ConcurrentHashMap<>();
public static Integer addAtomicNumber(){
return atomicNumber.incrementAndGet();
}
public static Integer decrementNumber(){
return atomicNumber.decrementAndGet();
}
public static Integer getNumber(){
return atomicNumber.get();
}
public static void createOnlineSession(String userId,Session session){
onlineSession.put(userId,session);
}
public static Map<String,Session> getOnlineSession(){
return onlineSession;
}
public static Session getSesssionByUserId(String userId){
return Optional.ofNullable(onlineSession.get(userId)).orElse(null);
}
public static void removeSession(String userId){
Session session = onlineSession.get(userId);
if(ObjectUtils.isEmpty(session)){
return;
}
try {
session.close();
onlineSession.remove(userId);
} catch (IOException e) {
log.warning("关闭连接出现错误");
}
}
public static void send(){
onlineSession.values().stream().forEach(session -> {
try {
Date date = new Date();
long time = date.getTime();
String dateString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
session.getBasicRemote().sendText("时间:"+dateString+",毫秒:"+time);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
ws连接类
@Component
@ServerEndpoint(value = "/WebSocketTest/{userId}")
@Log
public class WebsocketTest {
@OnOpen
public void onOpen(@PathParam("userId") String userId, Session session) {
Integer number = WebSocketPool.addAtomicNumber();
WebSocketPool.createOnlineSession(session.getId(),session);
// MessageTaskHandle.createTaskHandle(session,"服务器主动推送信息");
log.info("建立连接,当前人数:"+number);
}
@OnClose
public void onClose(@PathParam("userId") String userId, Session session){
WebSocketPool.removeSession(session.getId());
Integer number = WebSocketPool.decrementNumber();
log.info("用户"+userId+"关闭连接,当前人数:"+number);
}
@OnError
public void onError(@PathParam("userId") String userId, Session session,Throwable throwable){
WebSocketPool.removeSession(session.getId());
log.warning("WebSocket连接出现异常");
}
}
添加redis监听类
package com.chainter.rmblc.messaging.config;
import com.chainter.rmblc.messaging.handle.RedisListenerHandle;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* @author ningbin 2019/12/19 18:20
* @Description:
*/
@Component
@Log
public class RedisListenerBean {
// application.yml中配置allWSName
@Value("${sub.channel.allWSName}")
private String allWSName;
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic(allWSName));
log.info("Subscribed Redis channel: " + allWSName);
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisListenerHandle redisListenerHandle){
return new MessageListenerAdapter(redisListenerHandle,"receiveMessage");
}
}
创建RedisListenerHandle监听消息处理类
package com.chainter.rmblc.messaging.handle;
import org.springframework.stereotype.Component;
/**
* @author ningbin 2019/12/20 10:07
* @Description:
*/
@Component
public class RedisListenerHandle {
public void receiveMessage(String message){
System.out.println("接收消息:"+message);
}
}