SpringBoot——整合WebSocket(基于STOMP协议)

参考链接

STOMP

定义

STOMP 中文为“面向消息的简单文本协议”,STOMP 提供了能够协作的报文格 式,以至于 STOMP 客户端可以与任何 STOMP 消息代理Brokers进行通信,从而为多语言,多平台和 Brokers 集群提供简单且普遍的消息协作。STOMP 协议可 以建立在 WebSocket 之上,也可以建立在其他应用层协议之上。通过 Websocket 建立 STOMP 连接,也就是说在 Websocket 连接的基础上再建立 STOMP 连接。

常见的 STOMP 的服务器/客户端的开源实现

  • STOMP 服务器:ActiveMQ、RabbitMQ、StompServer、…
  • STOMP 客户端库:stomp.js(javascript)、stomp.py(python)、Gozirra(java)、…

STOMP Over WebSocket

即 WebSocket 结合 Stomp 的实现。WebSocket 协议是基于 TCP 的一种新的网络协议,实现双工通讯,但是 websocket 没有规范payload (除控制信息外的有效载体)格式,可以是文本数据,也可以发送二进制数据,需要我们自己定义。而我们可以使用 stomp 协议去规范传输数据格式标准。

Stomp 帧格式示例

STOMP的客户端和服务器之间的通信是通过“帧”(Frame) 实现的,每个帧由多“行”(Line)组成。

  • 第一行包含了命令,然后紧跟键值对形式的Header内容。
  • 第二行必须是空行。
  • 第三行开始就是Body内容,末尾都以空字符结尾。
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

参考:http://stomp.github.io/stomp-specification-1.1.html

STOMP Over WebSocket 优点

运行流程图

Stomp 本身

  • channel:即客户端与服务端连接的通道
  • /app:由程序配置定义的缀,这种前缀指该消息需要经过一些自定义处理(在Controller中)再发到 Stomp 代理(后续详解)
  • /topic:同上,不过这类是不需要经过自定义处理的,直接发到 Stomp 代理(后续详解)

结合其他消息中间件

即相对于上述增加了一步 Stomp 代理与消息中间件之间的交互。

SocketJs 是什么

SockJS 是一个浏览器的 JavaScript 库,它提供了一个类似于网络的对象,SockJS 提供了一个连贯的,跨浏览器的 JavaScriptAPI,它在浏览器和 Web 服务器之间创建了一个低延迟、全双工、跨域 通信通道。SockJS 的一大好处在于提供了浏览器兼容性。即优先使用原生 WebSocket,如果浏览器不支持 WebSocket,会自动降为轮询的方式。

依赖引入

<dependencies>

    <!-- web-socket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>compile</scope>
    </dependency>

    <!-- redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- security -->
    <!--        <dependency>-->
    <!--            <groupId>org.springframework.boot</groupId>-->
    <!--            <artifactId>spring-boot-starter-security</artifactId>-->
    <!--        </dependency>-->

    <!-- fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

</dependencies>

群发实现

服务端代码

  • 1、WebSocketCofig 配置类
/**
 * @EnableWebSocketMessageBroker 开启 WebSocket Over Stomp
 * @author 17697
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 注册Stomp服务端点
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // addEndpoint 设置与客户端建立连接的url
        registry.addEndpoint("/ws")
                // 设置允许跨域
                .setAllowedOriginPatterns("*")
                // 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
                .withSockJS();
    }
}
  • 2、实体类
/**
 * 消息模型类
 */
@Data
public class ChatMessage {

    /**
     * 消息类型
     */
    private MessageType type;

    /**
     * 消息正文
     */
    private String content;

    /**
     * 消息发送者
     */
    private String sender;

    /**
     * 消息接收者
     */
    private String toUser;

    public enum MessageType {
        CHAT,
        JOIN,
        LEAVE
    }
}
  • 3、Controller层代码
@RestController
public class ChatController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * 客户端发送消息入口,群发消息
     * @param chatMessage
     * @return
     */
    @MessageMapping("/chat/sendMessage")
    @SendTo({"/topic/public"})
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;
    }

    /**
     * 客户端新增用户消息入口,用于群发显示:新进入xx用户
     * @param chatMessage
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/chat/addUser")
    @SendTo({"/topic/public"})
    public ChatMessage addUser(@Payload ChatMessage chatMessage,
                               SimpMessageHeaderAccessor headerAccessor) {
        // Add username in web socket session
        headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
        return chatMessage;
    }
    
}

注解介绍

  • @MessageMapping:和 @RequestMapping 注解功能类似,不过该注解用于接收 Stomp 客户端向服务端发送的url地址
    注意:

    • @MessageMapping 使用若未搭配 @SendTo 或 @SendToUser 则会默认发送同 @MessageMapping 中 url 的主题消息
    • 使用该注解,则方法下尽量不要使用 SimpMessagingTemplate 的转发方法,如果非要使用,请把方法返回值改为 void ,否则会出现同时向主题发送两次消息。
  • @SendTo:定义方法~返回数据向其定义的 url 发送;
    等同于 SimpMessagingTemplate.convertAndSendTo("/message", "新消息")
    例: @SendTo({"/topic/public"}) 将消息发送到 /topic/public 主题下

  • @SendToUser:同上,不过是向单一用户发送消息;
    等同于 SimpMessagingTemplate.convertAndSendToUser(Key,"/message", "新消息")

SimpMessagingTemplate 主要方法介绍

推荐使用 SimpMessagingTemplate 处理消息,这种相对于上注解更易理解。

  • void convertAndSend(D destination, Object payload)
    群发消息:arg1: 目的地址,arg2: 消息内容
  • void convertAndSendToUser(String user, String destination, Object payload)
    单发消息,arg1: 向谁发送,arg2: 目的地址,arg3: 消息内容。
@Override
public void convertAndSendToUser(String user, String destination, Object payload,
                                 @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException {
    Assert.notNull(user, "User must not be null");
    Assert.isTrue(!user.contains("%2F"), "Invalid sequence \"%2F\" in user name: " + user);
    user = StringUtils.replace(user, "/", "%2F");
    destination = destination.startsWith("/") ? destination : "/" + destination;
    // 实际还是调用 convertAndSend,destinationPrefix 默认值是 "/user/"(可配置修改)
    super.convertAndSend(this.destinationPrefix + user + destination, payload, headers, postProcessor);
}
  • 4、事件监听类编写
@Component
@Slf4j
public class WebSocketEventListener {

    @Autowired
    private SimpMessageSendingOperations messagingTemplate;

    /**
     * 连接建立事件
     * @param event
     */
    @EventListener
    public void handleWebSocketConnectListener(SessionConnectEvent event) {
        log.info("建立一个新的连接");
    }


    /**
     * 连接断开事件
     * @param event
     */
    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String username = (String) headerAccessor.getSessionAttributes().get("username");
        if(username != null) {
            log.info("用户断开连接 : " + username);

            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);

            messagingTemplate.convertAndSend("/topic/public", chatMessage);
        }
    }
}
  • 5、前端代码
function connect(event) {
    username = document.querySelector('#name').value.trim();

    if(username) {
        usernamePage.classList.add('hidden');
        chatPage.classList.remove('hidden');

        // 建立服务端 websocket 连接,/ws 是后端服务器配置端点路径
        var socket = new SockJS('/ws');
        stompClient = Stomp.over(socket);

        stompClient.connect({}, onConnected, onError);

    }
    event.preventDefault();
}

function onConnected() {
    // 订阅群发主题
    stompClient.subscribe('/topic/public', onMessageReceived);

    stompClient.send("/chat/addUser",
        {},
        JSON.stringify({sender: username, type: 'JOIN'})
    )

    connectingElement.classList.add('hidden');
}


function onError(error) {
    connectingElement.textContent = 'Could not connect to WebSocket server. Please refresh this page to try again!';
    connectingElement.style.color = 'red';
}


function sendMessage(event) {
    var messageContent = messageInput.value.trim();

    if(messageContent && stompClient) {
        var chatMessage = {
            sender: username,
            content: messageInput.value,
            type: 'CHAT'
        };
        // arg1: 消息发送url,arg2: 消息头信息(例:加入用户认证头信息),arg3: 消息体
        stompClient.send("/chat/sendMessage", {}, JSON.stringify(chatMessage));
        messageInput.value = '';
    }
    event.preventDefault();
}

单发消息(一对一)

服务端代码
其他不变,在 Controller 层新增一对一方法;

使用 @RequestMapping 的方式通常用于其他服务端发送消息的入口

这里由于本人不会前端,所以采用下述通过 Postman 发送消息的形式(菜狗),会前端的大佬可以按之前的群发自行改造

/**
 * 一对一消息发送
 * @param chatMessage
*/
@PostMapping("/chat/single")
public void sendSingleMessage(@RequestBody ChatMessage chatMessage) {
    messagingTemplate.convertAndSendToUser(chatMessage.getToUser(),"/single",chatMessage);
}

前端代码

在连接方法中新增订阅个人主题

function onConnected() {
    // 订阅群发主题
    stompClient.subscribe('/topic/public', onMessageReceived);

    // 新增订阅一对一主题,即通过用户名等唯一性标识拼接到订阅主题地址
    stompClient.subscribe('/user/'+username+'/single', onMessageReceived);
    
    stompClient.send("/chat/addUser",
        {},
        JSON.stringify({sender: username, type: 'JOIN'})
    )

    connectingElement.classList.add('hidden');
}

拦截器配置

  • 1、编写自定义拦截器实现 ChannelInterceptor
/**
 * Socket拦截器
 * @author 17697
 */
@Component
public class SocketChannelInterceptor implements ChannelInterceptor {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final static String SOCKET_TOKEN_PREFIX = "webSocket:";

    private final static String SOCKET_AUTH = "socket_auth:";
    /**
     * 发送消息到通道前
     * @param message
     * @param channel
     * @return
     */
    @SneakyThrows
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // 获取连接头信息
        StompHeaderAccessor accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        // 连接验证token合法性(简单模拟)
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // 获取头中的token
            String token = accessor.getFirstNativeHeader("token");
            if (StringUtils.hasText(token)) {
                String redisToken = redisTemplate.opsForValue().get(SOCKET_TOKEN_PREFIX);
                if (token.equals(redisToken)) {
                    /* 这里可以结合 Security
                    UsernamePasswordAuthenticationToken authentication = new UsernamePasswordAuthenticationToken(userDetails, null, userDetails.getAuthorities());
                   SecurityContextHolder.getContext().setAuthentication(authentication);
                    accessor.setUser(authentication);
*/
                    // 简单处理设置对应权限。完整的应该根据用户的权限得出是否有发送/订阅到某个目的路径的权限
                    accessor.setUser(new UserPrincipal() {
                        @Override
                        public String getName() {
                            // 模拟权限类,仅有属性可发送/订阅
                            Permission permission = new Permission();
                            permission.setIsSend(true);
                            permission.setIsSubscribe(true);
                            String s = JSON.toJSONString(permission);
                            return s;
                        }
                    });

                } else {
                    throw new IllegalAccessException("未授权!!!");
                }
            } else {
                throw new IllegalAccessException("未授权!!!");
            }
            // 订阅权限认证
        } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            UserPrincipal user = ((UserPrincipal) accessor.getUser());
            String value = user.getName();
            if (StringUtils.hasText(value)) {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Boolean flag = ((Boolean) jsonObject.get("isSubscribe"));
                if (!flag) {
                    throw new IllegalAccessException("无权限订阅!!!");
                }
            } else {
                throw new IllegalAccessException("无权限订阅!!!");
            }
            // 发送权限验证
        } else if (StompCommand.SEND.equals(accessor.getCommand())) {
            UserPrincipal user = ((UserPrincipal) accessor.getUser());
            String value = user.getName();
            if (StringUtils.hasText(value)) {
                JSONObject jsonObject = JSONObject.parseObject(value);
                Boolean flag = ((Boolean) jsonObject.get("isSend"));
                if (!flag) {
                    throw new IllegalAccessException("无权限发送!!!");
                }
            } else {
                throw new IllegalAccessException("无权限发送!!!");
            }
        }
        return message;
    }


    /**
     * 发送消息到通道后
     * @param message
     * @param channel
     * @return
     */
    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
        ChannelInterceptor.super.postSend(message, channel, sent);
    }

    /**
     * 发送完成后
     * @param message
     * @param channel
     * @return
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex);
    }

    @Override
    public boolean preReceive(MessageChannel channel) {
        return ChannelInterceptor.super.preReceive(channel);
    }

    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return ChannelInterceptor.super.postReceive(message, channel);
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {
        ChannelInterceptor.super.afterReceiveCompletion(message, channel, ex);
    }
}

为什么保存认证信息使用 setUser 方法?
该方法表示会话的拥有者,即存储该会话拥有者信息。
每次建立连接都会创建一个 WebSocketSession 会话信息类,在该会话进行消息传递每次都会把 SessionId ,SessionAttributes 和 Principal(即我们setUser()保存的信息) 赋值到 Message 中,而 Principal 就是专门存储身份认证信息的。

  • SessionId: 初始随机分配的,用于确定唯一的会话
  • SessionAttributes: 用于给 WebSocketSession 设置一些额外记录属性,结构是 Map
  • Principal: 用于设置 WebSocketSession 的身份认证信息

额外配置项

根据需求添加

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private SocketChannelInterceptor socketChanelInterceptor;
    
    /**
     * 注册Stomp服务端点
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // addEndpoint 设置与客户端建立连接的url
        registry.addEndpoint("/ws")
                // 设置允许跨域
                .setAllowedOriginPatterns("*")
                // 允许SocketJs使用,是为了防止某些浏览器客户端不支持websocket协议的降级策略
                .withSockJS();
    }
    
    /**
     * 自定义拦截器配置
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(socketChanelInterceptor);
    }


    /**
     * 配置消息代理的路由规则
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 定义服务端应用目标前缀;客户端只有以这个前缀才能进入服务端方法 @MessageMapping
        registry.setApplicationDestinationPrefixes("/app/");
        // 定义SimpleBroker处理的消息前缀;只有消息以这个为前缀才会被SimpleBroker处理转发
        registry.enableSimpleBroker("/topic/","/user/");
        // 设置一对一消息前缀,默认的是"/user/",可通过该方法修改
        registry.setUserDestinationPrefix("/user/");

    }
}

项目代码

最后附上所有的代码地址:https://github.com/jjhyb/websocket-master

参考:
https://blog.csdn.net/AhangA/article/details/125470930

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容