最近在做一个项目,场景是需要与一些智能货架进行对接,当货架物品拿起或放下时,会推送一条数据到后端接口,此时前端的大屏上能够实时展示商品详情及商品数量。最终选定前端使用angular来绘制页面。
需求实现方式无外乎两种1.轮询方式,不断请求后端,弊端就是及时性较差,且不断请求后端,会加大后端压力。2.使用websocket,后端收到消息后,立刻推送数据给前端。
踩过一些坑之后后,最终还是决定使用websocket的模式来实现,在网上找了不少资料,都没有很完整的,所幸最终还是实现了功能,demo如下。
angular版本:
"dependencies": {
"@angular/animations": "^7.1.1",
"@angular/common": "~6.1.1",
"@angular/compiler": "~7.0.0",
"@angular/core": "~6.1.1",
"@angular/forms": "~6.1.1",
"@angular/http": "~6.1.1",
"@angular/platform-browser": "~6.1.1",
"@angular/platform-browser-dynamic": "~6.1.1",
"@angular/router": "~6.1.1",
"core-js": "^2.5.4",
"rxjs": "~6.3.3",
"zone.js": "~0.8.26"
}
废话不多说,直接上代码:
前端:
这里因为能够明确是Chrome浏览器的最新版本,因此并没有做兼容性处理,但是做了心跳机制,断开后能够保证重连
websocket.service.ts
import {Injectable} from '@angular/core';
import {interval, Subject} from 'rxjs';
/**
* websocket服务
* @author LiQun
* @date 2019/1/25
*/
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
messageSubject; // subject对象,用于发送事件
private url; // 默认请求的url
private webSocket: WebSocket; // websocket对象
connectSuccess = false; // websocket 连接成功
period = 60 * 1000 * 10; // 10分钟检查一次
serverTimeoutSubscription = null; // 定时检测连接对象
reconnectFlag = false; // 重连
reconnectPeriod = 5 * 1000; // 重连失败,则5秒钟重连一次
reconnectSubscription = null; // 重连订阅对象
runTimeSubscription; // 记录运行连接subscription
runTimePeriod = 60 * 10000; // 记录运行连接时间
constructor() {
this.messageSubject = new Subject();
console.log('开始心跳检测');
// 进入程序就进行心跳检测,避免出现开始就连接中断,后续不重连
this.heartCheckStart();
this.calcRunTime();
}
/**
* 发送消息
* @author LiQun
* @date 2019/1/22
* @param message 发送消息
*/
sendMessage(message) {
this.webSocket.send(message);
}
/**
* 创建新连接
* @author LiQun
* @date 2019/1/22
* @param url 要连接的url
*/
connect(url) {
if (!!url) {
this.url = url;
}
// 创建websocket对象
this.createWebSocket();
}
/**
* 创建连接
* @author LiQun
* @date 2019/1/22
*/
createWebSocket() {
// 如果没有建立过连接,才建立连接并且添加时间监听
this.webSocket = new WebSocket(this.url);
// 建立连接成功
this.webSocket.onopen = (e) => this.onOpen(e);
// 接收到消息
this.webSocket.onmessage = (e) => this.onMessage(e);
// 连接关闭
this.webSocket.onclose = (e) => this.onClose(e);
// 异常
this.webSocket.onerror = (e) => this.onError(e);
}
/**
* 连接打开
* @author LiQun
* @date 2019/1/22
* @param e 打开事件
*/
onOpen(e) {
console.log('websocket 已连接');
// 设置连接成功
this.connectSuccess = true;
// 如果是重连中
if (this.reconnectFlag) {
// 1.停止重连
this.stopReconnect();
// 2.重新开启心跳
this.heartCheckStart();
// 3.重新开始计算运行时间
this.calcRunTime();
}
}
/**
* 接受到消息
* @author LiQun
* @date 2019/1/22
* @param event 接受消息事件
*/
onMessage(event) {
console.log('接收到的消息', event.data);
// 将接受到的消息发布出去
const message = JSON.parse(event.data);
console.log('接收到消息时间', new Date().getTime());
this.messageSubject.next(message);
}
/**
* 连接关闭
* @author LiQun
* @date 2019/1/22
*/
private onClose(e) {
console.log('连接关闭', e);
this.connectSuccess = false;
this.webSocket.close();
// 关闭时开始重连
this.reconnect();
this.stopRunTime();
// throw new Error('webSocket connection closed:)');
}
/**
* 连接异常
* @author LiQun
* @date 2019/1/22
*/
private onError(e) {
// 出现异常时一定会进onClose,所以只在onClose做一次重连动作
console.log('连接异常', e);
this.connectSuccess = false;
// throw new Error('webSocket connection error:)');
}
/**
* 开始重新连接
* @author LiQun
* @date 2019/1/22
*/
reconnect() {
// 如果已重连,则直接return,避免重复连接
if (this.connectSuccess) {
this.stopReconnect();
console.log('已经连接成功,停止重连');
return;
}
// 如果正在连接中,则直接return,避免产生多个轮训事件
if (this.reconnectFlag) {
console.log('正在重连,直接返回');
return;
}
// 开始重连
this.reconnectFlag = true;
// 如果没能成功连接,则定时重连
this.reconnectSubscription = interval(this.reconnectPeriod).subscribe(async (val) => {
console.log(`重连:${val}次`);
const url = this.url;
// 重新连接
this.connect(url);
});
}
/**
* 停止重连
* @author LiQun
* @date 2019/1/22
*/
stopReconnect() {
// 连接标识置为false
this.reconnectFlag = false;
// 取消订阅
if (typeof this.reconnectSubscription !== 'undefined' && this.reconnectSubscription != null) {
this.reconnectSubscription.unsubscribe();
}
}
/**
* 开始心跳检测
* @author LiQun
* @date 2019/1/22
*/
heartCheckStart() {
this.serverTimeoutSubscription = interval(this.period).subscribe((val) => {
// 保持连接状态,重置下
if (this.webSocket != null && this.webSocket.readyState === 1) {
console.log(val, '连接状态,发送消息保持连接');
} else {
// 停止心跳
this.heartCheckStop();
// 开始重连
this.reconnect();
console.log('连接已断开,重新连接');
}
});
}
/**
* 停止心跳检测
* @author LiQun
* @date 2019/1/22
*/
heartCheckStop() {
// 取消订阅停止心跳
if (typeof this.serverTimeoutSubscription !== 'undefined' && this.serverTimeoutSubscription != null) {
this.serverTimeoutSubscription.unsubscribe();
}
}
/**
* 开始计算运行时间
* @author LiQun
* @date 2019/1/25
*/
calcRunTime() {
this.runTimeSubscription = interval(this.runTimePeriod).subscribe(period => {
console.log('运行时间', `${period}分钟`);
});
}
/**
* 停止计算运行时间
* @author LiQun
* @date 2019/1/25
*/
stopRunTime() {
if (typeof this.runTimeSubscription !== 'undefined' && this.runTimeSubscription !== null) {
this.runTimeSubscription.unsubscribe();
}
}
}
websocket-page.component.html
<section>
大家好!我是:{{currentUser}}
</section>
<section *ngFor="let msg of messages">
<section>{{msg}}</section>
</section>
<section>
当前登录用户:{{users}}
</section>
<section>
<label>
<select name="type" [(ngModel)]="type">
<option value="all">发送给所有人</option>
<option value="single">发送给个人</option>
</select>
</label>
</section>
<section>
<label *ngIf="type === 'single'">
<select name="user" [(ngModel)]="sendToUser">
<option *ngFor="let person of users" [value]="person">{{person}}</option>
</select>
</label>
</section>
<p *ngIf="error">{{error | json}}</p>
<p *ngIf="completed">completed!</p>
<label>
<input type="text" [(ngModel)]="message">
</label>
<button (click)="send()">发送</button>
websocket-page.component.ts
import {Component, OnInit} from '@angular/core';
import {WebsocketService} from '../../service/websocket/websocket.service';
import {ActivatedRoute, Params} from '@angular/router';
@Component({
selector: 'app-websocket-page',
templateUrl: './websocket-page.component.html',
styleUrls: ['./websocket-page.component.css']
})
export class WebsocketPageComponent implements OnInit {
messages = []; // 消息列表
message; // 发送的消息内容
error: any; // 异常信息
completed = false; // 发送完成
type = SEND_TYPE.ALL; // 默认类型发送给所有人
users = []; // 登陆的用户
sendToUser; // 需要发送消息的用户
currentUser; // 当前用户
constructor(
private webSocketService: WebsocketService,
private activatedRoute: ActivatedRoute
) {
// 从路由中获取参数
this.activatedRoute.params.subscribe((params: Params) => {
this.currentUser = params['id'];
});
}
ngOnInit(): void {
// 连接websocket
this.webSocketService.connect(`ws://localhost:8080/ws//echo?id=${this.currentUser}`);
// 接收消息
this.webSocketService.messageSubject.subscribe(
data => {
// 如果是用户登陆,则添加到登陆列表中
if (data.users) {
this.users = data.users;
} else {
// 否则添加到消息列表
this.messages.push(data.msg);
}
},
err => this.error = err,
() => this.completed = true
);
}
/**
* 发送消息
* @author LiQun
* @date 2019/1/25
*/
send() {
// 创建消息对象
const msg = {
msg: this.message, // 消息内容
type: this.type === SEND_TYPE.ALL ? SEND_TYPE.ALL : SEND_TYPE.SINGLE, // 类型
to: this.type === SEND_TYPE.SINGLE ? this.sendToUser : undefined // 要发送的对象
};
// 发送
this.webSocketService.sendMessage(JSON.stringify(msg));
// 发送完成,情况message内容
this.message = '';
}
}
export enum SEND_TYPE {
ALL = 'all',
SINGLE = 'single'
}
java后台(springboot):
WebSocketConfig.java
package com.gemini.websocketdemo01.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* websocket 配置文件
*
* @author LiQun
* @date 2019/1/22
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private MyHandShakeInterceptor handshake;
@Resource
private MyHandler handler;
/**
* 实现 WebSocketConfigurer 接口,重写 registerWebSocketHandlers 方法,这是一个核心实现方法,配置 websocket 入口,允许访问的域、注册 Handler、SockJs 支持和拦截器。
* <p>
* registry.addHandler()注册和路由的功能,当客户端发起 websocket 连接,把 /path 交给对应的 handler 处理,而不实现具体的业务逻辑,可以理解为收集和任务分发中心。
* <p>
* addInterceptors,顾名思义就是为 handler 添加拦截器,可以在调用 handler 前后加入我们自己的逻辑代码。
* <p>
* setAllowedOrigins(String[] domains),允许指定的域名或 IP (含端口号)建立长连接,如果只允许自家域名访问,这里轻松设置。如果不限时使用”*”号,如果指定了域名,则必须要以 http 或 https 开头。
*
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
//部分 支持websocket 的访问链接,允许跨域
registry.addHandler(handler, "/echo").addInterceptors(handshake).setAllowedOrigins("*");
//部分 不支持websocket的访问链接,允许跨域
registry.addHandler(handler, "/sockjs/echo").addInterceptors(handshake).setAllowedOrigins("*").withSockJS();
}
}
MyHandShakeInterceptor.java
package com.gemini.websocketdemo01.websocket;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
/**
* websocket消息拦截器
* @author LiQun
* @date 2019/1/22
*/
@Service
public class MyHandShakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(
ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
String id = request.getURI().toString().split("id=")[1];
System.out.println("当前session的ID=" + id);
// 从session中获取到当前登录的用户信息. 作为socket的账号信息. session的的WEBSOCKET_USER_ID信息,在用户打开页面的时候设置.
attributes.put("WEBSOCKET_USER_ID", id);
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest,
ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
}
}
MyHandler.java
package com.gemini.websocketdemo01.websocket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.*;
import org.apache.commons.logging.Log;
/**
* 处理websocket消息
*
* @author LiQun
* @date 2019/1/22
*/
@Service
public class MyHandler implements WebSocketHandler {
private static Log logger = LogFactory.getLog(MyHandler.class);
/**
* 在线用户,将其保存在set中,避免用户重复登录,出现多个session
*/
private static final Map<String, WebSocketSession> USERS;
static {
USERS = Collections.synchronizedMap(new HashMap<>());
}
private static final String SEND_ALL = "all";
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
System.out.println("链接成功.....");
logger.info("afterConnectionEstablished");
String id = (String) session.getAttributes().get("WEBSOCKET_USER_ID");
logger.info("用户id:" + id);
if (id != null) {
USERS.put(id, session);
JSONObject obj = new JSONObject();
// 统计一下当前登录系统的用户有多少个
obj.put("count", USERS.size());
obj.put("users", USERS.keySet().toArray());
session.sendMessage(new TextMessage(obj.toJSONString()));
}
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
JSONObject msg = JSON.parseObject(message.getPayload().toString());
logger.info("处理要发送的消息:" + message.getPayload().toString());
JSONObject obj = new JSONObject();
String type = msg.get("type").toString();
if (StringUtils.isNotBlank(type) && SEND_ALL.equals(type)) {
//给所有人
obj.put("msg", msg.getString("msg"));
logger.info("给所有人发消息");
sendMessageToUsers(new TextMessage(obj.toJSONString()));
} else {
//给个人
String to = msg.getString("to");
obj.put("msg", msg.getString("msg"));
logger.info("给个人发消息");
sendMessageToUser(to, new TextMessage(obj.toJSONString()));
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
session.close();
}
logger.info("链接出错,关闭链接,异常信息:" + exception.getMessage());
String userId = getUserId(session);
if (USERS.get(userId) != null) {
USERS.remove(userId);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
logger.info("链接关闭,关闭信息:" + closeStatus.toString());
String userId = getUserId(session);
if (USERS.get(userId) != null) {
USERS.remove(userId);
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 获取用户id
*
* @author LiQun
* @date 2019/1/22
*/
private String getUserId(WebSocketSession session) {
try {
return (String) session.getAttributes().get("WEBSOCKET_USER_ID");
} catch (Exception e) {
return null;
}
}
/**
* 给所有在线用户发送消息
* @param message 文本消息
*/
public void sendMessageToUsers(TextMessage message) {
WebSocketSession user = null;
for (String key : USERS.keySet()) {
user = USERS.get(key);
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 给某个用户发送消息
*
* @param userName 用户id
* @param message 消息
*/
public void sendMessageToUser(String userName, TextMessage message) {
WebSocketSession user = USERS.get(userName);
try {
if (user.isOpen()) {
user.sendMessage(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 根据当前用户id登出
*
* @author LiQun
* @date 2019/1/22
* @param userId 用户id
*/
public void logout(String userId) {
USERS.remove(userId);
logger.info("用户登出,id:" + userId);
}
}
WebSocketController.java
package com.gemini.websocketdemo01.websocket;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.socket.TextMessage;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
/**
* websocket 消息发送Controller
*
* @author LiQun
* @date 2019/1/22
*/
@Controller
@RequestMapping("/socket")
public class WebSocketController {
@Resource
private MyHandler myHandler;
/**
* 发送消息给指定人
*
* @param request
* @return
*/
@RequestMapping("/send")
public Map<String, Object> send(HttpServletRequest request) {
Map<String, Object> result = new HashMap<>();
try {
// 假设用户jack登录,存储到session中
String id = request.getParameter("id");
String message = request.getParameter("message");
this.myHandler.sendMessageToUser(id, new TextMessage(message));
} catch (Exception e) {
result.put("success", false);
result.put("message", e.getMessage());
}
return result;
}
/**
* 指定id下线
*
* @author LiQun
* @date 2019/1/22
*/
@RequestMapping("logout")
public Map<String, Object> logout(HttpServletRequest request) {
Map<String, Object> result = new HashMap<>();
try {
String id = request.getParameter("id");
this.myHandler.logout(id);
result.put("success", true);
} catch (Exception e) {
result.put("success", false);
result.put("message", e.getMessage());
}
return result;
}
}
StringUtils.java
package com.gemini.websocketdemo01.websocket;
/**
* string工具类
* @author LiQun
* @date 2019/1/25
*/
public class StringUtils {
/**
* 判断指定字符串是否不等于null和空字符串
*
* @param str 指定字符串
* @return 如果不等于null和空字符串则返回true,否则返回false
*/
public static boolean isNotBlank(String str) {
return !isBlank(str);
}
/**
* 判断指定字符串是否等于null或空字符串
*
* @param str 指定字符串
* @return 如果等于null或空字符串则返回true,否则返回false
*/
public static boolean isBlank(String str) {
return str == null || "".equals(str.trim()) || str == "null";
}
}
运行工程后分别打开两个地址
http://localhost:4200/#/websocket/01
http://localhost:4200/#/websocket/02
最终效果图:
代码已上传到github
前端地址: https://github.com/jytx/websocket-demo.git