SpringBoot集成WebSocket讲解

1 WebSocket

1.1 简介

WebSocket 协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端,建立客户端和服务器之间的通信渠道。浏览器和服务器仅需一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

image.png

1.2 WebSocket作用和调用

1.2.1 作用

HTTP 是基于请求响应式的,即通信只能由客户端发起,服务端做出响应,无状态,无连接:

  • 无状态:每次连接只处理一个请求,请求结束后断开连接。
  • 无连接:对于事务处理没有记忆能力,服务器不知道客户端是什么状态。

通过HTTP实现即时通讯,只能是页面轮询向服务器发出请求,服务器返回查询结果。轮询的效率低,非常浪费资源,因为必须不停连接,或者 HTTP 连接始终打开。
WebSocket的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话。

WebSocket 特点:

  • 建立在 TCP 协议之上,服务器端的实现比较容易。
  • HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
  • 数据格式比较轻量,性能开销小,通信高效。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务器通信。
  • 协议标识符是 ws(如果加密,则为wss),服务器网址就是 URL

1.2.2 js端调用

<script>
    var ws = new WebSocket('ws://localhost:8080/webSocket/10086');
    // 获取连接状态
    console.log('ws连接状态:' + ws.readyState);
    //监听是否连接成功
    ws.onopen = function () {
        console.log('ws连接状态:' + ws.readyState);
        //连接成功则发送一个数据
        ws.send('test1');
    }
    // 接听服务器发回的信息并处理展示
    ws.onmessage = function (data) {
        console.log('接收到来自服务器的消息:');
        console.log(data);
        //完成通信后关闭WebSocket连接
        ws.close();
    }
    // 监听连接关闭事件
    ws.onclose = function () {
        // 监听整个过程中websocket的状态
        console.log('ws连接状态:' + ws.readyState);
    }
    // 监听并处理error事件
    ws.onerror = function (error) {
        console.log(error);
    }
    function sendMessage() {
        var content = $("#message").val();
        $.ajax({
            url: '/socket/publish?userId=10086&message=' + content,
            type: 'GET',
            data: { "id": "7777", "content": content },
            success: function (data) {
                console.log(data)
            }
        })
    }
</script>

下面主要介绍三种方式:Javax,WebMVC,WebFlux,在Spring Boot中的服务端和客户端配置

1.3 Javax

java的扩展包javax.websocket中就定义了一套WebSocket的接口规范

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

1.3.1 服务端

1.3.1.1 服务端接收

一般使用注解的方式来进行配置

/**
 * html页面与之关联的接口
 * var reqUrl = "http://localhost:8081/websocket/" + cid;
 * socket = new WebSocket(reqUrl.replace("http", "ws"));
 */
@Component
@ServerEndpoint("/websocket/{type}")
public class JavaxWebSocketServerEndpoint {

    @OnOpen
    public void onOpen(Session session, EndpointConfig config,
                       @PathParam(value = "type") String type) {
        //连接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //连接关闭
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本信息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong信息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二进制信息,也可以用byte[]接收
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //异常处理
    }
}

我们在类上添加 @ServerEndpoint注解来表示这是一个服务端点,同时可以在注解中配置路径,这个路径可以配置成动态的,使用{}包起来就可以了

  • @OnOpen:用来标记对应的方法作为客户端连接上来之后的回调,Session就相当于和客户端的连接了,我们可以把它缓存起来用于发送消息;通过@PathParam注解就可以获得动态路径中对应值了
  • @OnClose:用来标记对应的方法作为客户端断开连接之后的回调,我们可以在这个方法中移除对应Session的缓存,同时可以接受一个CloseReason的参数用于获取关闭原因
  • @OnMessage:用来标记对应的方法作为接收到消息之后的回调,我们可以接受文本消息,二进制消息和pong消息
  • @OnError:用来标记对应的方法作为抛出异常之后的回调,可以获得对应的Session和异常对象

1.3.1.2 服务端集成

@Configuration(proxyBeanMethods = false)
public class JavaxWebSocketConfiguration {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

依赖SpringWebSocket模块,手动注入ServerEndpointExporter就可以了
需要注意ServerEndpointExporterSpring中的类,算是Spring为了支持javax.websocket的原生用法所提供的支持类

javax.websocket 库中定义了PongMessage而没有PingMessage

通过测试发现基本上所有的WebSocket包括前端js自带的,都实现了自动回复;也就是说当接收到一个ping消息之后,是会自动回应一个pong消息,所以没有必要再自己接受ping消息来处理了,即我们不会接受到ping消息;
当然我上面讲的ping和pong都是需要使用框架提供的api,如果是我们自己通过Message来自定义心跳数据的话是没有任何的处理的,下面是对应的api

//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);

1.3.1.3 ping和pong消息

ping 消息pong 消息都是 WebSocket 协议中的特殊消息类型,用于进行心跳保活和检测 WebSocket 连接的健康状态。

  • ping 消息:由服务器端(或客户端)发送给对端的消息。它用于发起一个心跳检测请求,要求对端回复一个 pong 消息作为响应。ping 消息通常用于检测对端的连接是否仍然处于活动状态,以及测量网络延迟。
  • pong 消息:由对端(即客户端或服务器端)作为对 ping 消息的响应发送回来。它用于确认接收到 ping 消息,并表明连接仍然活跃。

当一方发送一个 ping 消息时,对端应该立即发送一个 pong 消息作为响应。通过交换 ping 和 pong 消息,可以检测连接是否仍然有效,以及测量网络的延迟时间。

ping 和 pong 消息通常由 WebSocket 底层协议处理,开发人员可以通过设置相应的参数来启用或禁用这些消息的交换。一般情况下,WebSocket 客户端和服务器都会自动处理 ping 和 pong 消息,无需开发人员显式地处理。ping 和 pong 消息是属于底层协议层

1.3.1.4 对象无法自动注入

使用了 @ServerEndpoint 注解的类中使用@Resource@Autowired注入对象都会失败,并且报空指针异常。

原因是WebSocket服务是线程安全的,那么当我们去发起一个ws连接时,就会创建一个端点对象。WebSocket服务是多对象的,不是单例的。而我们的SpringBean默认就是单例的,在非单例类中注入一个单例的Bean是冲突的。

或者说:
Spring管理采用单例模式(singleton),而 WebSocket 是多对象的,即每个客户端对应后台的一个 WebSocket 对象,也可以理解成 new 了一个 WebSocket,这样当然是不能获得自动注入的对象了,因为这两者刚好冲突。

@Autowired注解注入对象操作是在启动时执行的,而不是在使用时,而 WebSocket 是只有连接使用时才实例化对象,且有多个连接就有多个对象。所以我们可以得出结论,这个 Service 根本就没有注入到 WebSocket 当中。

如何解决呢?

使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到WebSocketServer中。

@ServerEndpoint("/websocket/{userId}")
@Component
@Slf4j
public class WebSocketServer {
  private static MessageStore messageStore;
  private static MessageSender messageSender;

  public static void setMessageStore(MessageStore messageStore) {
      WebSocketServer.messageStore = messageStore;
  }

  public static void setMessageSender(MessageSender messageSender) {
      WebSocketServer.messageSender = messageSender;
  }
}


@Slf4j
@Service
public class MessageStore {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @PostConstruct
    public void init() {
        WebSocketServer.setMessageStore(this);
    }
}

1.3.2 客户端

1.3.2.1 客户端接收

客户端也是使用注解配置

@ClientEndpoint
public class JavaxWebSocketClientEndpoint {

    @OnOpen
    public void onOpen(Session session) {
        //连接建立
    }

    @OnClose
    public void onClose(Session session, CloseReason reason) {
        //连接关闭
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        //接收文本消息
    }

    @OnMessage
    public void onMessage(Session session, PongMessage message) {
        //接收pong消息
    }

    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {
        //接收二进制消息
    }

    @OnError
    public void onError(Session session, Throwable e) {
        //异常处理
    }
}

客户端使用@ClientEndpoint来标记,其他的@OnOpen,@OnClose,@OnMessage,@OnError和服务端一模一样

1.3.2.2 客户端发送

WebSocketContainer container = ContainerProvider.getWebSocketContainer();
Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);

我们可以通过ContainerProvider来获得一个WebSocketContainer,然后调用connectToServer方法将我们的客户端类和连接的uri传入就行了

通过ContainerProvider#getWebSocketContainer获得WebSocketContainer其实是基于SPI实现的
Spring的环境中更推荐大家使用ServletContextAware来获得,代码如下

@Component
public class JavaxWebSocketContainer implements ServletContextAware {

    private volatile WebSocketContainer container;

    public WebSocketContainer getContainer() {
        if (container == null) {
            synchronized (this) {
                if (container == null) {
                    container = ContainerProvider.getWebSocketContainer();
                }
            }
        }
        return container;
    }

    @Override
    public void setServletContext(@NonNull ServletContext servletContext) {
        if (container == null) {
            container = (WebSocketContainer) servletContext
                .getAttribute("javax.websocket.server.ServerContainer");
        }
    }
}

发消息

Session session = container.connectToServer(JavaxWebSocketClientEndpoint.class, uri);

//发送文本消息
session.getAsyncRemote().sendText(String message);

//发送二进制消息
session.getAsyncRemote().sendBinary(ByteBuffer message);

//发送对象消息,会尝试使用Encoder编码
session.getAsyncRemote().sendObject(Object message);

//发送ping
session.getAsyncRemote().sendPing(ByteBuffer buffer);

//发送pong
session.getAsyncRemote().sendPong(ByteBuffer buffer);

1.4 WebMVC

pom依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

1.4.1 服务端

1.1.4.1 服务端接收

我们实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

public class ServletWebSocketServerHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //连接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //异常处理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //连接关闭
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}

1.1.4.2 服务端集成

首先需要添加@EnableWebSocket来启用WebSocket
然后实现WebSocketConfigurer来注册WebSocket路径以及对应的WebSocketHandler

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket")//注册Handler
            .setAllowedOrigins("*");
    }
}

1.1.4.3 服务器握手拦截

提供了HandshakeInterceptor来拦截握手

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket")
            //添加握手拦截器
            .addInterceptors(new ServletWebSocketHandshakeInterceptor())
            .setAllowedOrigins("*");
    }
    
    public static class ServletWebSocketHandshakeInterceptor implements HandshakeInterceptor {

        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            //握手之前
           if (request instanceof ServletServerHttpRequest) {
            String path = request.getURI().getPath();
            if(requestIsValid(path)){
                String[] params = getParams(path);
                attributes.put("WEBSOCKET_AUTH", params[0]);
                attributes.put("WEBSOCKET_PID", params[1]);
                attributes.put("WEBSOCKET_SN", params[2]);
                attributes.put("WEBSOCKET_OPENID", params[3]);
                attributes.put("WEBSOCKET_FIRSTONE","yes");
            }
        }
        System.out.println("================Before Handshake================");
        return true;
        }

        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
            //握手之后
            System.out.println("================After Handshake================");
            if(e!=null) e.printStackTrace();
            System.out.println("================After Handshake================");
        }
     
        private boolean requestIsValid(String url){
            //在这里可以写上具体的鉴权逻辑
            boolean isvalid = false;
            if(StringUtils.isNotEmpty(url)
                    && url.startsWith("/netgate/")
                    && url.split("/").length==6){
                isvalid = true;
            }
            return isvalid;
        }
    
        private String[] getParams(String url){
            url = url.replace("/netgate/","");
            return url.split("/");
        }

    }
}

1.1.4.4 服务器地址问题

当在集成的时候发现这种方式没办法动态匹配路径,它的路径就是固定的,没办法使用如/websocket/**这样的通配符

在研究了一下之后发现可以在UrlPathHelper上解决

@Configuration
@EnableWebSocket
public class ServletWebSocketServerConfigurer implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
        if (registry instanceof ServletWebSocketHandlerRegistry) {
            //替换UrlPathHelper
            ((ServletWebSocketHandlerRegistry) registry)
                .setUrlPathHelper(new PrefixUrlPathHelper("/websocket"));
        }

        registry
            //添加处理器到对应的路径
            .addHandler(new ServletWebSocketServerHandler(), "/websocket/**")
            .setAllowedOrigins("*");
    }
    
    public class PrefixUrlPathHelper extends UrlPathHelper {

        private String prefix;
        public PrefixUrlPathHelper(String prefix){this.prefix=prefix;}
        @Override
        public @NonNull String resolveAndCacheLookupPath(@NonNull HttpServletRequest request) {
            //获得原本的Path
            String path = super.resolveAndCacheLookupPath(request);
            //如果是指定前缀就返回对应的通配路径
            if (path.startsWith(prefix)) {
                return prefix + "/**";
            }
            return path;
        }
    }
}

因为它内部实际上就是用一个Map<String, WebSocketHandler>来存的,所以没有办法用通配符

1.4.2 客户端

1.4.2.1 客户端接收

和服务端一样我们需要先实现一个WebSocketHandler来处理WebSocket的连接,关闭,消息和异常

public class ServletWebSocketClientHandler implements WebSocketHandler {

    @Override
    public void afterConnectionEstablished(@NonNull WebSocketSession session) throws Exception {
        //连接建立
    }

    @Override
    public void handleMessage(@NonNull WebSocketSession session, @NonNull WebSocketMessage<?> message) throws Exception {
        //接收消息
    }

    @Override
    public void handleTransportError(@NonNull WebSocketSession session, @NonNull Throwable exception) throws Exception {
        //异常处理
    }

    @Override
    public void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus closeStatus) throws Exception {
        //连接关闭
    }

    @Override
    public boolean supportsPartialMessages() {
        //是否支持接收不完整的消息
        return false;
    }
}

1.4.2.2 客服端发送

WebSocketClient client = new StandardWebSocketClient();
WebSocketHandler handler = new ServletWebSocketClientHandler();
WebSocketConnectionManager manager = new WebSocketConnectionManager(client, handler, uri);
manager.start();

首先我们需要先new一个StandardWebSocketClient,可以传入一个WebSocketContainer参数,获得该对象的方式上面已经介绍过了,这边就先略过

然后new一个WebSocketConnectionManager传入WebSocketClientWebSocketHandler还有路径uri
最后调用一下WebSocketConnectionManagerstart方法就可以了

这里如果大家去看WebSocketClient的实现类就会发现有StandardWebSocketClient还有JettyWebSocketClient等等,所以大家可以根据自身项目所使用的容器来选择不同的WebSocketClient实现类

这里给大家贴一小段Spring适配不同容器WebSocket的代码

public abstract class AbstractHandshakeHandler implements HandshakeHandler, Lifecycle {

    private static final boolean tomcatWsPresent;

    private static final boolean jettyWsPresent;

    private static final boolean jetty10WsPresent;

    private static final boolean undertowWsPresent;

    private static final boolean glassfishWsPresent;

    private static final boolean weblogicWsPresent;

    private static final boolean websphereWsPresent;

    static {
        ClassLoader classLoader = AbstractHandshakeHandler.class.getClassLoader();
        tomcatWsPresent = ClassUtils.isPresent(
            "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader);
        jetty10WsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", classLoader);
        jettyWsPresent = ClassUtils.isPresent(
            "org.eclipse.jetty.websocket.server.WebSocketServerFactory", classLoader);
        undertowWsPresent = ClassUtils.isPresent(
            "io.undertow.websockets.jsr.ServerWebSocketContainer", classLoader);
        glassfishWsPresent = ClassUtils.isPresent(
            "org.glassfish.tyrus.servlet.TyrusHttpUpgradeHandler", classLoader);
        weblogicWsPresent = ClassUtils.isPresent(
            "weblogic.websocket.tyrus.TyrusServletWriter", classLoader);
        websphereWsPresent = ClassUtils.isPresent(
            "com.ibm.websphere.wsoc.WsWsocServerContainer", classLoader);
    }
}

发消息

import org.springframework.web.socket.*;

WebSocketSession session = ...

//发送文本消息
session.sendMessage(new TextMessage(CharSequence message);

//发送二进制消息
session.sendMessage(new BinaryMessage(ByteBuffer message));

//发送ping
session.sendMessage(new PingMessage(ByteBuffer message));

//发送pong
session.sendMessage(new PongMessage(ByteBuffer message));

1.5 WebFlux

WebFluxWebSocket 需要额外的依赖包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

注意WebSocket是响应式框架,不能和阻塞式框架连用比如spring-boot-starter-web,不然会报错:The Java/XML config for Spring MVC and Spring WebFlux cannot both be enabled, e.g. via @EnableWebMvc and @EnableWebFlux, in the same application

1.5.1 服务端

1.5.1.1 服务端发送接收

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocketServerHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink对象在任意时候调用next发送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //异常处理
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //异常处理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //异常处理
                })
                .subscribe(it -> {
                    //连接关闭
                });

        return Mono.zip(send, receive).then();
    }
}

首先需要注意这里的WebSocketHandlerWebSocketSessionreactive包下的:

  • 通过WebSocketSession#send方法来持有一个FluxSink<WebSocketMessage>来用于发送消息
  • 通过WebSocketSession#receive来订阅消息
  • 通过WebSocketSession#closeStatus来订阅连接关闭事件

1.5.1.2 服务端集成

注入WebSocketHandlerAdapter

@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {

    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

注册一个HandlerMapping同时配置路径和对应的WebSocketHandler

@Order(Ordered.HIGHEST_PRECEDENCE)
@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {
    public ReactiveWebSocketServerHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/websocket/**", new ReactiveWebSocketServerHandler());
        setUrlMap(map);
        setOrder(100);
    }
}

注意:我们自定义的HandlerMapping需要设置order,如果不设置,默认为Ordered.LOWEST_PRECEDENCE,会导致这个HandlerMapping被放在最后,当有客户端连接上来时会被其他的HandlerMapping优先匹配上而连接失败

1.5.1.3 注解方式

使用 @MessageMapping@SendTo 注解来处理 WebSocket 通信
注解方式详解

  • @MessageMapping
    用于将特定的 WebSocket 消息路径映射到某个方法。类似于 HTTP 请求中的 @RequestMapping
    通常用于处理来自客户端的消息。在 WebSocket 连接建立后,客户端可以发送消息到指定路径,@MessageMapping 会将这些消息路由到相应的方法处理。
  • @SendTo
    用于将方法返回的消息发送到指定的目标路径,通常是广播给订阅此路径的所有连接客户端。
    适用于需要将处理结果发送回客户端或广播给多个客户端的情况。
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

@Controller
public class ChatController {

    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public String handleChatMessage(String message) {
        return "Server received: " + message;
    }
}

@MessageMapping("/chat"):将客户端发送到 /chat 路径的消息映射到 handleChatMessage 方法。
@SendTo("/topic/messages"):将方法返回的结果广播到 /topic/messages 路径,所有订阅此路径的客户端都会接收到消息。

1.5.2 客户端

1.5.2.1 客户端发送接收

客户端WebSocketHandler的写法和服务端的一样

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocketClientHandler implements WebSocketHandler {

    @NonNull
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        Mono<Void> send = session.send(Flux.create(sink -> {
            //可以持有sink对象在任意时候调用next发送消息
            sink.next(WebSocketMessage message);
        })).doOnError(it -> {
            //处理异常
        });

        Mono<Void> receive = session.receive()
                .doOnNext(it -> {
                    //接收消息
                })
                .doOnError(it -> {
                    //异常处理
                })
                .then();

        @SuppressWarnings("all")
        Disposable disposable = session.closeStatus()
                .doOnError(it -> {
                    //异常处理
                })
                .subscribe(it -> {
                    //连接关闭
                });

        return Mono.zip(send, receive).then();
    }
}

1.5.2.2 客户端发送

import org.springframework.web.reactive.socket.client.WebSocketClient;

WebSocketClient client = ReactorNettyWebSocketClient();
WebSocketHandler handler = new ReactiveWebSocketClientHandler();
client.execute(uri, handler).subscribe();

首先我们需要先new一个ReactorNettyWebSocketClient
然后调用一下WebSocketClientexecute方法传入路径uriWebSocketHandler并继续调用subscribe方法就可以了

注意WebFluxWebMVC 中的 WebSocketClient一样,Reactive包中的WebSocketClient也有很多实现类,比如ReactorNettyWebSocketClientJettyWebSocketClientUndertowWebSocketClientTomcatWebSocketClient 等等,也是需要大家基于自身项目的容器使用不同的实现类

这里也给大家贴一小段Reactive适配不同容器WebSocket的代码

public class HandshakeWebSocketService implements WebSocketService, Lifecycle {

    private static final boolean tomcatPresent;

    private static final boolean jettyPresent;

    private static final boolean jetty10Present;

    private static final boolean undertowPresent;

    private static final boolean reactorNettyPresent;

    static {
        ClassLoader loader = HandshakeWebSocketService.class.getClassLoader();
        tomcatPresent = ClassUtils.isPresent("org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", loader);
        jettyPresent = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.WebSocketServerFactory", loader);
        jetty10Present = ClassUtils.isPresent("org.eclipse.jetty.websocket.server.JettyWebSocketServerContainer", loader);
        undertowPresent = ClassUtils.isPresent("io.undertow.websockets.WebSocketProtocolHandshakeHandler", loader);
        reactorNettyPresent = ClassUtils.isPresent("reactor.netty.http.server.HttpServerResponse", loader);
    }
}

发消息
我们需要使用在WebSocketHandler中获得的FluxSink<WebSocketMessage>来发送消息

import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;

public class ReactiveWebSocket {

    private final WebSocketSession session;

    private final FluxSink<WebSocketMessage> sender;

    public ReactiveWebSocket(WebSocketSession session, FluxSink<WebSocketMessage> sender) {
        this.session = session;
        this.sender = sender;
    }

    public String getId() {
        return session.getId();
    }

    public URI getUri() {
        return session.getHandshakeInfo().getUri();
    }

    public void send(Object message) {
        if (message instanceof WebSocketMessage) {
            sender.next((WebSocketMessage) message);
        } else if (message instanceof String) {
            //发送文本消息
            sender.next(session.textMessage((String) message));
        } else if (message instanceof DataBuffer) {
            //发送二进制消息
            sender.next(session.binaryMessage(factory -> (DataBuffer) message));
        } else if (message instanceof ByteBuffer) {
            //发送二进制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((ByteBuffer) message)));
        } else if (message instanceof byte[]) {
             //发送二进制消息
            sender.next(session.binaryMessage(factory -> factory.wrap((byte[]) message)));
        } else {
            throw new IllegalArgumentException("Message type not match");
        }
    }

    public void ping() {
        //发送ping
        sender.next(session.pingMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void pong() {
        //发送pong
        sender.next(session.pongMessage(factory -> factory.wrap(ByteBuffer.allocate(0))));
    }

    public void close(CloseStatus reason) {
        sender.complete();
        session.close(reason).subscribe();
    }
}

1.6 以Netty为基础的WebSocket

点击了解 Netty实现聊天通信功能

1.6.1 pom.xml

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.79.Final</version>
</dependency>

1.6.2 服务器

1.6.2.1 通道配置

package cn.websocket;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.ImmediateEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

public class NettyConfig {
    //定义全局channel,管理所有的channel
    private static volatile ChannelGroup channelGroup = null;

    //存放请求ID与channel的对应关系
    private static volatile ConcurrentHashMap<String, Channel> channelMap = null;

    //定义两把锁
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();

    public static ChannelGroup getChannelGroup() {
        if (null == channelGroup) {
            synchronized (lock1) {
                if (null == channelGroup) {
                    channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
                }
            }
        }
        return channelGroup;
    }

    public static ConcurrentHashMap<String, Channel> getChannelMap() {
        if (null == channelMap) {
            synchronized (lock2) {
                if (null == channelMap) {
                    channelMap = new ConcurrentHashMap<>();
                }
            }
        }
        return channelMap;
    }

    public static Channel getChannel(String userId) {
        if (null == channelMap) {
            return getChannelMap().get(userId);
        }
        return channelMap.get(userId);
    }
}

1.6.2.2 websocket服务

package cn.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Slf4j
@Component
public class NettyServer {
   
    @Autowired
    private WebSocketHandler webSocketHandler;
    @PostConstruct
    public void start() throws InterruptedException {
        new Thread(() -> {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                // bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作
                bootstrap.group(bossGroup, workGroup)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    // 为 bossGroup 添加 日志处理器
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 设置NIO类型的channel
                    .channel(NioServerSocketChannel.class);
                // 设置管道
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 设置管道
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 流水线管理通道中的处理程序(Handler),用来处理业务
                        // webSocket协议本身是基于http协议的,所以这边也要使用http编解码器
                        pipeline.addLast(new HttpServerCodec());
                        // 以块的方式来写的处理器
                        pipeline.addLast(new ChunkedWriteHandler());
                        /**
                         *  1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
                         *  2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
                         */
                        pipeline.addLast(new HttpObjectAggregator(65536));
                        // 增加心跳支持
                        // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
                        //pipeline.addLast(new IdleStateHandler(60, 60, 60));
                        pipeline.addLast(new HeartBeatHandler());//自定义的心跳处理器

                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws",true));
                        // 自定义的handler,处理业务逻辑
                        pipeline.addLast(webSocketHandler);
                    }
                });
                // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
                ChannelFuture channelFuture = null;
                try {
                    channelFuture = bootstrap.bind(8000).sync();
                    log.info("Server started and listen on:{}", channelFuture.channel().localAddress());
                    // 对关闭通道进行监听
                    channelFuture.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }).start();
    }

}

1.6.2.3 自定义业务处理

package cn.websocket;

import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 在每个请求中生成唯一的 traceId
        MDC.put("traceId", IdUtil.simpleUUID());
        try {
            super.channelRead(ctx, msg);
        } finally {
            // 清理 MDC
            MDC.clear();
        }
    }
    /**
     * 一旦连接,第一个被执行
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        NettyConfig.getChannelGroup().add(ctx.channel());

    }

    /**
     * 读取数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());

        // 获取用户ID,关联channel
        JSONObject jsonObject = JSONUtil.parseObj(msg.text());
        String uid = jsonObject.getStr("uid");
        NettyConfig.getChannelMap().put(uid, ctx.channel());

        // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        ctx.channel().attr(key).setIfAbsent(uid);

        // 回复消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("用户下线了:{}", ctx.channel().id().asLongText());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("异常:{}", cause.getMessage());
        // 删除通道
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeUserId(ctx);
        ctx.close();
    }

    /**
     * 删除用户与channel的对应关系
     */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getChannelMap().remove(userId);
    }
}

1.6.3 测试

ws://127.0.0.1:8000/ws/hello

image.png

1.7 WebFlux 和 starter-websocket 区别

Spring Boot 在集成 WebSocket 时,可以选择使用 WebFluxspring-boot-starter-websocket。这两者有不同的用途和实现方式,以下是它们的主要区别:

  • spring-boot-starter-websocket
    • Servlet APIspring-boot-starter-websocket 主要基于 Servlet API,适用于传统的阻塞式 IO 模型。
    • 依赖的服务器: 默认使用 Tomcat、Jetty 或 Undertow 作为嵌入式 Servlet 容器。这些容器是基于线程的阻塞式服务器。
    • 使用场景: 适合需要传统的同步编程模型的应用程序,并且可以轻松与现有的 Spring MVC 应用程序集成。
    • 简单配置: 配置相对简单,适合已经使用 Spring MVC 的项目。
    • 性能: 对于每个 WebSocket 连接,都需要一个线程来处理连接,可能会在大量连接时导致线程资源耗尽。
  • WebFlux
    • Reactive API:WebFlux 是 Spring 5 引入的响应式编程框架,基于非阻塞的 Reactive Streams API
    • 依赖的服务器: 默认使用 Reactor Netty,但也可以配置使用其他支持响应式的服务器,如 Jetty 或者 Undertow。
    • 使用场景: 适合需要处理大量并发连接的应用程序,比如聊天应用、大量客户端连接的实时更新等。
    • 高扩展性: 由于是非阻塞的,实现了更好的资源利用率和高并发处理能力。
    • 复杂度: 配置和编写代码可能会更复杂一些,需要开发者了解响应式编程的概念。
    • 性能: 非阻塞 IO,可以处理大量并发连接而不会轻易耗尽线程资源。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容