部标监控平台指令开发

部标监控平台核心模块之一指令操作,网上关于这方面的设计文章基本没有,今天带大家一起来开发,文章里的MQ采用了RabbitMQ。
我们先来看下整个系统的流程图,各模块之间的通信采用MQ交互,例如JT808网关接收终端的位置、图片等信息放到MQ,WEB后台订阅后进行入库,JT809网关订阅后上传给上级平台;WEB后台下发指令通过MQ到JT808网关再到终端,终端应答后再通过MQ返回到WEB后台;WEB后台也可以下发指令操作JT809网关。


image.png

那么如何设计通用的WEB后台与各网关程序的MQ传输协议呢?

  1. 指令分为两种,同步指令和异步指令。例如客户端下发文本信息给终端,需要等待终端返回成功与否的应答,这种指令属于同步指令,可以设置一个等待超时时间。客户设置了定时拍照的规则,平台按照指定时间自动给终端发送拍照指令,客户端不需要等待终端应答结果,这种指令属于异步指令。
  2. 终端协议不止部标协议,还有各个厂商的私有协议,组包发给终端的内容肯定不一样。我们看下部标协议,各种指令变化的是消息体,利用这点特性我们可以根据指令ID按协议去组好消息体发送给JT808网关,JT808网关再组成完整协议包发给终端。


    image.png
  3. 如何精准的把指令发送到对应的网关程序呢?每个网关程序都会命名不同的节点名称,终端上线时会把终端信息带上节点名称缓存到redis,平台发送指令时先去redis查询终端有无上线,如果未上线就不用发指令到MQ了。如果上线了根据节点名称组成的路由规则发送到MQ,网关程序根据节点名称订阅MQ就能收到指令消息了。
我们先定义前端的指令接口,泛型T是各种指令的消息体参数字段的类:
@ApiModel("指令信息")
@Data
public class CommandDTO<T> {

    @ApiModelProperty(value = "终端ID", required = true, position = 1)
    @NotNull(message = "终端ID不能为空")
    private Long terminalId;

    @ApiModelProperty(value = "协议类型", required = true, position = 2)
    @NotNull(message = "协议类型不能为空")
    private ProtocolEnum protocol = ProtocolEnum.JT808;

    @ApiModelProperty(value = "下行指令", position = 3)
    private String downCommandId;

    @ApiModelProperty(value = "指令内容", position = 4)
    private Map<String, Object> params;

    @ApiModelProperty(value = "请求方式(同步/异步)", position = 5)
    private CommandRequestTypeEnum requestType = CommandRequestTypeEnum.SYNC;

    @ApiModelProperty(value = "超时时间(毫秒,请求方式为同步时有效)", position = 6)
    private Integer responseTimeout;

    @ApiModelProperty(value = "指令内容实体", position = 7)
    @Valid
    private T paramsEntity;

    public Class getParamsEntityClass() {
        return paramsEntity.getClass();
    }
定义各种指令的参数类,每个参数类必须实现IDownCommandService接口,实现buildMessageBody方法,这个方法是根据参数和协议组成消息体内容,以部标文本下发指令为例:
image.png
@ApiModel("0x8300文本信息下发参数")
@Data
@DownCommand(messageId = 0x8300, respMessageId = 0x0001, desc = "文本信息下发")
public class Command8300Param implements IDownCommandService {

    @NotNull
    @ApiModelProperty(value = "标志位", required = true, position = 1)
    private List<Integer> flags;

    @NotBlank
    @ApiModelProperty(value = "文本信息", required = true, position = 2)
    private String textMsg;

    @Override
    public byte[] buildMessageBody() throws Exception {
        byte[] msgBodyArr = null;
        ByteBuf msgBody = null;
        try {
            char[] chars = new char[8];
            for (int i = 0; i < 8; i++) {
                char value = flags.contains(i) ? '1' : '0';
                chars[7 - i] = value;
            }
            int flag = Integer.parseInt(new String(chars), 2);
            byte[] textMsgArr = textMsg.getBytes(CommonConstants.DEFAULT_CHARSET_NAME);
            msgBody = Unpooled.buffer(textMsgArr.length + 1);
            msgBody.writeByte(flag).writeBytes(textMsgArr);
            msgBodyArr = msgBody.array();
        } catch (Exception e) {
            throw e;
        } finally {
            if (msgBody != null) {
                ReferenceCountUtil.release(msgBody);
            }
        }
        return msgBodyArr;
    }
}

@DownCommand(messageId = 0x8300, respMessageId = 0x0001, desc = "文本信息下发"),这个注解表明了这个类对应的指令ID和应答指令ID。

定义JT808接口类Jt808CommandController:
@Api(tags = "JT808指令操作")
@RestController
@RequestMapping("/api/v1/monitor/commands/jt808")
@Slf4j
public class Jt808CommandController {

    @Autowired
    private CommandOperationService commandOperationService;

    @ApiOperation("设置终端参数")
    @PostMapping("/sendCommand8103")
    public ResultDTO<CommandProto> sendCommand8103(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8103Param> commandDTO) {
        return sendJt808Command(commandDTO);
    }

    @ApiOperation("查询终端参数")
    @PostMapping("/sendCommand8104")
    public ResultDTO<CommandProto> sendCommand8104(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8104Param> commandDTO) {
        return sendJt808Command(commandDTO);
    }

    @ApiOperation("终端控制")
    @PostMapping("/sendCommand8105")
    public ResultDTO<CommandProto> sendCommand8105(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8105Param> commandDTO) {
        return sendJt808Command(commandDTO);
    }

    @ApiOperation("文本信息下发参数")
    @PostMapping("/sendCommand8300")
    public ResultDTO<CommandProto> sendCommand8300(@ApiParam("指令信息") @Valid @RequestBody CommandDTO<Command8300Param> commandDTO) {
        return sendJt808Command(commandDTO);
    }

   /**
     * 发送JT808指令
     * @param commandDTO
     * @return
     */
    private ResultDTO<CommandProto> sendJt808Command(CommandDTO<?> commandDTO) {
        commandDTO.setProtocol(ProtocolEnum.JT808);
        CommandProto result = commandOperationService.sendCommandEntity(commandDTO);
        return new ResultDTO<>(result);
    }
}
所有的指令都通过CommandOperationService处理,里面会判断终端是否上线、指令是否异步以及发送到MQ,如果有需要可以在这个服务里把指令操作入库。

我们看下同步指令的关键代码,利用JDK的CompletableFuture去等待结果,eventListener.register(commandEventKey, future)为什么需要注册事件呢?这是因为同一时间同一终端同一种指令有可能会有多个用户同时操作,WEB后台收到终端应答后就可以把结果返回给这些用户,这种技术是利用spring自带的事件监听和发布:

/**
     * 同步方式需要等待终端应答
     *
     * @param commandDTO
     * @param downCommandInfo
     * @param paramsJson
     * @return
     */
    private CommandProto waitForResult(CommandDTO commandDTO, DownCommandInfo downCommandInfo, String paramsJson) {
        //注册指令监听事件
        CompletableFuture<CommandProto> future = new CompletableFuture<>();
        long terminalId = commandDTO.getTerminalId();
        int timeout = commandDTO.getResponseTimeout();
        String downCommandId = commandDTO.getDownCommandId();
        String commandEventKey = GnssUtils.buildCommandEventKey(terminalId, downCommandId);
        eventListener.register(commandEventKey, future);

        CommandSendResultEnum commandSendResultEnum = CommandSendResultEnum.FAILED;
        try {
            //等待应答结果
            CommandProto result = future.get(timeout, TimeUnit.MILLISECONDS);
            log.info("收到指令应答,终端ID:{},指令类型:{},指令参数:{},应答结果:{}", terminalId, downCommandId, paramsJson, result);
            return result;
        } catch (TimeoutException e) {
            commandSendResultEnum = CommandSendResultEnum.TIMEOUT;
            log.error("等待指令应答超时,终端ID:{},指令类型:{},指令参数:{},等待时间:{}", terminalId, downCommandId, paramsJson, timeout, e);
        } catch (Exception e) {
            commandSendResultEnum = CommandSendResultEnum.INTERNAL_SERVER_ERROR;
            log.error("等待指令应答异常,终端ID:{},指令类型:{},指令参数:{}", terminalId, downCommandId, paramsJson, e);
        }

        //注销指令监听事件
        eventListener.unregister(commandEventKey, future);
        CommandProto result = buildCommandResponse(commandDTO, downCommandInfo, paramsJson, commandSendResultEnum);
        return result;
    }

spring监听事件代码:

@Component
@Slf4j
public class CommandEventListener {
    private ConcurrentHashMap<String, CopyOnWriteArrayList<CompletableFuture<CommandProto>>> subscriberMap = new ConcurrentHashMap<>();

    @EventListener
    public void onApplicationEvent(CommandEvent event) {
        String commandEventKey = event.getCommandEventKey();
        CommandProto message = event.getMessage();
        subscriberMap.computeIfPresent(commandEventKey, (k, v) -> {
            v.forEach(future -> {
                future.complete(message);
            });
            return null;
        });
        log.info("广播指令事件,commandEventKey:{},message:{}", commandEventKey, message);
    }

    /**
     * 注册指令事件监听
     *
     * @param commandEventKey
     * @param future
     */
    public void register(String commandEventKey, CompletableFuture<CommandProto> future) {
        subscriberMap.computeIfAbsent(commandEventKey, k -> new CopyOnWriteArrayList<>()).add(future);
        log.info("注册指令事件监听,commandEventKey:{},size:{}", commandEventKey, subscriberMap.get(commandEventKey).size());
    }

    /**
     * 注销指令事件监听
     *
     * @param commandEventKey
     * @param future
     */
    public void unregister(String commandEventKey, CompletableFuture<CommandProto> future) {
        subscriberMap.computeIfPresent(commandEventKey, (k, v) -> {
            v.remove(future);
            if (v.isEmpty()) {
                log.info("删除指令事件监听,commandEventKey:{}", commandEventKey);
                return null;
            }
            log.info("注销指令事件监听,commandEventKey:{},size:{}", commandEventKey, v.size());
            return v;
        });
    }
}
如何接收终端返回的应答消息呢?我们在MQ定义了上行指令通道,通过订阅接收消息,然后通过spring的事件把结果发布,CommandEventListener的onApplicationEvent方法会把结果广播给CompletableFuture,CommandProto result = future.get(timeout, TimeUnit.MILLISECONDS)就能得到结果了。
@Component
@RabbitListener(queues = RabbitConstants.UP_COMMAND_QUEUE)
@Slf4j
public class RabbitUpCommandReceiver {

    @Autowired
    private EventPublisher eventPublisher;

    @RabbitHandler
    public void handleUpCommand(CommandProto upCommand, Channel channel, Message message) throws Exception {
        log.info("收到上行指令:{}", upCommand);
        try {
            //异步发送的响应结果
            if (upCommand.getRequestType() == CommandRequestTypeEnum.ASYNC) {
                return;
            }

            //同步发送的响应结果
            long terminalId = upCommand.getTerminalId();
            String downCommandId = upCommand.getDownCommandId();
            String commandEventKey = GnssUtils.buildCommandEventKey(terminalId, downCommandId);
            eventPublisher.publishCommandEvent(commandEventKey, upCommand);
        } catch (Exception e) {
            log.error("处理上行指令异常异常{}", upCommand, e);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
}
以上是整个流程的关键代码,那么MQ的指令传输协议是怎样的呢?CommandProto在这个类里面,我们采用protobuf序列化,性能和传输大小比json要强很多。这个公共包已经开源了,请自行参考https://github.com/gnss-pro/common-project

打开swagger接口文档测试接口:


image.png

总结:基础架构完成后,以后开发每条指令的接口只需要创建对应指令的参数类并实现组包的buildMessageBody方法,在controller添加接口,实现了几分钟就搞定一条指令接口。

项目地址:https://github.com/gnss-pro/gnss-web

官方网站:http://www.gps-pro.cn
开源地址:https://github.com/gnss-pro
微信:17158638841 或扫描下图

image.png

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354