数据的封装与传输
上一篇文章讲到Websocket握手协议的处理,现在开始说数据的传输。Websocket数据帧的封装和传输其实和处理握手请求的流程差不太多,都需要通过bytebuffer写入Socket的输出流或者从输入流读取。这里我们从解析数据帧开始,知道如何解析数据帧,封装也就不成话下。
我们来重新复习一下Websocket的数据传输协议
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
具体每一bit的意思
FIN 1bit 表示信息的最后一帧
RSV 1-3 1bit each 以后备用的 默认都为 0
Opcode 4bit 帧类型,稍后细说
Mask 1bit 掩码,是否加密数据,默认必须置为1
Payload 7bit 数据的长度
Masking-key 1 or 4 bit 掩码
Payload data (x + y) bytes 数据
Extension data x bytes 扩展数据
Application data y bytes 程序数据
其中较为重要的是Opcode字段,这个字段表示帧的类型,例如这个传输的帧是文本类型还是二进制类型,二进制类型传输的数据可以是图片或者语音之类的。
OPCODE:4位
解释PayloadData,如果接收到未知的opcode,接收端必须关闭连接。
0x0表示附加数据帧
0x1表示文本数据帧
0x2表示二进制数据帧
0x3-7暂时无定义,为以后的非控制帧保留
0x8表示连接关闭
0x9表示ping
0xA表示pong
0xB-F暂时无定义,为以后的控制帧保留
当我们解析完数据帧后,需要根据Opcode字段的类型进行对消息的不同回调处理,帧数据可以如下定义,Opcode通过枚举定义出来
public abstract interface Framedata
{
public abstract boolean isFin();
public abstract boolean getTransfereMasked();
public abstract Opcode getOpcode();
public abstract ByteBuffer getPayloadData();
public abstract void append(Framedata paramFramedata)
throws InvalidFrameException;
public static enum Opcode
{
CONTINUOUS, TEXT, BINARY, PING, PONG, CLOSING;
}
}
解析首先得判断Fin字段和Opcode字段。这两个字段跟一个消息分片(Fragment)的概念有关,一般的已知长度的消息,Fin值为1,表示结束,Opcode值不能为0,可以看一下上面代码的枚举类型,0代表CONTINUOUS,就是说有连续的数据帧会发送过来。
而某些未知长度的消息,则需要把消息分片发送。根据我的理解,实时的语音聊天就属于这种情形。这时候前面帧的Fin值为0,Opcode值为0,最后的结束帧Fin值为1,Opcode不为0。
首先应该是读取二进制流解析数据,从bytebuffer中取出数据,按byte来读取,一个byte有8个bit,数据帧是按bit来定义的,我们还得从byte中解析出具体Websocket协议中的每一个帧。
下面是读取二进制流的代码,值得关注的是通过位与运算获取具体每一个bit的数据,另外比较麻烦的是playload的处理,以及还需要根据Mask掩码来解密数据,传输协议里面有一位Mask,代表是否加密数据,默认设置为1。这里就不细说了
public Framedata translateSingleFrame(ByteBuffer buffer) throws Draft_10.IncompleteException, InvalidDataException {
int maxpacketsize = buffer.remaining();
int realpacketsize = 2;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte b1 = buffer.get();
boolean FIN = b1 >> 8 != 0;
byte rsv = (byte) ((b1 & 0x7F) >> 4);
if (rsv != 0)
throw new InvalidFrameException("bad rsv " + rsv);
byte b2 = buffer.get();
boolean MASK = (b2 & 0xFFFFFF80) != 0;
int payloadlength = (byte) (b2 & 0x7F);
Framedata.Opcode optcode = toOpcode((byte) (b1 & 0xF));
if ((!FIN) && (
(optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING))) {
throw new InvalidFrameException("control frames may no be fragmented");
}
if ((payloadlength < 0) || (payloadlength > 125)) {
if ((optcode == Framedata.Opcode.PING) || (optcode == Framedata.Opcode.PONG) || (optcode == Framedata.Opcode.CLOSING)) {
throw new InvalidFrameException("more than 125 octets");
}
if (payloadlength == 126) {
realpacketsize += 2;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte[] sizebytes = new byte[3];
sizebytes[1] = buffer.get();
sizebytes[2] = buffer.get();
payloadlength = new BigInteger(sizebytes).intValue();
} else {
realpacketsize += 8;
if (maxpacketsize < realpacketsize)
throw new IncompleteException(realpacketsize);
byte[] bytes = new byte[8];
for (int i = 0; i < 8; i++) {
bytes[i] = buffer.get();
}
long length = new BigInteger(bytes).longValue();
if (length > 2147483647L) {
throw new LimitExedeedException("Payloadsize is to big...");
}
payloadlength = (int) length;
}
}
realpacketsize += (MASK ? 4 : 0);
realpacketsize += payloadlength;
if (maxpacketsize < realpacketsize) {
throw new IncompleteException(realpacketsize);
}
ByteBuffer payload = ByteBuffer.allocate(checkAlloc(payloadlength));
if (MASK) {
byte[] maskskey = new byte[4];
buffer.get(maskskey);
for (int i = 0; i < payloadlength; i++)
payload.put((byte) (buffer.get() ^ maskskey[(i % 4)]));
} else {
payload.put(buffer.array(), buffer.position(), payload.limit());
buffer.position(buffer.position() + payload.limit());
}
FrameBuilder frame;
FrameBuilder frame;
if (optcode == Framedata.Opcode.CLOSING) {
frame = new CloseFrameBuilder();
} else {
frame = new FramedataImpl1();
frame.setFin(FIN);
frame.setOptcode(optcode);
}
payload.flip();
frame.setPayload(payload);
return frame;
}
读取完后就是根据这个数据帧的类型来进行不同的回调
private void decodeFrames(ByteBuffer socketBuffer) {
try {
List frames = this.draft.translateFrame(socketBuffer);//读取二进制流
for (Framedata f : frames) {
if (DEBUG)
System.out.println("matched frame: " + f);
Framedata.Opcode curop = f.getOpcode();
boolean fin = f.isFin();
if (curop == Framedata.Opcode.CLOSING) { //关闭帧
int code = 1005;
String reason = "";
if ((f instanceof CloseFrame)) {
CloseFrame cf = (CloseFrame) f;
code = cf.getCloseCode();
reason = cf.getMessage();
}
if (this.readystate == WebSocket.READYSTATE.CLOSING) {
closeConnection(code, reason, true);
} else if (this.draft.getCloseHandshakeType() == Draft.CloseHandshakeType.TWOWAY)
close(code, reason, true);
else {
flushAndClose(code, reason, false);
}
} else if (curop == Framedata.Opcode.PING) { //Ping
this.wsl.onWebsocketPing(this, f);
} else if (curop == Framedata.Opcode.PONG) { //Pong
this.wsl.onWebsocketPong(this, f);
} else if ((!fin) || (curop == Framedata.Opcode.CONTINUOUS)) { //分片消息
if (curop != Framedata.Opcode.CONTINUOUS) {
if (this.current_continuous_frame_opcode != null)
throw new InvalidDataException(1002, "Previous continuous frame sequence not completed.");
this.current_continuous_frame_opcode = curop;
} else if (fin) {
if (this.current_continuous_frame_opcode == null)
throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
this.current_continuous_frame_opcode = null;
} else if (this.current_continuous_frame_opcode == null) {
throw new InvalidDataException(1002, "Continuous frame sequence was not started.");
}
try {
this.wsl.onWebsocketMessageFragment(this, f);
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
} else { //普通消息
if (this.current_continuous_frame_opcode != null)
throw new InvalidDataException(1002, "Continuous frame sequence not completed.");
if (curop == Framedata.Opcode.TEXT) //文本消息
try {
this.wsl.onWebsocketMessage(this, Charsetfunctions.stringUtf8(f.getPayloadData()));
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
else if (curop == Framedata.Opcode.BINARY) //二进制消息
try {
this.wsl.onWebsocketMessage(this, f.getPayloadData());
} catch (RuntimeException e) {
this.wsl.onWebsocketError(this, e);
}
else
throw new InvalidDataException(1002, "non control or continious frame expected");
}
}
} catch (InvalidDataException e1) {
this.wsl.onWebsocketError(this, e1);
close(e1);
return;
}
}
现在说一下控制帧的处理,WebSocket控制帧有3种:Close(关闭帧)、Ping以及Pong。Close关闭帧很容易理解,客户端如果接受到了就关闭连接,客户端也可以发送关闭帧给服务端。Ping和Pong是websocket里的心跳,用来保证客户端是在线的,一般来说只有服务端给客户端发送Ping,然后客户端发送Pong来回应,表明自己仍然在线。
我们来看一下Nathan Rajlich的Java-Websocket代码,客户端对Ping的处理很简单,把收到的Ping帧改一下Opcode的类型,就可以发送回给服务端了。可以看到客户端处理服务端发送的Pong回调的方法是空的。
public void onWebsocketPing(WebSocket conn, Framedata f) {
FramedataImpl1 resp = new FramedataImpl1(f);
resp.setOptcode(Framedata.Opcode.PONG);
conn.sendFrame(resp);
}
public void onWebsocketPong(WebSocket conn, Framedata f) {
}
这篇暂时写到这里好了,下一篇写写Websocket Client和Websocket Server的实现