Spring Springboot实现websocket通讯-2

特别说明:1. 本文基于Springboot spring-boot-starter-parent 1.5.1.RELEASE编码,在不同的版本中部分方法有区别。2. 因为博客字数限制,拆分成了两篇文章
第一篇地址:https://www.jianshu.com/p/4762494d42f1
第二篇地址:https://www.jianshu.com/p/9103c9c7e128


前面两种建立websocket通讯,不管是用javax的包还是spring的包都是用的比较底层的协议,下面我们来看看用上层的STOMP来建立websocket通讯

SockJs+Spring-WebSocket时,由于SockJs与Spring WebSocket之间采用JSON通讯,需要引入jackson 2的相关jar包
        <!-- jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.3</version>
        </dependency>

前面已经提到了STOMP是一个上层协议,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义。
STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧:

SEND
destination:/app/marco
content-length:20
 
{\"message\":\"hello word!\"}
  1. SEND:STOMP命令,表明会发送一些内容;
  2. destination:头信息,用来表示消息发送到哪里;
  3. content-length:头信息,用来表示 负载内容的 大小;
  4. 空行:
  5. 帧内容(负载)内容

要使用STOMP 通讯,服务端,和客户端都必须支持,服务端的准备步骤

服务端准备工作

  1. 我们已经配置了STOMP通讯的配置类 WebSocketStompConfig

  2. 配置了WebSocketChannelInterceptor 和 WebSocketHandshakeInterceptor 两个自定义拦截器

  3. 一个WebSocketStompController 用于接收客户端消息和响应客户端

  4. 一个简单的MVC controller 用于跳转websocket 页面

在Spring中启用STOMP通讯不用我们自己去写原生态的帧,spring的消息功能是基于代理模式构建,其实说得复杂,都是封装好了的,如果需要开启SOMP,只需要在websocket配置类上使用@EnableWebSocketMessageBroker (注解的作用为能够在 WebSocket 上启用 STOMP),并实现WebSocketMessageBrokerConfigurer接口,有些教程在这一步会继承AbstractWebSocketMessageBrokerConfigurer 类,我们看一下AbstractWebSocketMessageBrokerConfigurer类的源码,可以看到都是空方法,也是实现的接口,这里推荐自己实现接口,因为官方API上AbstractWebSocketMessageBrokerConfigurer已经标记为废弃

image.png

AbstractWebSocketMessageBrokerConfigurer 抽象类

public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
    public AbstractWebSocketMessageBrokerConfigurer() {
    }

    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
    }

    public void configureClientInboundChannel(ChannelRegistration registration) {
    }

    public void configureClientOutboundChannel(ChannelRegistration registration) {
    }

    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        return true;
    }

    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
    }

    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
    }

    public void configureMessageBroker(MessageBrokerRegistry registry) {
    }
}

WebSocketMessageBrokerConfigurer接口

public interface WebSocketMessageBrokerConfigurer {

    // 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
    void registerStompEndpoints(StompEndpointRegistry var1);

    // 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
    void configureWebSocketTransport(WebSocketTransportRegistration var1);

    // 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientInboundChannel(ChannelRegistration var1);
    
    // 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientOutboundChannel(ChannelRegistration var1);

    // 添加自定义的消息转换器,spring 提供多种默认的消息转换器,返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
    boolean configureMessageConverters(List<MessageConverter> var1);

    // 配置消息代理,哪种路径的消息会进行代理处理
    void configureMessageBroker(MessageBrokerRegistry var1);
    
    // 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
    void addArgumentResolvers(List<HandlerMethodArgumentResolver> var1);

    // 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
    void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> var1);
}

在registerStompEndpoints 方法中,我们可以设置websocket服务的地址,同样,我们也可以根据自身业务需求,去添加拦截器,例如前文我们写的WebSocketHandshakeInterceptor拦截器,可以获取到httpsession,同样,当我们把信息存入map 后,都可以通过通过WebSocketSession的getAttributes()下提供get方法获取

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * 
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry)
    {
        
        /*
         * 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOrigins("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         */
        registry.addEndpoint("/stomp/websocketJS")
                .setAllowedOrigins("*")
                .withSockJS()
                .setInterceptors(new WebSocketHandshakeInterceptor())
        ;

        /*
         * 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

如果我们业务关心,用户的数量,在线数量,连接状况等数据,我们也可以通过ChannelRegistration对象的setInterceptors方法添加监听,这里先展示一个完整的实现类,监听接口在后面会介绍,代码中的WebSocketHandshakeInterceptor 拦截器,是上一个例子已经实现的,用于存储httpsession,WebSocketChannelInterceptor 拦截器 ,在这个拦截器中可以做一些在线人数统计等操作,后面会介绍

package com.wzh.demo.websocket.config;

import com.wzh.demo.websocket.handler.MyPrincipalHandshakeHandler;
import com.wzh.demo.websocket.interceptor.WebSocketChannelInterceptor;
import com.wzh.demo.websocket.interceptor.WebSocketHandshakeInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

import java.util.List;

/**
 * <配置基于STOMP的websocket>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-12 18:38
 * @see [相关类/方法] (可选)
 **/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * 
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry)
    {
        
        /*
         * 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOrigins("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         * 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
         */
        registry.addEndpoint("/stomp/websocketJS")
                //.setAllowedOrigins("*")
                .addInterceptors(new WebSocketHandshakeInterceptor())
                .setHandshakeHandler(new MyPrincipalHandshakeHandler())
                .withSockJS()

        ;

        /*
         * 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

    /**
     * 配置消息代理
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry)
    {
        /*
         *  enableStompBrokerRelay 配置外部的STOMP服务,需要安装额外的支持 比如rabbitmq或activemq
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 3. 可以通过 setRelayHost 配置代理监听的host,默认为localhost
         * 4. 可以通过 setRelayPort 配置代理监听的端口,默认为61613
         * 5. 可以通过 setClientLogin 和 setClientPasscode 配置账号和密码
         * 6. setxxx这种设置方法是可选的,根据业务需要自行配置,也可以使用默认配置
         */
        //registry.enableStompBrokerRelay("/topicTest","/userTest")
                //.setRelayHost("rabbit.someotherserver")
                //.setRelayPort(62623);
                //.setClientLogin("userName")
                //.setClientPasscode("password")
                //;

        // 自定义调度器,用于控制心跳线程
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 线程池线程数,心跳连接开线程
        taskScheduler.setPoolSize(1);
        // 线程名前缀
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
        // 初始化
        taskScheduler.initialize();

        /*
         * spring 内置broker对象
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 2,进行心跳设置,第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值代码server希望client发的心跳间隔毫秒数
         * 3. 可以配置心跳线程调度器 setHeartbeatValue这个不能单独设置,不然不起作用,要配合setTaskScheduler才可以生效
         *    调度器我们可以自己写一个,也可以自己使用默认的调度器 new DefaultManagedTaskScheduler()
         */
        registry.enableSimpleBroker("/topicTest","/userTest")
                .setHeartbeatValue(new long[]{10000,10000})
                .setTaskScheduler(taskScheduler);
        /*
         *  "/app" 为配置应用服务器的地址前缀,表示所有以/app 开头的客户端消息或请求
         *  都会路由到带有@MessageMapping 注解的方法中
         */
        registry.setApplicationDestinationPrefixes("/app");

        /*
         *  1. 配置一对一消息前缀, 客户端接收一对一消息需要配置的前缀 如“'/user/'+userid + '/message'”,
         *     是客户端订阅一对一消息的地址 stompClient.subscribe js方法调用的地址
         *  2. 使用@SendToUser发送私信的规则不是这个参数设定,在框架内部是用UserDestinationMessageHandler处理,
         *     而不是而不是 AnnotationMethodMessageHandler 或  SimpleBrokerMessageHandler
         *     or StompBrokerRelayMessageHandler,是在@SendToUser的URL前加“user+sessionId"组成
         */
        registry.setUserDestinationPrefix("/user");

        /*
         * 自定义路径分割符
         * 注释掉的这段代码添加的分割符为. 分割是类级别的@messageMapping和方法级别的@messageMapping的路径
         * 例如类注解路径为 “topic”,方法注解路径为“hello”,那么客户端JS stompClient.send 方法调用的路径为“/app/topic.hello”
         * 注释掉此段代码后,类注解路径“/topic”,方法注解路径“/hello”,JS调用的路径为“/app/topic/hello”
         */
        //registry.setPathMatcher(new AntPathMatcher("."));

    }

    /**
     * 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        /*
         * 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节
         * 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节
         * 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒
         */
        registration.setMessageSizeLimit(10240)
                    .setSendBufferSizeLimit(10240)
                    .setSendTimeLimit(10000);
    }

    /**
     * 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        /*
         * 配置消息线程池
         * 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务
         * 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程
         * 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒
         */
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        /*
         * 添加stomp自定义拦截器,可以根据业务做一些处理
         * springframework 4.3.12 之后版本此方法废弃,代替方法 interceptors(ChannelInterceptor... interceptors)
         * 消息拦截器,实现ChannelInterceptor接口
         */
        registration.setInterceptors(webSocketChannelInterceptor());
    }

    /**
     *设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        //registration.setInterceptors(new WebSocketChannelInterceptor());
    }

    /**
     * 添加自定义的消息转换器,spring 提供多种默认的消息转换器,
     * 返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
     * @param list
     * @return
     */
    @Override
    public boolean configureMessageConverters(List<MessageConverter> list) {
        return true;
    }

    /**
     * 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
     * @param list
     */
    @Override
    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> list) {

    }

    /**
     * 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
     * @param list
     */
    @Override
    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> list) {

    }

    /**
     * 拦截器加入 spring ioc容器
     * @return
     */
    @Bean
    public WebSocketChannelInterceptor webSocketChannelInterceptor()
    {
        return new WebSocketChannelInterceptor();
    }

}


WebSocketChannelInterceptor 的实现步骤

如果需要添加监听,我们的监听类需要实现ChannelInterceptor接口,在 springframework包5.0.7之前这一步我们一般是实现ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。

image.png

首先我们看一下,ChannelInterceptor 哪些方法

package org.springframework.messaging.support;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public interface ChannelInterceptor {
    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    Message<?> preSend(Message<?> var1, MessageChannel var2);

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    void postSend(Message<?> var1, MessageChannel var2, boolean var3);

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    void afterSendCompletion(Message<?> var1, MessageChannel var2, boolean var3, Exception var4);

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    boolean preReceive(MessageChannel var1);

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    Message<?> postReceive(Message<?> var1, MessageChannel var2);

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    void afterReceiveCompletion(Message<?> var1, MessageChannel var2, Exception var3);
}

上面有说道,在ChannelInterceptor接口中的preSend能在消息发送前做一些处理,例如可以获取到用户登录的唯一token令牌,这里的令牌是我们业务传递给客户端的,例如用户在登录成功后跳转到websocket建立连接的页面,我们后台生成的一个标识符,客户端在和服务端建立websocket连接的时候,我们可以从消息头中获取到这种业务参数,并做一系列后续处理,如果要做这种业务操作,我们还需要一个Authentication对象,这个对象是我们自己写的,这个类必须实现java.security.Principal,这里只是做一个简单的token存储,可以根据实际的业务 逻辑进行扩展。

import java.security.Principal;

/**
 * <websocket登录连接对象>
 * <用于保存websocket连接过程中需要存储的业务参数>
 * @author wzh
 * @version 2018-08-26 23:30
 * @see [相关类/方法] (可选)
 **/
public class WebSocketUserAuthentication implements Principal{

    /**
     * 用户身份标识符
     */
    private String token;

    public WebSocketUserAuthentication(String token) {
        this.token = token;
    }

    public WebSocketUserAuthentication() {
    }

    /**
     * 获取用户登录令牌
     * @return
     */
    @Override
    public String getName() {
        return token;
    }
}

一个消息头拦截器,用于获取用户的认证信息


package com.wzh.demo.websocket.handler;

import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.util.Map;

/**
 * <设置认证用户信息>
 * <功能详细描述>
 * @author wzh
 * @version 2018-09-18 23:55
 * @see [相关类/方法] (可选)
 **/
public class MyPrincipalHandshakeHandler extends DefaultHandshakeHandler{

    private static final Logger log = Logger.getLogger(MyPrincipalHandshakeHandler.class);

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {

        HttpSession httpSession = getSession(request);
        // 获取登录的信息,就是controller 跳转页面存的信息,可以根据业务修改
        String user = (String)httpSession.getAttribute("loginName");

        if(StringUtils.isEmpty(user)){
            log.error("未登录系统,禁止登录websocket!");
            return null;
        }
        log.info(" MyDefaultHandshakeHandler login = " + user);
        return new WebSocketUserAuthentication(user);
    }

    private HttpSession getSession(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            return serverRequest.getServletRequest().getSession(false);
        }
        return null;
    }

}


下面我们做个拦截器,在preSend方法中获取封装首次登陆后的令牌信息,在postSend方法中统计在线人数

WebSocketChannelInterceptor 拦截登录时消息头中的信息
package com.wzh.demo.websocket.interceptor;

import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import javax.servlet.http.HttpSession;

import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;

/**
 * <websocke消息监听,用于监听websocket用户连接情况>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-25 23:39
 * @see [相关类/方法] (可选)
 **/
public class WebSocketChannelInterceptor implements ChannelInterceptor {

    public WebSocketChannelInterceptor() {
    }

    Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);

    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {

        StompHeaderAccessor accessor =  MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        /**
         * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
         * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
         */
        if(StompCommand.CONNECT.equals(accessor.getCommand()))
        {
            /*
             * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
             * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
             * 3. header参数的key可以一样,取出来就是list
             * 4. 样例代码header中只有一个token,所以直接取0位
             */
            String token = accessor.getNativeHeader("token").get(0);

            /*
             * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
             * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
             *    就可以通过方法提供的 getUser()方法获取到这里封装user对象
             * 2. 例如可以在这里拿到前端的信息进行登录鉴权
             */
            WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();

            System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);

        }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
        {

        }
        return message;
    }

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    @Override
    public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        /*
         * 拿到消息头对象后,我们可以做一系列业务操作
         * 1. 通过getSessionAttributes()方法获取到websocketSession,
         *    就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
         * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
         */
        HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");

        // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
        log.info("postSend 中获取httpSession key:" + httpSession.getId());

        // 忽略心跳消息等非STOMP消息
        if(accessor.getCommand() == null)
        {
            return;
        }

        // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
        System.out.println(accessor.getCommand());
        switch (accessor.getCommand())
        {
            // 首次连接
            case CONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 首次连接");
                break;
            // 连接中
            case CONNECTED:
                break;
            // 下线
            case DISCONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 下线");
                break;
             default:
                break;
        }


    }

    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {

    }

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    @Override
    public boolean preReceive(MessageChannel messageChannel) {
        return true;
    }

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
        return message;
    }

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {

    }
}


服务端发送消息大体有两种场景,公告和私信,实现的方式蛮多的,这里只是举例说明,具体可以看此篇博文。

路径:https://blog.csdn.net/pacosonswjtu/article/details/51914567

服务端处理消息的场景:

  1. 公告就是只要订阅了此路径的的用户都能收到,我们使用@SendTo 注解实现,如果不使用注解指定,

    会默认交给broker进行处理,例如@MessageMapping("/demo1/twoWays") 这种,就会拼接代理域+路径
    
    相当于配置了@SendTo("/topicTest/twoWays"),也可以使用SimpMessagingTemplate.convertAndSend
    
  2. 私信就是指定人员才能收到,可以用@SendToUser 注解或者SimpMessagingTemplate 模板类(框架提供)的convertAndSendToUser进行处理

    • @SendToUser 多用于资源的请求,如果我只是想简单的用websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
    • SimpMessagingTemplate.convertAndSendToUser 可以用户发送指定的人员
    • 使用指定人员发送的时候,前缀必须为配置的setUserDestinationPrefix 配置的“/user”,在spring 框架内部以"/user" 为前缀的消息将会通过 UserDestinationMessageHandler 进行处理,而不是 AnnotationMethodMessageHandler 或 SimpleBrokerMessageHandler or StompBrokerRelayMessageHandler。UserDestinationMessageHandler 的主要任务: 是 将用户消息重新路由到 某个用户独有的目的地上。 在处理订阅的时候,它会将目标地址中的 "/user" 前缀去掉,并基于用户 的会话添加一个后缀。如,对 "/user/userTest/notifications" 的订阅最后可能路由到 名为 "/userTest/notifacations-user65a4sdfa" 目的地上

服务端controller 用于接收客户端消息和响应客户端

package com.wzh.demo.controller;

import com.alibaba.fastjson.JSON;
import com.wzh.demo.domain.WebSocketUserAuthentication;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Controller;

import java.util.HashMap;
import java.util.Map;

/**
 * <STOMP websocket controller>
 * <功能详细描述>
 * @author wzh
 * @version 2018-08-29 22:58
 * @see [相关类/方法] (可选)
 **/
@Controller
public class WebSocketStompController {

    Logger log = Logger.getLogger(WebSocketStompController.class);

    private final SimpMessagingTemplate messagingTemplate;

    /**
     * 实例化Controller的时候,注入SimpMessagingTemplate
     * @param messagingTemplate
     */
    @Autowired
    public WebSocketStompController(SimpMessagingTemplate messagingTemplate)
    {
        this.messagingTemplate = messagingTemplate;
    }

    /**
     * 发送广播消息,所有订阅了此路径的用户都会收到此消息
     * 这里做个restful风格,其实无所谓,根据项目实际情况进行配置
     * restful风格的接口,在springMVC中,我们使用@PathVariable注解,
     * 在websocket stomp接口中,restful要使用@DestinationVariable
     * @param groupId
     * @param json
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsg/{groupId}")
    @SendTo("/topicTest/hello")
    public Map<String, Object> sendChatMsg(@DestinationVariable(value = "groupId") String groupId, String json,
        StompHeaderAccessor headerAccessor)
    {
        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user =(WebSocketUserAuthentication)headerAccessor.getUser();
        log.info("公告controller 中获取用户登录令牌:" + user.getName());
        log.info("公告拿到客户端传递分组参数:" + groupId);
        
        // 这里拿到的json 字符串,其实可以自动绑定到对象上
        System.out.println("公告获取客户端传递过来的JSON 字符串:" + json);
        Map msg = (Map) JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("msg", "公告服务器收到客户端请求,发送广播消息:"+ msg.get("msg"));
        return data;
        
    }

    /**
     * 发送私信消息,只是想简单的用websocket向服务器请求资源而已,
     * 然后服务器你就把资源给我就行了,别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
     * 如果一个帐号打开了多个浏览器窗口,也就是打开了多个websocket session通道,
     * 这时,spring webscoket默认会把消息推送到同一个帐号不同的session,
     * 可以利用broadcast = false把避免推送到所有的session中
     * @param json
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsgByOwn")
    @SendToUser(value = "/userTest/own")
    public Map<String, Object> sendChatMsgByOwn(String json,
        StompHeaderAccessor headerAccessor)
    {
        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user = (WebSocketUserAuthentication)headerAccessor.getUser();

        log.info("SendToUser controller 中获取用户登录令牌:" + user.getName()
                + " socketId:" + headerAccessor.getSessionId());

        // 这里拿到的json 字符串,其实可以自动绑定到对象上
        System.out.println("SendToUser获取客户端传递过来的JSON 字符串:" + json);

        Map msg = (Map)JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("msg", "SendToUser服务器收到客户端请求,发送私信消息:" + msg.get("msg"));
        
        return data;
    }

    /**
     * 根据ID 把消息推送给指定用户
     * 1. 这里用了 @SendToUser 和 返回值 其意义是可以在发送成功后回执给发送放其信息发送成功
     * 2. 非必须,如果实际业务不需要关心此,可以不用@SendToUser注解,方法返回值为void
     * 3. 这里接收人的参数是用restful风格带过来了,websocket把参数带到后台的方式很多,除了url路径,
     *    还可以在header中封装用@Header或者@Headers去取等多种方式
     * @param accountId 消息接收人ID
     * @param json 消息JSON字符串
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsgById/{accountId}")
    @SendToUser(value = "/userTest/callBack")
    public Map<String, Object> sendChatMsgById(
        @DestinationVariable(value = "accountId") String accountId, String json,
        StompHeaderAccessor headerAccessor)
    {
        Map msg = (Map)JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();

        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user = (WebSocketUserAuthentication)headerAccessor.getUser();

        log.info("SendToUser controller 中获取用户登录令牌:" + user.getName()
                + " socketId:" + headerAccessor.getSessionId());

        // 向用户发送消息,第一个参数是接收人、第二个参数是浏览器订阅的地址,第三个是消息本身
        // 如果服务端要将消息发送给特定的某一个用户,
        // 可以使用SimpleMessageTemplate的convertAndSendToUser方法(第一个参数是用户的登陆名username)
        String address = "/userTest/callBack";
        messagingTemplate.convertAndSendToUser(accountId, address, msg.get("msg"));

        data.put("msg", "callBack 消息已推送,消息内容:" + msg.get("msg"));
        return data;
    }

}

一个springMVC的controller 用户跳转websocket页面,并封装简单的认证信息

package com.wzh.demo.controller;

import com.wzh.demo.websocket.handler.WebSocketHander;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.socket.TextMessage;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.Date;


/**
 * <websocket测试用MVC控制器>
 * <功能详细描述>
 * @author wzh
 * @version 2018-07-09 22:53
 * @see [相关类/方法] (可选)
 **/
@Controller
@RequestMapping("/websocket")
public class WebSocketController {

    // 跳转stomp websocket 页面
    @RequestMapping(value = "/spring/stompSocket.do",method = RequestMethod.GET)
    public String toStompWebSocket(HttpSession session, HttpServletRequest request, Model model)
    {
        // 这里封装一个登录的用户组参数,模拟进入通讯后的简单初始化
        model.addAttribute("groupId","user_groupId");
        model.addAttribute("session_id",session.getId());
        System.out.println("跳转:" + session.getId());
        session.setAttribute("loginName",session.getId());
        return "/test/springWebSocketStomp";

    }
}

Html 客户端,客户端需要引入额外的两个js,sockjs.js和stomp.js

Github 地址:

API地址:https://stomp-js.github.io/stomp-websocket/codo/extra/docs-src/Usage.md.html

API中文翻译博文:https://blog.csdn.net/jqsad/article/details/77745379


<#import "spring.ftl" as spring />
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
        "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
    <title>Title</title>
    <script src="${request.contextPath}/js/jquery-3.3.1.min.js"></script>
    <script src="${request.contextPath}/js/sockjs.js"></script>
    <script src="${request.contextPath}/js/stomp.js"></script>
    <script type="text/javascript">

        // 定义全局变量 stomp socket
        var stompClient,socket;

        $(document).ready(function () {
            if (window.WebSocket){
                websocketConfig();
            } else {
                alert("错误","浏览器不支持websocket技术通讯.");
            }
        });

        // websocket 配置
        function websocketConfig() {
            /*
             * 1. 连接url为endpointChat的endpoint,对应后台WebSoccketConfig的配置
             * 2. SockJS 所处理的URL 是 "http://" 或 "https://" 模式,而不是 "ws://" or  "wss://"
             */
            socket = new SockJS("${request.contextPath}/stomp/websocketJS");

            // 通过sock对象监听每个事件节点,非必须,这个必须放在stompClient的方法前面
            sockHandle();

            // 获取 STOMP 子协议的客户端对象
            stompClient = Stomp.over(socket);

            /*
             * 1. 获取到stomp 子协议后,可以设置心跳连接时间,认证连接,主动断开连接
             * 2,连接心跳有的版本的stomp.js 是默认开启的,这里我们不管版本,手工设置
             * 3. 心跳是双向的,客户端开启心跳,必须要服务端支持心跳才行
             * 4. heartbeat.outgoing 表示客户端给服务端发送心跳的间隔时间
             * 5. 客户端接收服务端心跳的间隔时间,如果为0 表示客户端不接收服务端心跳
             */
            stompClient.heartbeat.outgoing = 10000;
            stompClient.heartbeat.incoming = 0;

            /*
             * 1. stompClient.connect(headers, connectCallback, errorCallback);
             * 2. headers表示客户端的认证信息,多个参数 json格式存,这里简单用的httpsessionID,可以根据业务场景变更
             *    这里存的信息,在服务端StompHeaderAccessor 对象调用方法可以取到
             * 3. connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法;
             *    errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;
             */
            var headers = {token:"${session_id}"};

            stompClient.connect(headers,function (frame) {

                console.log('Connected: ' + frame);

                /*
                 * 1. 订阅服务,订阅地址为服务器Controller 中的地址
                 * 2. 如果订阅为公告,地址为Controller 中@SendTo 注解地址
                 * 3. 如果订阅为私信,地址为setUserDestinationPrefix 前缀+@SendToUser注解地址
                 *    或者setUserDestinationPrefix 前缀 + controller的convertAndSendToUser地址一致
                 * 4. 这里演示为公告信息,所有订阅了的用户都能接受
                 */
                stompClient.subscribe("/topicTest/hello",function (message) {
                    var msg = JSON.parse(message.body).msg;
                    console.log("接收到公告信息:" + msg);
                    alert("接收到公告信息:" + msg);
                });

                /*
                 * 1. 因为推送为私信,必须带上或者setUserDestinationPrefix前缀 /user
                 * 2. 演示自己发送给自己,做websocket向服务器请求资源而已,然后服务器你就把资源给我就行了,
                 *    别的用户就不用你广播推送了,简单点,就是我请求,你就推送给我
                 */
                stompClient.subscribe('/user/userTest/own',function (message) {
                    var msg = JSON.parse(message.body).msg;
                    console.log("接收到私信信息SendToUser:" + msg);
                    alert("接收到私信信息SendToUser:" + msg);
                });

                /*
                 * 1. 订阅点对点消息
                 * 2. 很多博文这里的路径会写成"/user/{accountId}/userTest/callBack”这种,是因为
                 *    @SendToUser发送的代理地址是 /userTest/callBack, 地址将会被转化为 /user/{username}/userTest/callBack
                 *    username,为用户的登录名,也是就是Principal或者本文中的WebSocketUserAuthentication对象getName获取的参数
                 *    如果在拦截器中配置了认证路径,可以不带参数,不过推荐用带参数的写法
                 *
                 */
                stompClient.subscribe('/user/userTest/callBack',function (message) {

                    var msg  = message.body;
                    console.log("接收到点对点SendToUser:" + msg);
                    alert("接收到点对点SendToUser:" + msg);
                });

            }, function (error) {
                console.log('STOMP: ' + error);
                //setTimeout(websocketConfig, 10000);
                console.log('STOMP: Reconnecting in 10 seconds');
            });
        }
        
        // 发送公告消息
        function sendMsg() {
            var msg = $("#message").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             */
            stompClient.send('${"/app/sendChatMsg/" + groupId}',{},JSON.stringify(data));
        }

        // 发送给自己
        function sendMsgOwn() {
            var msg = $("#message").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             */
            stompClient.send("/app/sendChatMsgByOwn",{},JSON.stringify(data));
        }

        // 发送点对点消息
        function sendMsgById() {
            var msg = $("#message").val();
            var accountId = $("#accountId").val();
            var data ={"msg":msg};

            /**
             *  1. 第一个参数 url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
             *  2. headers 为发送信息的header,json格式,JavaScript 对象,
             *     可选参数,可以携带消息头信息,也可以做事务,如果没有,传{}
             *  3. body 为发送信息的 body,字符串,可选参数
             *  4. accountId这个参数其实可以通过header传过去,不过因为是restful风格,所以就跟在url上
             */
            stompClient.send("/app/sendChatMsgById/" + accountId,{},JSON.stringify(data));
        }

        // 通过sock对象监听每个事件节点,非必须,这里开启了stomp的websocket 也不会生效了
        function sockHandle() {

            // 连接成功后的回调函数
            socket.onopen = function () {
                console.log("------连接成功------");
            };

            // 监听接受到服务器的消息
            socket.onmessage = function (event) {
                console.log('-------收到的消息: ' + event.data);
            };

            // 关闭连接的回调函数
            socket.onclose = function (event) {
                console.log('--------关闭连接: connection closed.------');
            };

            // 连接发生错误
            socket.onerror = function () {
                alert("连接错误", "网络超时或通讯地址错误.");
                disconnect();
            } ;
        }

        // 关闭websocket
        function disconnect() {
            if (socket != null) {
                socket.close();
                socket = null;
            }
        }

    </script>
</head>
<body>
    <div>
        <span>消息</span>
        <input type="text" id="message" name="message">
        <input type="button" id="sendMsg" name="sendMsg" value="发送公告" onclick="sendMsg();">
        <input type="button" id="sendMsgOwn" name="sendMsgOwn" value="自己给自己推送" onclick="sendMsgOwn();">
        <br/>
        <span>接收人</span>
        <input type="text" id="accountId" name="accountId">
        <input type="button" id="sendMsgById" name="sendMsgById" value="点对点消息" onclick="sendMsgById();">
    </div>
</body>
</html>

这样就可以通过页面做简单的消息推送了


20180919224511380 (1).gif
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容