1 WebSocket
1.1 简介
WebSocket
协议是基于TCP
的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex
)通信——允许服务器主动发送信息给客户端,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
1.2 WebSocket作用和调用
1.2.1 作用
HTTP
是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接:
-
无状态
:每次连接只处理一个请求,请求结束后断开连接。 -
无连接
:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。
通过HTTP
实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP
连接始终打开。
WebSocket
的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息
,是真正的双向平等对话。
WebSocket
特点:
- 建立在
TCP
协议之上,服务器端的实现比较容易。 - 与
HTTP
协议有着良好的兼容性。默认端口也是80和443
,并且握手阶段采用HTTP
协议,因此握手时不容易屏蔽,能通过各种HTTP
代理服务器。 - 数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是
ws
(如果加密,则为wss
),服务器网址就是URL
1.2.2 js端调用
<script>
var ws = new WebSocket('ws://localhost:8080/webSocket/10086');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
console.log('ws连接状态:' + ws.readyState);
//连接成功则发送一个数据
ws.send('test1');
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
console.log('接收到来自服务器的消息:');
console.log(data);
//完成通信后关闭WebSocket连接
ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
// 监听整个过程中websocket的状态
console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
console.log(error);
}
function sendMessage() {
var content = $("#message").val();
$.ajax({
url: '/socket/publish?userId=10086&message=' + content,
type: 'GET',
data: { "id": "7777", "content": content },
success: function (data) {
console.log(data)
}
})
}
</script>
下面主要介绍三种方式:Javax,WebMVC,WebFlux
,在Spring Boot
中的服务端和客户端配置
1.3 Javax
在java
的扩展包javax.websocket
中就定义了一套WebSocket
的接口规范
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.3.1 服务端
1.3.1.1 服务端接收
一般使用注解的方式来进行配置
/**
* html页面与之关联的接口
* var reqUrl = "http://localhost:8081/websocket/" + cid;
* socket = new WebSocket(reqUrl.replace("http", "ws"));
*/
@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {
@OnOpen
public void onOpen(Session session, EndpointConfig config,
@PathParam(value = "type") String type) {
//连接建立
}
@OnClose
public void onClose(Session session, CloseReason reason) {
//连接关闭
}
@OnMessage
public void onMessage(Session session, String message) {
//接收文本信息
}
@OnMessage
public void onMessage(Session session, PongMessage message) {
//接收pong信息
}
@OnMessage
public void onMessage(Session session, ByteBuffer message) {
//接收二进制信息,也可以用byte[]接收
}
@OnError
public void onError(Session session, Throwable e) {
//异常处理
}
}
我们在类上添加 @ServerEndpoint
注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}
包起来就可以了
-
@OnOpen
:用来标记对应的方法作为客户端连接上来之后的回调,Session
就相当于和客户端的连接了,我们可以把它缓存起来用于发送消息;通过@PathParam
注解就可以获得动态路径中对应值了 -
@OnClose
:用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session
的缓存,同时可以接受一个CloseReason
的参数用于获取关闭原因 -
@OnMessage
:用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息 -
@OnError
:用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session
和异常对象
1.3.1.2 服务端集成
@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
依赖Spring
的WebSocket
模块,手动注入ServerEndpointExporter
就可以了
需要注意ServerEndpointExporter
是Spring
中的类,算是Spring
为了支持javax.websocket
的原生用法所提供的支持类
javax.websocket
库中定义了PongMessage
而没有PingMessage
通过测试发现基本上所有的WebSocket
包括前端js
自带的,都实现了自动回复;也就是说当接收到一个ping
消息之后,是会自动回应一个pong
消息,所以没有必要再自己接受ping
消息来处理了,即我们不会接受到ping
消息;
当然我上面讲的ping和pong
都是需要使用框架提供的api
,如果是我们自己通过Message
来自定义心跳数据的话是没有任何的处理的,下面是对应的api
//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);
//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.3.1.3 ping和pong消息
ping 消息
和 pong 消息
都是 WebSocket
协议中的特殊消息类型,用于进行心跳保活和检测 WebSocket
连接的健康状态。
-
ping 消息
:由服务器端(或客户端)发送给对端的消息。它用于发起一个心跳检测请求,要求对端回复一个pong
消息作为响应。ping
消息通常用于检测对端的连接是否仍然处于活动状态,以及测量网络延迟。 -
pong 消息
:由对端(即客户端或服务器端)作为对ping
消息的响应发送回来。它用于确认接收到ping
消息,并表明连接仍然活跃。
当一方发送一个 ping
消息时,对端应该立即发送一个 pong
消息作为响应。通过交换 ping 和 pong 消息
,可以检测连接是否仍然有效,以及测量网络的延迟时间。
ping 和 pong
消息通常由 WebSocket
底层协议处理,开发人员可以通过设置相应的参数来启用或禁用这些消息的交换。一般情况下,WebSocket
客户端和服务器都会自动处理 ping 和 pong
消息,无需开发人员显式地处理。ping 和 pong
消息是属于底层协议层
1.3.1.4 对象无法自动注入
使用了 @ServerEndpoint
注解的类中使用@Resource
或@Autowired
注入对象都会失败,并且报空指针异常。
原因是WebSocket
服务是线程安全的,那么当我们去发起一个ws
连接时,就会创建一个端点对象。WebSocket
服务是多对象的,不是单例的。而我们的Spring
的Bean
默认就是单例
的,在非单例类中注入一个单例的Bean是冲突的。
或者说:
Spring
管理采用单例模式(singleton
),而 WebSocket
是多对象的,即每个客户端对应后台的一个 WebSocket
对象,也可以理解成 new 了一个 WebSocket,这样当然是不能获得自动注入的对象了,因为这两者刚好冲突。
@Autowired
注解注入对象操作是在启动时执行的,而不是在使用时,而 WebSocket
是只有连接使用时才实例化对象,且有多个连接就有多个对象。所以我们可以得出结论,这个 Service 根本就没有注入到 WebSocket 当中。
如何解决呢?
使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到
WebSocketServer
中。
@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
private static MessageStore messageStore;
private static MessageSender messageSender;
public static void setMessageStore(MessageStore messageStore) {
WebSocketServer.messageStore = messageStore;
}
public static void setMessageSender(MessageSender messageSender) {
WebSocketServer.messageSender = messageSender;
}
}
@Slf4j
@Service
public class MessageStore {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
WebSocketServer.setMessageStore(this);
}
}
1.3.2 客户端
1.3.2.1 客户端接收
客户端也是使用注解配置
@ClientEndpoint
public class JavaxWebSocketClientEndpoint {
@OnOpen
public void onOpen(Session session) {
//连接建立
}
@OnClose
public void onClose(Session session, CloseReason reason) {
//连接关闭
}
@OnMessage
public void onMessage(Session session, String message) {
//接收文本消息
}
@OnMessage
public void onMessage(Session session, PongMessage message) {
//接收pong消息
}
@OnMessage
public void onMessage(Session session, ByteBuffer message) {
//接收二进制消息
}
@OnError
public void onError(Session session, Throwable e) {
//异常处理
}
}
客户端使用@ClientEndpoint
来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError
和服务端一模一样
1.3.2.2 客户端发送
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
我们可以通过ContainerProvider
来获得一个WebSocketContainer
,然后调用connectToServer
方法将我们的客户端类和连接的uri传入就行了
通过ContainerProvider#getWebSocketContainer
获得WebSocketContainer
其实是基于SPI
实现的
在Spring
的环境中更推荐大家使用ServletContextAware
来获得,代码如下
@Component
public class JavaxWebSocketContainer implements ServletContextAware {
private volatile WebSocketContainer container;
public WebSocketContainer getContainer() {
if (container == null) {
synchronized (this) {
if (container == null) {
container = ContainerProvider.getWebSocketContainer();
}
}
}
return container;
}
@Override
public void setServletContext(@NonNull ServletContext servletContext) {
if (container == null) {
container = (WebSocketContainer) servletContext
.getAttribute("javax.websocket.server.ServerContainer");
}
}
}
发消息
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);
//发送文本消息
session.getAsyncRemote().sendText(String message);
//发送二进制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);
//发送对象消息,会尝试使用Encoder编码
session.getAsyncRemote().sendObject(Object message);
//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);
//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);
1.4 WebMVC
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
1.4.1 服务端
1.1.4.1 服务端接收
我们实现一个WebSocketHandler
来处理WebSocket
的连接,关闭,消息和异常
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
public class ServletWebSocketServerHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
//连接建立
}
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
//接收消息
}
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
//异常处理
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
//连接关闭
}
@Override
public boolean supportsPartialMessages() {
//是否支持接收不完整的消息
return false;
}
}
1.1.4.2 服务端集成
首先需要添加@EnableWebSocket
来启用WebSocket
然后实现WebSocketConfigurer
来注册WebSocket
路径以及对应的WebSocketHandler
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
registry
//添加处理器到对应的路径
.addHandler(new ServletWebSocketServerHandler(), "/websocket")//注册Handler
.setAllowedOrigins("*");
}
}
1.1.4.3 服务器握手拦截
提供了HandshakeInterceptor
来拦截握手
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
registry
//添加处理器到对应的路径
.addHandler(new ServletWebSocketServerHandler(), "/websocket")
//添加握手拦截器
.addInterceptors(new ServletWebSocketHandshakeInterceptor())
.setAllowedOrigins("*");
}
public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
//握手之前
if (request instanceof ServletServerHttpRequest) {
String path = request.getURI().getPath();
if(requestIsValid(path)){
String[] params = getParams(path);
attributes.put("WEBSOCKET_AUTH", params[0]);
attributes.put("WEBSOCKET_PID", params[1]);
attributes.put("WEBSOCKET_SN", params[2]);
attributes.put("WEBSOCKET_OPENID", params[3]);
attributes.put("WEBSOCKET_FIRSTONE","yes");
}
}
System.out.println("================Before Handshake================");
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
//握手之后
System.out.println("================After Handshake================");
if(e!=null) e.printStackTrace();
System.out.println("================After Handshake================");
}
private boolean requestIsValid(String url){
//在这里可以写上具体的鉴权逻辑
boolean isvalid = false;
if(StringUtils.isNotEmpty(url)
&& url.startsWith("/netgate/")
&& url.split("/").length==6){
isvalid = true;
}
return isvalid;
}
private String[] getParams(String url){
url = url.replace("/netgate/","");
return url.split("/");
}
}
}
1.1.4.4 服务器地址问题
当在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**
这样的通配符
在研究了一下之后发现可以在UrlPathHelper
上解决
@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
if (registry instanceof ServletWebSocketHandlerRegistry) {
//替换UrlPathHelper
((ServletWebSocketHandlerRegistry) registry)
.setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
}
registry
//添加处理器到对应的路径
.addHandler(new ServletWebSocketServerHandler(), "/websocket/**")
.setAllowedOrigins("*");
}
public class PrefixUrlPathHelper extends UrlPathHelper {
private String prefix;
public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}
@Override
public @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
//获得原本的Path
String path = super.resolveAndCacheLookupPath(request);
//如果是指定前缀就返回对应的通配路径
if (path.startsWith(prefix)) {
return prefix + "/**";
}
return path;
}
}
}
因为它内部实际上就是用一个Map<String, WebSocketHandler>
来存的,所以没有办法用通配符
1.4.2 客户端
1.4.2.1 客户端接收
和服务端一样我们需要先实现一个WebSocketHandler
来处理WebSocket
的连接,关闭,消息和异常
public class ServletWebSocketClientHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
//连接建立
}
@Override
public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
//接收消息
}
@Override
public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
//异常处理
}
@Override
public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
//连接关闭
}
@Override
public boolean supportsPartialMessages() {
//是否支持接收不完整的消息
return false;
}
}
1.4.2.2 客服端发送
WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();
首先我们需要先new一个StandardWebSocketClient
,可以传入一个WebSocketContainer
参数,获得该对象的方式上面已经介绍过了,这边就先略过
然后new一个WebSocketConnectionManager
传入WebSocketClient
,WebSocketHandler
还有路径uri
最后调用一下WebSocketConnectionManager
的start
方法就可以了
这里如果大家去看WebSocketClient
的实现类就会发现有StandardWebSocketClient
还有JettyWebSocketClient
等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient
实现类
这里给大家贴一小段Spring适配不同容器WebSocket的代码
public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {
private static final boolean tomcatWsPresent;
private static final boolean jettyWsPresent;
private static final boolean jetty10WsPresent;
private static final boolean undertowWsPresent;
private static final boolean glassfishWsPresent;
private static final boolean weblogicWsPresent;
private static final boolean websphereWsPresent;
static {
ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
tomcatWsPresent = ClassUtils.isPresent(
"org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
jetty10WsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
jettyWsPresent = ClassUtils.isPresent(
"org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
undertowWsPresent = ClassUtils.isPresent(
"io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
glassfishWsPresent = ClassUtils.isPresent(
"org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
weblogicWsPresent = ClassUtils.isPresent(
"weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
websphereWsPresent = ClassUtils.isPresent(
"com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
}
}
发消息
import org.springframework.web.socket.*;
WebSocketSession session = ...
//发送文本消息
session.sendMessage(new TextMessage(CharSequence message);
//发送二进制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));
//发送ping
session.sendMessage(new PingMessage(ByteBuffer message));
//发送pong
session.sendMessage(new PongMessage(ByteBuffer message));
1.5 WebFlux
WebFlux
的WebSocket
需要额外的依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
注意
:WebSocket
是响应式框架,不能和阻塞式框架连用比如spring-boot-starter-web
,不然会报错:The Java/XML config for Spring MVC and Spring WebFlux cannot both be enabled, e.g. via @EnableWebMvc and @EnableWebFlux, in the same application
1.5.1 服务端
1.5.1.1 服务端发送接收
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactiveWebSocketServerHandler implements WebSocketHandler {
@NonNull
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> send = session.send(Flux.create(sink -> {
//可以持有sink对象在任意时候调用next发送消息
sink.next(WebSocketMessage message);
})).doOnError(it -> {
//异常处理
});
Mono<Void> receive = session.receive()
.doOnNext(it -> {
//接收消息
})
.doOnError(it -> {
//异常处理
})
.then();
@SuppressWarnings("all")
Disposable disposable = session.closeStatus()
.doOnError(it -> {
//异常处理
})
.subscribe(it -> {
//连接关闭
});
return Mono.zip(send, receive).then();
}
}
首先需要注意这里的WebSocketHandler
和WebSocketSession
是reactive
包下的:
- 通过
WebSocketSession#send
方法来持有一个FluxSink<WebSocketMessage>
来用于发送消息 - 通过
WebSocketSession#receive
来订阅消息 - 通过
WebSocketSession#closeStatus
来订阅连接关闭事件
1.5.1.2 服务端集成
注入WebSocketHandlerAdapter
@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
注册一个HandlerMapping
同时配置路径和对应的WebSocketHandler
@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
public ReactiveWebSocketServerHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/websocket/**", new ReactiveWebSocketServerHandler());
setUrlMap(map);
setOrder(100);
}
}
注意
:我们自定义的HandlerMapping
需要设置order
,如果不设置,默认为Ordered.LOWEST_PRECEDENCE
,会导致这个HandlerMapping
被放在最后,当有客户端连接上来时会被其他的HandlerMapping
优先匹配上而连接失败
1.5.1.3 注解方式
使用 @MessageMapping
和 @SendTo
注解来处理 WebSocket 通信
注解方式详解
-
@MessageMapping
用于将特定的WebSocket
消息路径映射到某个方法。类似于 HTTP 请求中的@RequestMapping
。
通常用于处理来自客户端的消息。在WebSocket
连接建立后,客户端可以发送消息到指定路径,@MessageMapping
会将这些消息路由到相应的方法处理。 -
@SendTo
用于将方法返回的消息发送到指定的目标路径,通常是广播给订阅此路径的所有连接客户端。
适用于需要将处理结果发送回客户端或广播给多个客户端的情况。
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class ChatController {
@MessageMapping("/chat")
@SendTo("/topic/messages")
public String handleChatMessage(String message) {
return "Server received: " + message;
}
}
@MessageMapping("/chat")
:将客户端发送到 /chat 路径的消息映射到 handleChatMessage 方法。
@SendTo("/topic/messages")
:将方法返回的结果广播到 /topic/messages 路径,所有订阅此路径的客户端都会接收到消息。
1.5.2 客户端
1.5.2.1 客户端发送接收
客户端WebSocketHandler
的写法和服务端的一样
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactiveWebSocketClientHandler implements WebSocketHandler {
@NonNull
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> send = session.send(Flux.create(sink -> {
//可以持有sink对象在任意时候调用next发送消息
sink.next(WebSocketMessage message);
})).doOnError(it -> {
//处理异常
});
Mono<Void> receive = session.receive()
.doOnNext(it -> {
//接收消息
})
.doOnError(it -> {
//异常处理
})
.then();
@SuppressWarnings("all")
Disposable disposable = session.closeStatus()
.doOnError(it -> {
//异常处理
})
.subscribe(it -> {
//连接关闭
});
return Mono.zip(send, receive).then();
}
}
1.5.2.2 客户端发送
import org.springframework.web.reactive.socket.client.WebSocketClient;
WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();
首先我们需要先new一个ReactorNettyWebSocketClient
然后调用一下WebSocketClient
的execute
方法传入路径uri
和WebSocketHandler
并继续调用subscribe
方法就可以了
注意
:WebFlux
和 WebMVC
中的 WebSocketClient
一样,Reactive
包中的WebSocketClient
也有很多实现类,比如ReactorNettyWebSocketClient
,JettyWebSocketClient
,UndertowWebSocketClient
,TomcatWebSocketClient
等等,也是需要大家基于自身项目的容器使用不同的实现类
这里也给大家贴一小段Reactive适配不同容器WebSocket的代码
public class HandshakeWebSocketService implements WebSocketService, Lifecycle {
private static final boolean tomcatPresent;
private static final boolean jettyPresent;
private static final boolean jetty10Present;
private static final boolean undertowPresent;
private static final boolean reactorNettyPresent;
static {
ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
}
}
发消息
我们需要使用在WebSocketHandler
中获得的FluxSink<WebSocketMessage>
来发送消息
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactiveWebSocket {
private final WebSocketSession session;
private final FluxSink<WebSocketMessage> sender;
public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
this.session = session;
this.sender = sender;
}
public String getId() {
return session.getId();
}
public URI getUri() {
return session.getHandshakeInfo().getUri();
}
public void send(Object message) {
if (message instanceof WebSocketMessage) {
sender.next((WebSocketMessage) message);
} else if (message instanceof String) {
//发送文本消息
sender.next(session.textMessage((String) message));
} else if (message instanceof DataBuffer) {
//发送二进制消息
sender.next(session.binaryMessage(factory -> (DataBuffer) message));
} else if (message instanceof ByteBuffer) {
//发送二进制消息
sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
} else if (message instanceof byte[]) {
//发送二进制消息
sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
} else {
throw new IllegalArgumentException("Message type not match");
}
}
public void ping() {
//发送ping
sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
}
public void pong() {
//发送pong
sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
}
public void close(CloseStatus reason) {
sender.complete();
session.close(reason).subscribe();
}
}
1.6 以Netty为基础的WebSocket
1.6.1 pom.xml
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.79.Final</version>
</dependency>
1.6.2 服务器
1.6.2.1 通道配置
package cn.websocket;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
public class NettyConfig {
//定义全局channel,管理所有的channel
private static volatile ChannelGroup channelGroup = null;
//存放请求ID与channel的对应关系
private static volatile ConcurrentHashMap<String, Channel> channelMap = null;
//定义两把锁
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static ChannelGroup getChannelGroup() {
if (null == channelGroup) {
synchronized (lock1) {
if (null == channelGroup) {
channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
}
}
}
return channelGroup;
}
public static ConcurrentHashMap<String, Channel> getChannelMap() {
if (null == channelMap) {
synchronized (lock2) {
if (null == channelMap) {
channelMap = new ConcurrentHashMap<>();
}
}
}
return channelMap;
}
public static Channel getChannel(String userId) {
if (null == channelMap) {
return getChannelMap().get(userId);
}
return channelMap.get(userId);
}
}
1.6.2.2 websocket服务
package cn.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class NettyServer {
@Autowired
private WebSocketHandler webSocketHandler;
@PostConstruct
public void start() throws InterruptedException {
new Thread(() -> {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
bootstrap.group(bossGroup, workGroup)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
// 为 bossGroup 添加 日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
// 设置NIO类型的channel
.channel(NioServerSocketChannel.class);
// 设置管道
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置管道
ChannelPipeline pipeline = socketChannel.pipeline();
// 流水线管理通道中的处理程序(Handler),用来处理业务
// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
pipeline.addLast(new HttpServerCodec());
// 以块的方式来写的处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(65536));
// 增加心跳支持
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
//pipeline.addLast(new IdleStateHandler(60, 60, 60));
pipeline.addLast(new HeartBeatHandler());//自定义的心跳处理器
pipeline.addLast(new WebSocketServerProtocolHandler("/ws",true));
// 自定义的handler,处理业务逻辑
pipeline.addLast(webSocketHandler);
}
});
// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.bind(8000).sync();
log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}).start();
}
}
1.6.2.3 自定义业务处理
package cn.websocket;
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 在每个请求中生成唯一的 traceId
MDC.put("traceId", IdUtil.simpleUUID());
try {
super.channelRead(ctx, msg);
} finally {
// 清理 MDC
MDC.clear();
}
}
/**
* 一旦连接,第一个被执行
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
// 添加到channelGroup 通道组
NettyConfig.getChannelGroup().add(ctx.channel());
}
/**
* 读取数据
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
log.info("服务器收到消息:{}", msg.text());
// 获取用户ID,关联channel
JSONObject jsonObject = JSONUtil.parseObj(msg.text());
String uid = jsonObject.getStr("uid");
NettyConfig.getChannelMap().put(uid, ctx.channel());
// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
AttributeKey<String> key = AttributeKey.valueOf("userId");
ctx.channel().attr(key).setIfAbsent(uid);
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("用户下线了:{}", ctx.channel().id().asLongText());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("异常:{}", cause.getMessage());
// 删除通道
NettyConfig.getChannelGroup().remove(ctx.channel());
removeUserId(ctx);
ctx.close();
}
/**
* 删除用户与channel的对应关系
*/
private void removeUserId(ChannelHandlerContext ctx) {
AttributeKey<String> key = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(key).get();
NettyConfig.getChannelMap().remove(userId);
}
}
1.6.3 测试
1.7 WebFlux 和 starter-websocket 区别
Spring Boot
在集成 WebSocket
时,可以选择使用 WebFlux
或 spring-boot-starter-websocket
。这两者有不同的用途和实现方式,以下是它们的主要区别:
- spring-boot-starter-websocket
-
Servlet API
:spring-boot-starter-websocket
主要基于 Servlet API,适用于传统的阻塞式 IO 模型。 - 依赖的服务器: 默认使用 Tomcat、Jetty 或 Undertow 作为嵌入式 Servlet 容器。这些容器是基于线程的阻塞式服务器。
- 使用场景: 适合需要传统的同步编程模型的应用程序,并且可以轻松与现有的 Spring MVC 应用程序集成。
- 简单配置: 配置相对简单,适合已经使用 Spring MVC 的项目。
- 性能: 对于每个 WebSocket 连接,都需要一个线程来处理连接,可能会在大量连接时导致线程资源耗尽。
-
- WebFlux
-
Reactive API
:WebFlux 是 Spring 5 引入的响应式编程框架,基于非阻塞的Reactive Streams API
。 - 依赖的服务器: 默认使用 Reactor Netty,但也可以配置使用其他支持响应式的服务器,如 Jetty 或者 Undertow。
- 使用场景: 适合需要处理大量并发连接的应用程序,比如聊天应用、大量客户端连接的实时更新等。
- 高扩展性: 由于是非阻塞的,实现了更好的资源利用率和高并发处理能力。
- 复杂度: 配置和编写代码可能会更复杂一些,需要开发者了解响应式编程的概念。
- 性能: 非阻塞 IO,可以处理大量并发连接而不会轻易耗尽线程资源。
-