SpringBoot整合WebSocket实现数据实时发送

WebSocket简介

目的

HTML5 WebSocket设计出来的目的就是取代轮询和长连接,使客户端浏览器具备像C/S框架下桌面系统的即时通讯能力,实现了浏览器和服务器全双工通信,建立在TCP之上,虽然WebSocket和HTTP一样通过TCP来传输数据,但WebSocket可以主动的向对方发送或接收数据,就像Socket一样;并且WebSocket需要类似TCP的客户端和服务端通过握手连接,连接成功后才能互相通信。

优点

双向通信、事件驱动、异步、使用ws或wss协议的客户端能够真正实现意义上的推送功能。

缺点

少部分浏览器不支持。

示例

社交聊天(微信、QQ)、弹幕、多玩家玩游戏、协同编辑、股票基金实时报价、体育实况更新、视频会议/聊天、基于位置的应用、在线教育、智能家居等高实时性的场景。

WebSocket请求响应客户端服务器交互图
请求响应客户端服务器交互图.png

WebSocket方式减少了很多TCP打开和关闭连接的操作,WebSocket的资源利用率高。

java WebSocket实现

Oracle 发布的 java 的 WebSocket 的规范是 JSR356规范 ,Tomcat从7.0.27开始支持WebSocket,从7.0.47开始支持JSR-356。

websocket简单实现分为以下几个步骤:

  1. 添加websocket库
  2. 编写后台代码
  3. 编写前端代码。
添加websocket整合springboot依赖
   <!--websocket-->
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
   </dependency>
后台代码实现

配置类


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 首先注入一个ServerEndpointExporterBean,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
 * 链接:https://www.imooc.com/article/70702?block_id=tuijian_wz
 */
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

websocket接口实现


@ServerEndpoint(value = "/websocket") //接受websocket请求路径
@Component
public class PoundWebSocket {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 保存所有在线socket连接
     */
    private static Map<String, PoundWebSocket> webSocketMap = new LinkedHashMap<>();

    /**
     * 记录当前在线数目
     */
    private static int count = 0;

    /**
     * 当前连接(每个websocket连入都会创建一个MyWebSocket实例
     */
    private Session session;

    /**
     * 创建监听串口
     */
    private static SerialPort serialPort = null;

    /**
     * 创建监听器
     */
    private static SerialPortEventListener serialPortEventListener = null;

    /**
     * 监听串口
     */
    private static String PORT_NAME;

    /**
     * 监听串口波特率
     */
    private static int BAUD_RATE;

    /**
     * 数据位
     */
    private static int DATA_BITS;

    /**
     * 停止位
     */
    private static int STOP_BITS;

    /**
     * 奇偶位
     */
    private static int PARITY;

    /**
     * 地磅型号
     */
    private static String MODEL;

    private static IPoundInfoService poundInfoService;

    private static ApplicationContext applicationContext;

    public static void setApplicationContext(ApplicationContext applicationContext) {
        PoundWebSocket.applicationContext = applicationContext;
    }

    private static StringBuffer stringBuffer = new StringBuffer();

    /**
     * 处理连接建立
     *
     * @param session
     */
    @OnOpen
    public void onOpen(Session session) {
        if (poundInfoService == null) {
            poundInfoService = applicationContext.getBean(IPoundInfoService.class);
        }
        //获取地磅信息
        PoundInfo poundInfo = poundInfoService.findOne();
        PORT_NAME = poundInfo.getSerialPort();
        BAUD_RATE = poundInfo.getBaudRate();
        MODEL = poundInfo.getModel();
        DATA_BITS = poundInfo.getDataBits() != null ? poundInfo.getDataBits() : SerialPort.DATABITS_8;
        STOP_BITS = poundInfo.getStopBits() != null ? poundInfo.getStopBits() : SerialPort.STOPBITS_1;
        PARITY = poundInfo.getParity() != null ? poundInfo.getParity() : SerialPort.PARITY_NONE;

        this.session = session;
        webSocketMap.put(session.getId(), this);
        addCount();
//        logger.info("新的连接加入:{}", session.getId());
        try {
            //确保串口已被关闭,未关闭会导致重新监听串口失败
            if (serialPort != null) {
                SerialPortUtil.closePort(serialPort);
                serialPort = null;
            }
            //创建串口 COM5位串口名称 9600波特率
            if (serialPort == null && StringUtils.isNotEmpty(PORT_NAME) && StringUtils.isNotEmpty(MODEL)) {
                serialPort = SerialPortUtil.openPort(PORT_NAME, BAUD_RATE, DATA_BITS, PARITY, STOP_BITS);
//                logger.info("创建串口:{}", serialPort);
                //设置串口监听
                SerialPortUtil.addListener(serialPort, new SerialPortEventListener() {

                    @Override
                    public void serialEvent(SerialPortEvent serialPortEvent) {
                        if (serialPortEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
                            try {
                                //读取串口数据
                                byte[] bytes = SerialPortUtil.readFromPort(serialPort);

                                //根据型号解析字符串
                                switch (MODEL) {
                                    case PoundConstant.MODEL_XK_3190:
                                        parsingString1(bytes);
                                        break;
                                    case PoundConstant.MODEL_XK_3190_10:
                                        parsingString2(bytes);
                                        break;
                                    case PoundConstant.MODEL_D_2008:
                                        parsingString1(bytes);
                                        break;
                                    case PoundConstant.MODEL_DK_3230_D_6:
                                        parsingString3(bytes);
                                        break;
                                    case PoundConstant.MODEL_D_2009_F:
                                        parsingString4(bytes);
                                        break;
                                    default:
                                        String value = String.valueOf(Integer.valueOf(new String(bytes, "GB2312")) - RandomUtil.randomInt(1000, 10000));
                                        sendMessageToAll(value);
                                }

//                                System.out.println("收到的数据:" + new String(bytes, "GB2312") + "----" + new Date());

                            } catch (ReadDataFromSerialPortFailure readDataFromSerialPortFailure) {
                                logger.error(readDataFromSerialPortFailure.toString());
                            } catch (SerialPortInputStreamCloseFailure serialPortInputStreamCloseFailure) {
                                logger.error(serialPortInputStreamCloseFailure.toString());
                            } catch (UnsupportedEncodingException e) {
                                logger.error(e.toString());
                            } catch (IOException e) {
                                logger.error(e.toString());
                            }
                        }
                    }
                });
            }
        } catch (SerialPortParameterFailure serialPortParameterFailure) {
            logger.error(serialPortParameterFailure.toString());
        } catch (NotASerialPort notASerialPort) {
            logger.error(notASerialPort.toString());
        } catch (NoSuchPort noSuchPort) {
            logger.error(noSuchPort.toString());
        } catch (PortInUse portInUse) {
            logger.error(portInUse.toString());
        } catch (TooManyListeners tooManyListeners) {
            logger.error(tooManyListeners.toString());
        }
    }

    /**
     * 解析字符串 方法1
     *
     * @param bytes 获取的字节码
     */
    private void parsingString1(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //将ASCII码转成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

        //解析字符串
        String[] strs = sb.toString().trim().split("\\+");
        int weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Integer.parseInt(strs[j].trim().substring(0, 6));
                //发送数据
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法2
     *
     * @param bytes 获取的字节码
     */
    private void parsingString2(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //将ASCII码转成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }
        //解析字符串
        String[] strs = sb.toString().trim().split("\\+");
        double weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Double.parseDouble(strs[j].trim().substring(0, 6)) / 10;
                //发送数据
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法3
     *
     * @param bytes 获取的字节码
     */
    private void parsingString3(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //将ASCII码转成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

//        logger.info("sb:" + sb.toString());
        sb.reverse();

        //解析字符串
        String[] strs = sb.toString().trim().split("\\=");
        double weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Double.parseDouble(strs[j].trim());
                //发送数据
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 解析字符串 方法3
     *
     * @param bytes 获取的字节码
     */
    private void parsingString4(byte[] bytes) {
        StringBuffer sb = new StringBuffer();
        //将ASCII码转成字符串
        for (int i = 0; i < bytes.length; i++) {
            sb.append((char) Integer.parseInt(String.valueOf(bytes[i])));
        }

//        logger.info("sb:" + sb.reverse());
        //字符串反转
        sb.reverse();

        //解析字符串
        String[] strs = sb.toString().trim().split("\\=");
        int weight = 0;
        for (int j = 0; j < strs.length; j++) {
            if (strs[j].trim().length() >= 6) {
                weight = Integer.parseInt(strs[j].trim().substring(0, 6));
                //发送数据
                sendMessageToAll(String.valueOf(weight));
                break;
            }
        }
    }

    /**
     * 接受消息
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        logger.info("收到客户端{}消息:{}", session.getId(), message);
        try {
            this.sendMessage(message);
        } catch (Exception e) {
            logger.error(e.toString());
        }
    }

    /**
     * 处理错误
     *
     * @param error
     * @param session
     */
    @OnError
    public void onError(Throwable error, Session session) {
        logger.info("发生错误{},{}", session.getId(), error.getMessage());
    }

    /**
     * 处理连接关闭
     */
    @OnClose
    public void onClose() {
        webSocketMap.remove(this.session.getId());
        reduceCount();
        logger.info("连接关闭:{}", this.session.getId());

        //连接关闭后关闭串口,下一次打开连接重新监听串口
        if (serialPort != null) {
            SerialPortUtil.closePort(serialPort);
            serialPort = null;
        }
    }

    /**
     * 群发消息
     *
     * @param message
     */
    public void sendMessageToAll(String message) {
        for (int i = 0; i < webSocketMap.size(); i++) {
            try {
//                logger.info("session:id=" + session.getId());
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
    }

    /**
     * 发送消息
     *
     * @param message
     * @throws IOException
     */
    public void sendMessage(String message) throws IOException {
//        logger.info("session:id=" + session.getId());
        this.session.getBasicRemote().sendText(message);
    }

    //广播消息
    public static void broadcast() {
        PoundWebSocket.webSocketMap.forEach((k, v) -> {
            try {
                v.sendMessage("这是一条测试广播");
            } catch (Exception e) {
            }
        });
    }

    //获取在线连接数目
    public static int getCount() {
        return count;
    }

    //操作count,使用synchronized确保线程安全
    public static synchronized void addCount() {
        PoundWebSocket.count++;
    }

    public static synchronized void reduceCount() {
        PoundWebSocket.count--;
    }

}

解决websocket不能注入bean的问题

import com.yotrio.pound.sockets.PoundWebSocket;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext;

@ServletComponentScan
// mapper 接口类扫描包配置
@MapperScan("com.yotrio.pound.dao")
@SpringBootApplication
public class PoundClientApplication {

    public static void main(String[] args) {
        // 程序启动入口
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
//        SpringApplication.run(PoundClientApplication.class, args);

        ConfigurableApplicationContext applicationContext = SpringApplication.run(PoundClientApplication.class, args);
        //解决websocket不能注入bean的问题
        PoundWebSocket.setApplicationContext(applicationContext);
    }

}
前台js实现
    //websocket实现
    var websocket;
    var socketUrl = "ws://127.0.0.1:8000/websocket";
    var count = 0;
    if ('WebSocket' in window) {
        // console.log("此浏览器支持websocket");
        websocket = new WebSocket(socketUrl);
    } else if ('MozWebSocket' in window) {
        alert("此浏览器只支持MozWebSocket");
    } else {
        alert("此浏览器只支持SockJS");
    }
    websocket.onopen = function (evnt) {
        // $("#tou").html("链接服务器成功!");
        console.log("链接服务器成功");
    };
    websocket.onmessage = function (evnt) {
        // console.log("event", evnt.data);
        $("#currentWeight").val(evnt.data);
    };
    websocket.onerror = function (evnt) {
        console.log("消息异常:" + evnt.data);
    };
    websocket.onclose = function (evnt) {
        console.log("与服务器断开了链接");
    }

友情链接

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容