angular 整合websocket

最近在做一个项目,场景是需要与一些智能货架进行对接,当货架物品拿起或放下时,会推送一条数据到后端接口,此时前端的大屏上能够实时展示商品详情及商品数量。最终选定前端使用angular来绘制页面。

需求实现方式无外乎两种1.轮询方式,不断请求后端,弊端就是及时性较差,且不断请求后端,会加大后端压力。2.使用websocket,后端收到消息后,立刻推送数据给前端。
踩过一些坑之后后,最终还是决定使用websocket的模式来实现,在网上找了不少资料,都没有很完整的,所幸最终还是实现了功能,demo如下。

angular版本:

"dependencies": {
    "@angular/animations": "^7.1.1",
    "@angular/common": "~6.1.1",
    "@angular/compiler": "~7.0.0",
    "@angular/core": "~6.1.1",
    "@angular/forms": "~6.1.1",
    "@angular/http": "~6.1.1",
    "@angular/platform-browser": "~6.1.1",
    "@angular/platform-browser-dynamic": "~6.1.1",
    "@angular/router": "~6.1.1",
    "core-js": "^2.5.4",
    "rxjs": "~6.3.3",
    "zone.js": "~0.8.26"
  }

废话不多说,直接上代码:
前端:
这里因为能够明确是Chrome浏览器的最新版本,因此并没有做兼容性处理,但是做了心跳机制,断开后能够保证重连
websocket.service.ts

import {Injectable} from '@angular/core';
import {interval, Subject} from 'rxjs';

/**
 * websocket服务
 * @author LiQun
 * @date 2019/1/25
 */
@Injectable({
  providedIn: 'root'
})
export class WebsocketService {
  messageSubject;                                 // subject对象,用于发送事件
  private url;                                    // 默认请求的url
  private webSocket: WebSocket;                   // websocket对象
  connectSuccess = false;                         // websocket 连接成功
  period = 60 * 1000 * 10;                        // 10分钟检查一次
  serverTimeoutSubscription = null;               // 定时检测连接对象
  reconnectFlag = false;                          // 重连
  reconnectPeriod = 5 * 1000;                     // 重连失败,则5秒钟重连一次
  reconnectSubscription = null;                   // 重连订阅对象
  runTimeSubscription;                            // 记录运行连接subscription
  runTimePeriod = 60 * 10000;                     // 记录运行连接时间

  constructor() {
    this.messageSubject = new Subject();
    console.log('开始心跳检测');
    // 进入程序就进行心跳检测,避免出现开始就连接中断,后续不重连
    this.heartCheckStart();
    this.calcRunTime();
  }

  /**
   * 发送消息
   * @author LiQun
   * @date 2019/1/22
   * @param message 发送消息
   */
  sendMessage(message) {
    this.webSocket.send(message);
  }

  /**
   * 创建新连接
   * @author LiQun
   * @date 2019/1/22
   * @param url 要连接的url
   */
  connect(url) {
    if (!!url) {
      this.url = url;
    }
    // 创建websocket对象
    this.createWebSocket();
  }

  /**
   * 创建连接
   * @author LiQun
   * @date 2019/1/22
   */
  createWebSocket() {
    // 如果没有建立过连接,才建立连接并且添加时间监听
    this.webSocket = new WebSocket(this.url);
    // 建立连接成功
    this.webSocket.onopen = (e) => this.onOpen(e);
    // 接收到消息
    this.webSocket.onmessage = (e) => this.onMessage(e);
    // 连接关闭
    this.webSocket.onclose = (e) => this.onClose(e);
    // 异常
    this.webSocket.onerror = (e) => this.onError(e);
  }

  /**
   * 连接打开
   * @author LiQun
   * @date 2019/1/22
   * @param e 打开事件
   */
  onOpen(e) {
    console.log('websocket 已连接');
    // 设置连接成功
    this.connectSuccess = true;
    // 如果是重连中
    if (this.reconnectFlag) {
      // 1.停止重连
      this.stopReconnect();
      // 2.重新开启心跳
      this.heartCheckStart();
      // 3.重新开始计算运行时间
      this.calcRunTime();
    }
  }

  /**
   * 接受到消息
   * @author LiQun
   * @date 2019/1/22
   * @param event 接受消息事件
   */
  onMessage(event) {
    console.log('接收到的消息', event.data);
    // 将接受到的消息发布出去
    const message = JSON.parse(event.data);
    console.log('接收到消息时间', new Date().getTime());
    this.messageSubject.next(message);
  }

  /**
   * 连接关闭
   * @author LiQun
   * @date 2019/1/22
   */
  private onClose(e) {
    console.log('连接关闭', e);
    this.connectSuccess = false;
    this.webSocket.close();
    // 关闭时开始重连
    this.reconnect();
    this.stopRunTime();
    // throw new Error('webSocket connection closed:)');
  }

  /**
   * 连接异常
   * @author LiQun
   * @date 2019/1/22
   */
  private onError(e) {
    // 出现异常时一定会进onClose,所以只在onClose做一次重连动作
    console.log('连接异常', e);
    this.connectSuccess = false;
    // throw new Error('webSocket connection error:)');
  }

  /**
   * 开始重新连接
   * @author LiQun
   * @date 2019/1/22
   */
  reconnect() {
    // 如果已重连,则直接return,避免重复连接
    if (this.connectSuccess) {
      this.stopReconnect();
      console.log('已经连接成功,停止重连');
      return;
    }
    // 如果正在连接中,则直接return,避免产生多个轮训事件
    if (this.reconnectFlag) {
      console.log('正在重连,直接返回');
      return;
    }
    // 开始重连
    this.reconnectFlag = true;
    // 如果没能成功连接,则定时重连
    this.reconnectSubscription = interval(this.reconnectPeriod).subscribe(async (val) => {
      console.log(`重连:${val}次`);
      const url = this.url;
      // 重新连接
      this.connect(url);
    });
  }

  /**
   * 停止重连
   * @author LiQun
   * @date 2019/1/22
   */
  stopReconnect() {
    // 连接标识置为false
    this.reconnectFlag = false;
    // 取消订阅
    if (typeof this.reconnectSubscription !== 'undefined' && this.reconnectSubscription != null) {
      this.reconnectSubscription.unsubscribe();
    }
  }

  /**
   * 开始心跳检测
   * @author LiQun
   * @date 2019/1/22
   */
  heartCheckStart() {
    this.serverTimeoutSubscription = interval(this.period).subscribe((val) => {
      // 保持连接状态,重置下
      if (this.webSocket != null && this.webSocket.readyState === 1) {
        console.log(val, '连接状态,发送消息保持连接');
      } else {
        // 停止心跳
        this.heartCheckStop();
        // 开始重连
        this.reconnect();
        console.log('连接已断开,重新连接');
      }
    });
  }

  /**
   * 停止心跳检测
   * @author LiQun
   * @date 2019/1/22
   */
  heartCheckStop() {
    // 取消订阅停止心跳
    if (typeof this.serverTimeoutSubscription !== 'undefined' && this.serverTimeoutSubscription != null) {
      this.serverTimeoutSubscription.unsubscribe();
    }
  }

  /**
   * 开始计算运行时间
   * @author LiQun
   * @date 2019/1/25
   */
  calcRunTime() {
    this.runTimeSubscription = interval(this.runTimePeriod).subscribe(period => {
      console.log('运行时间', `${period}分钟`);
    });
  }

  /**
   * 停止计算运行时间
   * @author LiQun
   * @date 2019/1/25
   */
  stopRunTime() {
    if (typeof this.runTimeSubscription !== 'undefined' && this.runTimeSubscription !== null) {
      this.runTimeSubscription.unsubscribe();
    }
  }
}

websocket-page.component.html

<section>
  大家好!我是:{{currentUser}}
</section>
<section *ngFor="let msg of messages">
  <section>{{msg}}</section>
</section>
<section>
  当前登录用户:{{users}}
</section>
<section>
  <label>
    <select name="type" [(ngModel)]="type">
      <option value="all">发送给所有人</option>
      <option value="single">发送给个人</option>
    </select>
  </label>
</section>
<section>
  <label *ngIf="type === 'single'">
    <select name="user" [(ngModel)]="sendToUser">
      <option *ngFor="let person of users" [value]="person">{{person}}</option>
    </select>
  </label>
</section>
<p *ngIf="error">{{error | json}}</p>
<p *ngIf="completed">completed!</p>
<label>
  <input type="text" [(ngModel)]="message">
</label>
<button (click)="send()">发送</button>

websocket-page.component.ts

import {Component, OnInit} from '@angular/core';
import {WebsocketService} from '../../service/websocket/websocket.service';
import {ActivatedRoute, Params} from '@angular/router';

@Component({
  selector: 'app-websocket-page',
  templateUrl: './websocket-page.component.html',
  styleUrls: ['./websocket-page.component.css']
})
export class WebsocketPageComponent implements OnInit {
  messages = [];                      // 消息列表
  message;                            // 发送的消息内容
  error: any;                         // 异常信息
  completed = false;                  // 发送完成
  type = SEND_TYPE.ALL;               // 默认类型发送给所有人
  users = [];                         // 登陆的用户
  sendToUser;                         // 需要发送消息的用户
  currentUser;                        // 当前用户
  constructor(
    private webSocketService: WebsocketService,
    private activatedRoute: ActivatedRoute
  ) {
    // 从路由中获取参数
    this.activatedRoute.params.subscribe((params: Params) => {
      this.currentUser = params['id'];
    });
  }

  ngOnInit(): void {
    // 连接websocket
    this.webSocketService.connect(`ws://localhost:8080/ws//echo?id=${this.currentUser}`);
    // 接收消息
    this.webSocketService.messageSubject.subscribe(
      data => {
        // 如果是用户登陆,则添加到登陆列表中
        if (data.users) {
          this.users = data.users;
        } else {
          // 否则添加到消息列表
          this.messages.push(data.msg);
        }
      },
      err => this.error = err,
      () => this.completed = true
    );
  }

  /**
   * 发送消息
   * @author LiQun
   * @date 2019/1/25
   */
  send() {
    // 创建消息对象
    const msg = {
      msg: this.message,                                                    // 消息内容
      type: this.type === SEND_TYPE.ALL ? SEND_TYPE.ALL : SEND_TYPE.SINGLE, // 类型
      to: this.type === SEND_TYPE.SINGLE ? this.sendToUser : undefined            // 要发送的对象
    };
    // 发送
    this.webSocketService.sendMessage(JSON.stringify(msg));
    // 发送完成,情况message内容
    this.message = '';
  }
}

export enum SEND_TYPE {
  ALL = 'all',
  SINGLE = 'single'
}

java后台(springboot):

WebSocketConfig.java

package com.gemini.websocketdemo01.websocket;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * websocket 配置文件
 *
 * @author LiQun
 * @date 2019/1/22
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {


    @Resource
    private MyHandShakeInterceptor handshake;

    @Resource
    private MyHandler handler;

    /**
     * 实现 WebSocketConfigurer 接口,重写 registerWebSocketHandlers 方法,这是一个核心实现方法,配置 websocket 入口,允许访问的域、注册 Handler、SockJs 支持和拦截器。
     * <p>
     * registry.addHandler()注册和路由的功能,当客户端发起 websocket 连接,把 /path 交给对应的 handler 处理,而不实现具体的业务逻辑,可以理解为收集和任务分发中心。
     * <p>
     * addInterceptors,顾名思义就是为 handler 添加拦截器,可以在调用 handler 前后加入我们自己的逻辑代码。
     * <p>
     * setAllowedOrigins(String[] domains),允许指定的域名或 IP (含端口号)建立长连接,如果只允许自家域名访问,这里轻松设置。如果不限时使用”*”号,如果指定了域名,则必须要以 http 或 https 开头。
     *
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //部分 支持websocket 的访问链接,允许跨域
        registry.addHandler(handler, "/echo").addInterceptors(handshake).setAllowedOrigins("*");
        //部分 不支持websocket的访问链接,允许跨域
        registry.addHandler(handler, "/sockjs/echo").addInterceptors(handshake).setAllowedOrigins("*").withSockJS();
    }
}

MyHandShakeInterceptor.java

package com.gemini.websocketdemo01.websocket;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpServletRequest;
import java.util.Map;

/**
 * websocket消息拦截器
 * @author LiQun
 * @date 2019/1/22
 */
@Service
public class MyHandShakeInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(
            ServerHttpRequest request,
            ServerHttpResponse response, WebSocketHandler webSocketHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
            String id = request.getURI().toString().split("id=")[1];
            System.out.println("当前session的ID=" + id);
//             从session中获取到当前登录的用户信息. 作为socket的账号信息. session的的WEBSOCKET_USER_ID信息,在用户打开页面的时候设置.
            attributes.put("WEBSOCKET_USER_ID", id);
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest,
                               ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

    }
}

MyHandler.java

package com.gemini.websocketdemo01.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.*;

import org.apache.commons.logging.Log;

/**
 * 处理websocket消息
 *
 * @author LiQun
 * @date 2019/1/22
 */
@Service
public class MyHandler implements WebSocketHandler {

    private static Log logger = LogFactory.getLog(MyHandler.class);

    /**
     * 在线用户,将其保存在set中,避免用户重复登录,出现多个session
     */
    private static final Map<String, WebSocketSession> USERS;

    static {
        USERS = Collections.synchronizedMap(new HashMap<>());
    }

    private static final String SEND_ALL = "all";


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("链接成功.....");
        logger.info("afterConnectionEstablished");
        String id = (String) session.getAttributes().get("WEBSOCKET_USER_ID");
        logger.info("用户id:" + id);
        if (id != null) {
            USERS.put(id, session);
            JSONObject obj = new JSONObject();
            // 统计一下当前登录系统的用户有多少个
            obj.put("count", USERS.size());
            obj.put("users", USERS.keySet().toArray());
            session.sendMessage(new TextMessage(obj.toJSONString()));
        }
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        JSONObject msg = JSON.parseObject(message.getPayload().toString());
        logger.info("处理要发送的消息:" + message.getPayload().toString());
        JSONObject obj = new JSONObject();
        String type = msg.get("type").toString();
        if (StringUtils.isNotBlank(type) && SEND_ALL.equals(type)) {
            //给所有人
            obj.put("msg", msg.getString("msg"));
            logger.info("给所有人发消息");
            sendMessageToUsers(new TextMessage(obj.toJSONString()));
        } else {
            //给个人
            String to = msg.getString("to");
            obj.put("msg", msg.getString("msg"));
            logger.info("给个人发消息");
            sendMessageToUser(to, new TextMessage(obj.toJSONString()));
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        logger.info("链接出错,关闭链接,异常信息:" + exception.getMessage());
        String userId = getUserId(session);
        if (USERS.get(userId) != null) {
            USERS.remove(userId);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        logger.info("链接关闭,关闭信息:" + closeStatus.toString());
        String userId = getUserId(session);
        if (USERS.get(userId) != null) {
            USERS.remove(userId);
        }
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 获取用户id
     *
     * @author LiQun
     * @date 2019/1/22
     */
    private String getUserId(WebSocketSession session) {
        try {
            return (String) session.getAttributes().get("WEBSOCKET_USER_ID");
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * 给所有在线用户发送消息
     * @param message 文本消息
     */
    public void sendMessageToUsers(TextMessage message) {
        WebSocketSession user = null;
        for (String key : USERS.keySet()) {
            user = USERS.get(key);
            try {
                if (user.isOpen()) {
                    user.sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 给某个用户发送消息
     *
     * @param userName 用户id
     * @param message  消息
     */
    public void sendMessageToUser(String userName, TextMessage message) {
        WebSocketSession user = USERS.get(userName);
        try {
            if (user.isOpen()) {
                user.sendMessage(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据当前用户id登出
     *
     * @author LiQun
     * @date 2019/1/22
     * @param userId 用户id
     */
    public void logout(String userId) {
        USERS.remove(userId);
        logger.info("用户登出,id:" + userId);
    }
}

WebSocketController.java

package com.gemini.websocketdemo01.websocket;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.socket.TextMessage;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;

/**
 * websocket 消息发送Controller
 *
 * @author LiQun
 * @date 2019/1/22
 */
@Controller
@RequestMapping("/socket")
public class WebSocketController {

    @Resource
    private MyHandler myHandler;

    /**
     * 发送消息给指定人
     *
     * @param request
     * @return
     */
    @RequestMapping("/send")
    public Map<String, Object> send(HttpServletRequest request) {
        Map<String, Object> result = new HashMap<>();
        try {
            // 假设用户jack登录,存储到session中
            String id = request.getParameter("id");
            String message = request.getParameter("message");
            this.myHandler.sendMessageToUser(id, new TextMessage(message));
        } catch (Exception e) {
            result.put("success", false);
            result.put("message", e.getMessage());
        }
        return result;
    }

    /**
     * 指定id下线
     *
     * @author LiQun
     * @date 2019/1/22
     */
    @RequestMapping("logout")
    public Map<String, Object> logout(HttpServletRequest request) {
        Map<String, Object> result = new HashMap<>();
        try {
            String id = request.getParameter("id");
            this.myHandler.logout(id);
            result.put("success", true);
        } catch (Exception e) {
            result.put("success", false);
            result.put("message", e.getMessage());
        }
        return result;
    }
}

StringUtils.java

package com.gemini.websocketdemo01.websocket;

/**
 * string工具类
 * @author LiQun
 * @date 2019/1/25
 */
public class StringUtils {
    /**
     * 判断指定字符串是否不等于null和空字符串
     *
     * @param str 指定字符串
     * @return 如果不等于null和空字符串则返回true,否则返回false
     */
    public static boolean isNotBlank(String str) {
        return !isBlank(str);
    }

    /**
     * 判断指定字符串是否等于null或空字符串
     *
     * @param str 指定字符串
     * @return 如果等于null或空字符串则返回true,否则返回false
     */
    public static boolean isBlank(String str) {
        return str == null || "".equals(str.trim()) || str == "null";
    }
}

运行工程后分别打开两个地址
http://localhost:4200/#/websocket/01
http://localhost:4200/#/websocket/02
最终效果图:

websocket.gif

代码已上传到github
前端地址: https://github.com/jytx/websocket-demo.git

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容