1.背景
最近在项目中遇到需要在网页上实时展示某些设备信息的需求。一开始还好,接入的设备会定时推送数据给系统(一分钟间隔),时效性要求不是很高,于是在前端写了定时方法,每分钟刷新一次数据,虽不优雅,但也勉强可以满足需求。
但是系统陆续又接入了一些新设备,这次数据是不定时的推送(可能一天推送几条,也可能一秒钟推送几条)。这样一来,之前的方法就不合适了,定时方法的时间间隔设的太大,就有会延迟,也谈不上实时了;设置的太小,则会频繁的请求,浪费资源不说,还有可能请求不到数据(因为是不定时推送)。因此,就想,要是服务器主动向前端推送数据就好了,一旦服务器就收到设备数据,服务器就立马推送给前端。
听前辈说WebSocket可以做到,于是一顿搜罗。简单地说,WebSocket是H5的一部分,是可以满足实时性的全双工通信的一种协议。
我们最常用的http协议,都是发出一次请求,才能获取响应,服务器并不能主动向客户端推送数据。为了解决这个短板,在WebSocket之前,有定期轮询、comet等,能够实现类似服务器推送效果,但本质上还是请求/响应模式。WebSocket则不同,通过一次握手,会在服务器和客户端建立一个连接,基于此连接,客户端和服务器就可以双向发送数据。
WebSocket详解
聊天室示例
我的需求和聊天室有些区别,聊天室是有数据变更,所有人都能看到;而我的需求是,数据有变更则推送给对应的客户端而不是所有客户端。这里先做个demo试一试深浅。
demo:https://gitee.com/bit-fist/websocket-test.git
2.环境
Vue + sockjs-client + stompjs
SpringBoot + RabitMQ
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
ws = new WebSocket("ws://localhost:8080/Demo/websocketTest/user000");
console.log("link success")
}else{
alert('Not support websocket')
}
//连接成功建立的回调方法
ws.onopen = function(){ws.send("Test!"); };
//接收到消息的回调方法
ws.onmessage = function(evt){console.log(evt.data);ws.close();};
//连接关闭的回调方法
ws.onclose = function(evt){console.log("WebSocketClosed!");};
//连接发生错误的回调方法
ws.onerror = function(evt){console.log("WebSocketError!");};
上面的方法算是原生的,WebSocket是比较新的协议,不是所有浏览器都支持。因此需要有兼容WebSocket的方案,这就要用到SockJS了,它是一个浏览器上运行的 JavaScript 库,如果浏览器不支持 WebSocket,该库可以模拟对 WebSocket 的支持,实现浏览器和 Web 服务器之间低延迟、全双工、跨域的通讯通道。
至于STOMP,直接使用WebSocket(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。
不过,好消息是我们并非必须要使用原生的WebSocket连接。就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。
对于STOMP理解不深,直接摘抄 自 使用WebSocket和STOMP实现消息功能 (读书人的事,怎么能算偷呢→_→)
安装命令:
cnpm install sockjs-client
cnpm install stompjs
前端就算准备完成了。
至于后端,只要添加两个依赖就行了
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.前端代码
methods:{
connect(){
// /server是vue.config.js中配置的代理,指向后端。如果没有配置代理,可以直接用http://{ip}:{port}替换。
// 其中,/ws 是后端配置的端点,用于创建连接。
let sock = new SockJS("/server/ws");
this.socket = Stomp.over(sock);
this.socket.connect({}, this.afterConnect, this.connectError)
},
disconnect(){
this.socket.disconnect(()=>{
})
},
afterConnect(){
console.log("连接成功")
},
connectError(){
console.error("连接失败!")
},
subscribe1(){
this.order = this.socket.subscribe("/topic/subscribe/first", this.afterSubscribe)
},
sendMessage1(){
this.socket.send("/topic/send/first", {}, this.message1 || "nothing")
},
subscribe2(){
this.order = this.socket.subscribe("/topic/subscribe/second")
},
sendMessage2(){
this.socket.send("/topic/send/second/" + this.name + "/" + this.gender, this.afterSubscribe, this.message2 || "nothing")
},
subscribe3(){
this.order = this.socket.subscribe("/topic/subscribe/third", this.afterSubscribe, {name: this.name1, gender: this.gender1})
},
sendMessage3(){
this.socket.send("/topic/send/third", {}, this.message3)
},
subscribe31(){
this.order = this.socket.subscribe("/topic/subscribe/third1", this.afterSubscribe)
},
sendMessage31(){
this.socket.send("/topic/send/third1", {name: this.name2, gender: this.gender2}, this.message4)
},
subscribe4(){
this.order = this.socket.subscribe("/user/topic/order", this.afterSubscribe)
},
sendMessage4(){
this.socket.send("/user/topic/message", {}, this.message5 || "nothing")
},
afterSubscribe(m){
console.log("afterSubscribe", JSON.parse(m.body))
},
afterSendMessage(m){
console.log("afterSendMessage", JSON.parse(m.body))
}
},
beforeDestroy(){
if(this.socket){
this.disconnect()
}
}
}
4.后端代码
1.配置
package com.bit.fist.websocketdemo.common.config;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@SpringBootConfiguration
@EnableWebSocketMessageBroker
public class WebSocketConfigurer implements WebSocketMessageBrokerConfigurer {
/**
* 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
* 也可以设置拦截器、处理器等
* @param registry
*/
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")//用于首次建立连接
.setAllowedOrigins("*")//允许跨域
.withSockJS();//使用sockjs兼容
}
/**
* 配置消息代理,哪种路径的消息会进行代理处理
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//订阅广播 Broker(消息代理)名称
registry.enableSimpleBroker("/topic", "/talk");
//点对点(一对一)使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user");
}
}
2.拦截器
3.controller
package com.bit.fist.websocketdemo.controller;
import com.bit.fist.websocketdemo.common.entity.ResponseEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.*;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@Slf4j
@RestController
public class SubscribeAndSendController {
/**
* 1.普通的订阅与推送
* @return
*/
@SubscribeMapping("/topic/subscribe/first")
public ResponseEntity subscribe1(){
log.info("订阅1----------");
return ResponseEntity.successToResponse("订阅1成功");
}
@MessageMapping("/topic/send/first")
@SendTo("/topic/subscribe/first")
public ResponseEntity recieve1(String msg){
log.info("发送1:{}", msg);
return ResponseEntity.successToResponse(msg);
}
/**
* 2.url传参的订阅与推送(Message适用,Subscribe不适用, 因为@SendTo识别不到)
* @return
*/
@SubscribeMapping("/topic/subscribe/second")
public ResponseEntity subscribe2(){
return ResponseEntity.successToResponse("订阅2成功");
}
@MessageMapping("/topic/send/second/{name}/{gender}")
@SendTo("/topic/subscribe/second")
public ResponseEntity recieve2(String msg,@DestinationVariable("name") String name, @DestinationVariable("gender") String gender){
log.info("发送2----------:{},{},{}",msg, name, gender);
return ResponseEntity.successToResponse(msg);
}
/**
* 3.headers传参(Subscribe、Message都适用)
* 可以通过@Headers取得所有参数,也可以通过@Header单独取出需要的数据
* @param name
* @param gender
* @param headers
* @return
*/
@SubscribeMapping("/topic/subscribe/third")
public ResponseEntity subscribe3(@Header("name")String name, @Header("gender")String gender, @Headers Map<String, Object> headers){
log.info("订阅3----------:{},{},{}",name, gender, headers);
return ResponseEntity.successToResponse("订阅3成功");
}
@MessageMapping("/topic/send/third")
@SendTo("/topic/subscribe/third")
public ResponseEntity recieve3(String msg){
log.info("发送3:{}", msg);
/**
* 3.1
* @return
*/
@SubscribeMapping("/topic/subscribe/third1")
public ResponseEntity subscribe31(){
log.info("订阅3.1");
return ResponseEntity.successToResponse("订阅3.1成功");
}
@MessageMapping("/topic/send/third1")
@SendTo("/topic/subscribe/third1")
public ResponseEntity recieve31(String msg, @Header(value = "name")String name, @Header(value = "gender", required = false)String gender, @Headers Map<String, Object> headers){
log.info("发送3.1----------:{},{},{},{}", msg, name, gender, headers);
return ResponseEntity.successToResponse(msg);
}
/**
* 点对点的订阅与推送
* @return
*/
@SubscribeMapping("/user/topic/order")
public ResponseEntity subscribe4(){
log.info("订阅4成功");
return ResponseEntity.successToResponse("订阅4成功");
}
@MessageMapping("/user/topic/message")
@SendToUser("/topic/order")
public ResponseEntity recieve4(String msg){
log.info("发送4----------:{}", msg);
return ResponseEntity.successToResponse(msg);
}
}
5.遇到的问题
看着挺简单,做起来净毛病,各种匪夷所思
1.建立连接失败
WebSocketConfig 中,registerStompEndpoints方法的registry.setAllowedOrigins("*"),是用来声明允许跨域的,但是创建连接就报错:
java.lang.IllegalArgumentException: When allowCredentials is true, allowedOrigins cannot contain the special value "*"since that cannot be set on the "Access-Control-Allow-Origin" response header. To allow credentials to a set of origins, list them explicitly or consider using "allowedOriginPatterns" instead.
可以试着换成.setAllowedOrigins("http://{ip}:{port}").
2.订阅失败,Payload value must not be empty
org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public com.kes.common.core.base.ApiResponseBody com.laowang.socket.controller.WebSocketController.subscribe(java.lang.String): 1 error(s): [Error in object 'prjId': codes []; arguments []; default message [Payload value must not be empty]]
原因是@SubscribeMapping标注的方法定义了入参,在订阅时却没有传参给后端。
两种办法:
(1)@SubscribeMapping标注的方法不定义入参,也不用传参(如果参数可有可无);
(2)如果一定要传参数,可以@SubscribeMapping配合@DestinationVariable注解,和@PathVariable类似
@SubscribeMapping("/topic/subscribe/{prjId}")
public ApiResponseBody subscribe(@DestinationVariable("prjId") String prjId){
log.info("订阅 /first,项目编号 : {}", prjId);
return ApiResponseBody.defaultSuccess("订阅成功!");
}
3.发送消息报错(一步一榔头 →_→),nested exception is java.lang.IllegalArgumentException: Expected destination pattern "/user/{userId}/**"
org.springframework.messaging.MessageDeliveryException: Failed to handle GenericMessage [payload=byte[122], headers={simpMessageType=MESSAGE, conversionHint=method 'text' parameter -1, contentType=application/json, simpSessionId=2elhhrww, simpDestination=/user/first}] to org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask@53a64803 in UserDestinationMessageHandler[DefaultUserDestinationResolver[prefix=/user/]]; nested exception is java.lang.IllegalArgumentException: Expected destination pattern "/user/{userId}/**"
原因:增加了@SendTo("/user/first")注解,路径错误,"/user"比较特殊,配置类中,已经将它注册为点对点的端点,registry.setUserDestinationPrefix("/user"),需要用@SendToUser
@SendTo 与 @SendToUser
解决办法:
(1)检查、更正路径;
(2)换成@SendToUser注解
4.@SendToUser无效
@SubscribeMapping("/user/receive/test")
public ApiResponseBody subscribe(){
log.info("订阅2");
return ApiResponseBody.defaultSuccess("订阅成功!");
}
@MessageMapping("/topic/receive2")
@SendToUser("/receive/test")
public ApiResponseBody text2(String text){
log.info("接收消息: {}", text);
return ApiResponseBody.defaultSuccess("发送信息成功!");
}
讲道理,当前端发送触发@MessageMapping("/topic/receive2")标注的方法后,返回后应该转发给"/user/receive/test",发送信息到时没问题,但就是转发不到订阅的目标,也不报错,费解。。。。
后来发现和配置有关,registry.enableSimpleBroker("/topic")只配置了 "/topic",所以"/user/receive/test"无法应用于@SendToUser
解决办法:
(1)增加配置,registry.enableSimpleBroker("/topic","/receive");
(2)将路径修改为/topic/test,如下:
@SubscribeMapping("/user/topic/test")//修改
public ApiResponseBody subscribe(){
log.info("订阅2");
return ApiResponseBody.defaultSuccess("订阅成功!");
}
@MessageMapping("/topic/receive2")
@SendToUser("/topic/test")//修改
public ApiResponseBody text2(String text){
log.info("接收消息: {}", text);
return ApiResponseBody.defaultSuccess("发送信息成功!");
}
当然前端订阅路径也要修改为 "/user/topic/test"
- 前端订阅、发送信息的端点都要以"/"开头,否则会定于不到,而且不报错