SpringBoot WebSocket 服务器主动推送(一)

本文主要讲解SpringBoot 如何基于WebSocket 实现主动推送消息给用户

消息推送的业务逻辑为服务端开启WebSocket 服务,客户端通过建立长连接进入等待状态,服务器在合适的时候推送消息给客户端,最后客户端接受消息自行处理。话不多说,上关键代码。

服务端

  1. Maven 项目在pom.xml 里引入websocket 依赖。
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
  1. Boot 启动类
//开启WebSocket
@EnableWebSocket
@SpringBootApplication
public class Application implements WebSocketConfigurer {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Application.class).bannerMode(Banner.Mode.OFF).run(args);
    }   

    //用请求的方式模拟推送消息的时候
    @GetMapping("notice")
    public String notice(String count) {
        counterHandler.sendMessageToUser(count, "当前时间是:" + new Date());
        return "已发送";
    }

    /**
     * 注册WebSocket处理类
     *
     * @param webSocketHandlerRegistry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
        //支持websocket 的 connection,指定counterHandler处理路径为/counter 的长连接请求
        webSocketHandlerRegistry.addHandler(counterHandler(), "/counter")
                //添加WebSocket握手请求的拦截器.
                .addInterceptors(new CounterHandler.CountHandshakeInterceptor());
        
        //不支持websocket的connenction,采用sockjs
        webSocketHandlerRegistry.addHandler(counterHandler(), "/sockjs/counter")
                //添加WebSocket握手请求的拦截器.
                .addInterceptors(new CounterHandler.CountHandshakeInterceptor()).withSockJS();
    }

    @Bean
    public CounterHandler counterHandler() {
        return new CounterHandler();
    }
}
  1. Socket处理类
public class CounterHandler extends TextWebSocketHandler {
    public static final String COLLECTOR = "collector";
    private static final List<WebSocketSession> COUNTS = new ArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        System.out.println("Connection established");
        COUNTS.add(session);        
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message)
            throws Exception {      
        System.out.println("Received:" + message.getPayload());     
    }

    /**
     * 给某个用户发送消息
     *
     * @param count
     * @param message
     */
    public void sendMessageToUser(String count, String message) {
        //遍历记录的session,取出符合条件的session发送消息
        for (WebSocketSession socketSession : COUNTS) {
            if (socketSession.getAttributes().get(COLLECTOR).equals(count)) {
                try {
                    if (socketSession.isOpen()) {
                        //最关键的一句,给客户端推送消息
                        socketSession.sendMessage(new TextMessage(message));                                        
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                COUNTS.remove(socketSession);
                break;
            }
        }
    }


    /**
     * 检查握手请求和响应, 对WebSocketHandler传递属性
     */
    public static class CountHandshakeInterceptor implements HandshakeInterceptor {
        /**
         * 在握手之前执行该方法, 继续握手返回true, 中断握手返回false.
         * 通过attributes参数设置WebSocketSession的属性
         *
         * @param request
         * @param response
         * @param wsHandler
         * @param attributes
         * @return
         * @throws Exception
         */
        @Override
        public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                       WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
            String collector = ((ServletServerHttpRequest) request).getServletRequest().getParameter(COLLECTOR);
            if (Strings.isNullOrEmpty(collector)) {
                return false;
            } else {
                attributes.put(COLLECTOR, collector);
                return true;
            }
        }

        /**
         * 在握手之后执行该方法. 无论是否握手成功都指明了响应状态码和相应头.
         *
         * @param request
         * @param response
         * @param wsHandler
         * @param exception
         */
        @Override
        public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                   WebSocketHandler wsHandler, Exception exception) {

        }
    }

}

客户端

public class WebSocketTest {
    //服务器WebSocket 连接地址
    private static final String WS_URI = "ws://localhost:8080/counter?collector=1";

    public static void main(String[] args) throws IOException, InterruptedException {
        StandardWebSocketClient client = new StandardWebSocketClient();
        WebSocketConnectionManager manager = new WebSocketConnectionManager(client, new MyHandler(), WS_URI);
        manager.start();
        Thread.sleep(100000);
    }

    private static class MyHandler extends TextWebSocketHandler {
        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            System.out.println("connected...........");
            session.sendMessage(new TextMessage("hello, web socket"));
            super.afterConnectionEstablished(session);
        }

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message)
                throws Exception {
            System.out.println("receive: " + message.getPayload());
            super.handleTextMessage(session, message);
        }
    }
}

至此便完成了主要代码逻辑。先启动服务端,然后运行客户端建立WebSocket连接,接着在浏览器地址栏输入localhost:8080/notice?count=1,服务器便会找到对应的socketSession 对其进行推送消息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容