1.问题
11月21日,云南勐腊发生地震,ICL在推送相关地震数据后,Netty客户端收到的数据在日志中显示为不完整的JSON数据,导致json反序列化失败:
2.分析
PIS作为长链接的客户端,目前与ICL的通信协议是websocket,对应的数据处理报文类:io.netty.handler.codec.http.websocketx.TextWebSocketFrame
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String text = textFrame.text();
LOGGER.info("【PIS-client】 received message:{}", text);
TypeProtocol typeProtocol = JSON.parseObject(text, TypeProtocol.class);
MessageAction<String, String> action = dispatcher.getMsgAction(typeProtocol.getType());
String response = action.process(text);
if (StringUtils.isNotBlank(response)) {
TextWebSocketFrame socketFrame = new TextWebSocketFrame(response);
ch.writeAndFlush(socketFrame);
}
} else if (frame instanceof PongWebSocketFrame) {
LOGGER.info("【PIS-client】 received pong");
} else if (frame instanceof CloseWebSocketFrame) {
LOGGER.info("【PIS-client】 received closing");
ch.close();
} else if (frame instanceof BinaryWebSocketFrame) {
LOGGER.info("【PIS-client】 received Binary");
}
初步怀疑是因为ICL发送的数据超出一定长度导致数据分片发送或被截断。
3.处理
本地启动Netty 服务和Netty Client,尝试使用相似的数据复现该问题,未成功复现。
4.解决
查阅相关网上资料后,得到如下信息:
websocket定义了一种“分片帧”,如果消息比较大,一次性接收不完,可以把数据分片传输。netty的实现就是:ContinuationWebSocketFrame 它有个属性:finalFragment 表示当前的数据,是否是最后一帧数据。
针对该信息,调整了处理逻辑:
WebSocketFrame frame = (WebSocketFrame) msg;
LOGGER.info("Received incoming frame [{}]", frame.getClass().getName());
if (frame instanceof CloseWebSocketFrame) {
LOGGER.info("【PIS-client】 received CloseWebSocketFrame");
if (frameBuffer != null) {
handleMessageCompleted(ctx, frameBuffer.toString());
}
frameBuffer = null;
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
} else if (frame instanceof PongWebSocketFrame) {
LOGGER.info("【PIS-client】 received pong");
return;
} else if (frame instanceof BinaryWebSocketFrame) {
LOGGER.info("【PIS-client】 received Binary");
return;
}
//正常业务数据
if (frame instanceof TextWebSocketFrame) {
frameBuffer = new StringBuilder();
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String text = textFrame.text();
LOGGER.info("【PIS-client】received TextWebSocketFrame message:{}", text);
frameBuffer.append(text);
} else if (frame instanceof ContinuationWebSocketFrame) {
ContinuationWebSocketFrame continuationWebSocketFrame = (ContinuationWebSocketFrame) frame;
String cwsText = continuationWebSocketFrame.text();
LOGGER.info("【PIS-client】 received ContinuationWebSocketFrame ,value:{}", cwsText);
if (frameBuffer != null) {
frameBuffer.append(cwsText);
} else {
LOGGER.warn("【PIS-client】Continuation frame received without initial frame.");
}
} else {
LOGGER.warn("【PIS-client】 received unsupported frame {}", frame.toString());
return;
}
// Check if Text or Continuation Frame is final fragment and handle if needed.
if (frame.isFinalFragment()) {
handleMessageCompleted(ctx, frameBuffer.toString());
frameBuffer = null;
}
其中,frameBuffer为在当前的handler定义的成员变量stringBuilder,用来拼接TextWebSocketFrame
和ContinuationWebSocketFrame
中出现的文本字符。
目前,使用修改后的逻辑接收ICL提供的数据,未再发现问题。
5.疑问
网上对ContinuationWebSocketFrame的介绍比较少,目前还有以下疑点:
websocketframe的分片行为,到底是服务端行为,还是客户端行为
-
“分片帧”的大小是多少,如何控制
后续如果对这些问题有进展,会在本文进行更新。