一. 初始化
- (void)_SR_commonInit
{
NSString *scheme = _url.scheme.lowercaseString;
// 只支持ws wss http https 四种协议
assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
//wss https 是安全型的协议
if([schemeisEqualToString:@"wss"] || [schemeisEqualToString:@"https"]) {
_secure=YES;
}
_readyState = SR_CONNECTING; //Socket默认状态
_consumerStopped = YES;
//客户端选择的版本13
_webSocketVersion = 13;
//初始化工作的队列,串行
_workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
// Going to set a specific on the queue so we can validate we're on the work queue 队列设置一个标识
dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);
_delegateDispatchQueue = dispatch_get_main_queue();
sr_dispatch_retain(_delegateDispatchQueue);
_readBuffer = [[NSMutableData alloc] init]; //读buffer
_outputBuffer = [[NSMutableData alloc] init]; // 写buffer
_currentFrameData = [[NSMutableData alloc] init];//当前数据帧
_consumers = [[NSMutableArray alloc] init];//消费者数组
_consumerPool = [[SRIOConsumerPool alloc] init];//消费者池,管理消费者数组
_scheduledRunloops = [[NSMutableSet alloc] init];//注册的runloop,放到集合里
// 初始化流对象
[self _initializeStreams];
}
// 初始化流对象
- (void)_initializeStreams;
{
assert(_url.port.unsignedIntValue <= UINT32_MAX);
uint32_t port = _url.port.unsignedIntValue;//取端口值
//默认端口号:http 80 https 443;
if (port == 0) {
if (!_secure) {
port = 80;
} else {
port = 443;
}
}
/*这里为什么要用CFStream创建NSInputStream和NSOutputStream ?
NSStream类不支持连接到远程主机,CFStream支持.两者可以转换。
CFStreamCreatePairWithSocketToHost函数并传递主机名和端口号,来获 取一个CFReadStreamRef和一个CFWriteStreamRef来进行通信,然后我们可以将它们转换为NSInputStream和NSOutputStream对象来处理。*/
NSString *host = _url.host;
CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);
//把C语言的输入输出流转化成OC对象
//CFBridgingRelease 作用:非OC指针指向OC
_outputStream = CFBridgingRelease(writeStream);
_inputStream = CFBridgingRelease(readStream);
_inputStream.delegate = self;
_outputStream.delegate = self;
}
二. 建立连接
- 开启一个常驻子线程,子线程里创建runloop。
- 输入流和输出流注册到runloop里。runloop一直跑圈监听流事件
- 打开流对象
- (void)openConnection;
{
/*
1> 如果是wss,https: _outputStream进行SSL配置
2> _outputStream 和 _inputStream 进行networkServiceType配置
*/
[self _updateSecureStreamOptions];
if (!_scheduledRunloops.count) {
//SR_networkRunLoop会开启一个带runloop的常驻子线程,流对象注册到runloop
[self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
}
[_outputStream open];
[_inputStream open]; //open后会调用代理方法
}
- 流对象的回调事件:
- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
case NSStreamEventOpenCompleted: { // open成功
SRFastLog(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= SR_CLOSING) {
return;
}
assert(_readBuffer);
// didConnect fires after certificate verification if we're using pinned certificates.
BOOL usingPinnedCerts = [[_urlRequest SR_SSLPinnedCertificates] count] > 0;
if ((!_secure || !usingPinnedCerts) && self.readyState == SR_CONNECTING && aStream == _inputStream) {
[self didConnect]; //发送握手协议
}
[self _pumpWriting];
[self _pumpScanner];
break;
}
- 流对象open成功后,发送握手数据包,这个数据包由一个HTTP升级请求构成,包含一系列必须的和可选的header字段
5.1 构建request ,设置各种请求头
- (void)didConnect;
{//流打开成功后的操作,开始发送http请求建立连接
SRFastLog(@"Connected");
//创建一个http消息对象
CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);
// Set host first so it defaults
//设置head
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));
//Sec-WebSocket-Key的值必须是由一个随机生成的16字节的随机数通过base64编码得到的。每一个连接都必须随机的选择随机数。
NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);
if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
_secKey = [keyBytes base64EncodedStringWithOptions:0];
} else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
_secKey = [keyBytes base64Encoding];
#pragma clang diagnostic pop
}
assert([_secKey length] == 24);
// Apply cookies if any have been provided
//同步cookies
NSDictionary * cookies = [NSHTTPCookie requestHeaderFieldsWithCookies:[self requestCookies]];
for (NSString * cookieKey in cookies) { //拿到cookie值
NSString * cookieValue = [cookies objectForKey:cookieKey];
if ([cookieKey length] && [cookieValue length]) {
CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)cookieKey, (__bridge CFStringRef)cookieValue);//设置到request的 head里
}
}
// set header for http basic auth
//设置http的基础auth,用户名密码认证
if (_url.user.length && _url.password.length) {
NSData *userAndPassword = [[NSString stringWithFormat:@"%@:%@", _url.user, _url.password] dataUsingEncoding:NSUTF8StringEncoding];
NSString *userAndPasswordBase64Encoded;
if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
userAndPasswordBase64Encoded = [userAndPassword base64EncodedStringWithOptions:0];
} else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
userAndPasswordBase64Encoded = [userAndPassword base64Encoding];
#pragma clang diagnostic pop
}
//编码后用户名密码
_basicAuthorizationString = [NSString stringWithFormat:@"Basic %@", userAndPasswordBase64Encoded];
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Authorization"), (__bridge CFStringRef)_basicAuthorizationString); //设置head Authorization
}
//web socket规范header,各种请求头
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Upgrade"), CFSTR("websocket"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Connection"), CFSTR("Upgrade"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Version"), (__bridge CFStringRef)[NSString stringWithFormat:@"%ld", (long)_webSocketVersion]);
//设置Origin header
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Origin"), (__bridge CFStringRef)_url.SR_origin);
//用户初始化的协议数组,可以约束websocket的一些行为
if (_requestedProtocols) {
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Protocol"), (__bridge CFStringRef)[_requestedProtocols componentsJoinedByString:@", "]);
}
//同步_urlRequest的header
[_urlRequest.allHTTPHeaderFields enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)key, (__bridge CFStringRef)obj);
}];
//返回一个序列化 , CFBridgingRelease和 __bridge transfer一个意思, CFHTTPMessageCopySerializedMessage copy一份新的并且序列化,返回CFDataRef
NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));
CFRelease(request);
//用outputStream 发送http请求
[self _writeData:message];
//校验服务端请求
[self _readHTTPHeader];
}
5.2 把发送的数据拼接到_outputBuffer
[_outputBuffer appendData:data]; // 拼接到buffer上
5.3 输出流发送http请求
- (void)_pumpWriting;
{//有可能会调用多次
[self assertOnWorkQueue];
NSUInteger dataLength = _outputBuffer.length; //当前已写的数据大小
//(buffer的总长度 - 已经读到的位置),也就是剩余未读的字节数
// 有剩余字节数 && 接收者能被写入
if (dataLength - _outputBufferOffset > 0 && _outputStream.hasSpaceAvailable) {
//向服务端发送http请求
/*
1> - (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)len;
方法意思:读取len长度的内容到buffer中,返回值为读取的实际长度,所以最大为len
2> bytes方法指向接收者管理的一段连续的内存,是一个指针
_outputBuffer.bytes指向的是_outputBuffer的内存首地址。可以理解为数据的第0个位置
_outputBuffer.bytes + _outputBufferOffset ,从距离首地址_outputBufferOffset的位置处
write:_outputBuffer.bytes + _outputBufferOffset: 表示从距离首地址_outputBufferOffset的位置处开始写数据
maxLength: 限定长度是为了保证取到的数据正确。不至于取多。因为bytes指向的是一块内存区域
*/
//把_outputBuffer对象上数据写入到_outputStream对象上
NSInteger bytesWritten = [_outputStream write:_outputBuffer.bytes + _outputBufferOffset maxLength:dataLength - _outputBufferOffset];
if (bytesWritten == -1) { // -1 说明写入发生了错误
[self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2145 userInfo:[NSDictionary dictionaryWithObject:@"Error writing to stream" forKey:NSLocalizedDescriptionKey]]];
return;
}
_outputBufferOffset += bytesWritten; //当前写的位置
//超过4KB && 写的位置大于总数据长度的一半
// 重新生成buffer,重置偏移量
if (_outputBufferOffset > 4096 && _outputBufferOffset > (_outputBuffer.length >> 1)) {
_outputBuffer = [[NSMutableData alloc] initWithBytes:(char *)_outputBuffer.bytes + _outputBufferOffset length:_outputBuffer.length - _outputBufferOffset];
_outputBufferOffset = 0;
}
}
......
......
}
- 校验服务端请求
6.1 生产者-消费者设计模式
- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
{
[self assertOnWorkQueue];
//生产: 添加待消费对象到消费池里
[_consumers addObject:[_consumerPool consumerWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
//消费:从数组里取出消费对象进行处理
[self _pumpScanner];
}
6.2 待消费对象: 主要管理了两个block,在被消费的时候进行回调处理
typedef size_t (^stream_scanner)(NSData *collected_data);
typedef void (^data_callback)(SRWebSocket *webSocket, NSData *data);
6.3 消费池对待消费对象的管理:
6.3.1 如果消费池里有,就复用。消费池里没有,就创建待消费对象
SRIOConsumer *consumer = nil;
if (_bufferedConsumers.count) {
consumer = [_bufferedConsumers lastObject];
[_bufferedConsumers removeLastObject];
} else {
consumer = [[SRIOConsumer alloc] init];
}
6.3.2 待消费者被消费后,根据条件对待消费者进行回收
- (void)returnConsumer:(SRIOConsumer *)consumer;
{
//_poolSize 初始化给定值为8
if (_bufferedConsumers.count < _poolSize) {
[_bufferedConsumers addObject:consumer];
}
}
6.4 输出流发送http请求后,随即创建待消费对象,传入一个处理服务端握手消息的block
- (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data_callback)dataHandler;
{
// 处理服务端握手消息的block做的事情:为了得到完整的http响应头部,不粘包。返回实际包的长度,方便上层截取。
stream_scanner consumer = ^size_t(NSData *data) {
__block size_t found_size = 0;
__block size_t match_count = 0;
size_t size = data.length;//得到数据长度
const unsigned char *buffer = data.bytes;//得到数据指针,字符数组
for (size_t i = 0; i < size; i++ ) {
//扫描buffer
if (((const unsigned char *)buffer)[i] == ((const unsigned char *)bytes)[match_count]) {
match_count += 1;//匹配数+1
//读到\r\n\r\n标识符为止,得到完整的http响应
if (match_count == length) { //length == 4
found_size = i + 1;//读取数据长度等于 i+ 1
break;
}
} else {
match_count = 0;
}
}
return found_size;//返回要读取数据的长度,没匹配成功就是0
};
//验证服务端http响应的时候,才传consumer
//生产待消费对象
[self _addConsumerWithScanner:consumer callback:dataHandler];
}
6.5 服务端响应
6.5.1 校验证书
// 如果是wss或https,而且_pinnedCertFound 为NO,而且事件类型是有可读数据未读,或者事件类型是还有空余空间可写
if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
// 本地如果有证书
NSArray *sslCerts = [_urlRequest SR_SSLPinnedCertificates];
if (sslCerts) {
// 获取服务端的SSL证书
SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
if (secTrust) {
NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));
for (id ref in sslCerts) {
SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));
if ([trustedCertData isEqualToData:certData]) {//证书校验
_pinnedCertFound = YES;
break;
}
}
}
}
//如果为NO,SSL认证失败,会断开连接,
if (!_pinnedCertFound) {
dispatch_async(_workQueue, ^{
NSDictionary *userInfo = @{ NSLocalizedDescriptionKey : @"Invalid server cert" };
[weakSelf _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:userInfo]];
});
return;
} else if (aStream == _outputStream) {//如果流是输出流,则打开流成功
dispatch_async(_workQueue, ^{
[self didConnect];//流打开成功后的操作,开始发送http请求建立连接
});
}
}
}
6.5.2 拼接服务端返回的数据到_readBuffer上
case NSStreamEventHasBytesAvailable: {
SRFastLog(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];
while (_inputStream.hasBytesAvailable) { //从服务端返回的数据
//读取数据,一次读2048
NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) { //拼接数据
[_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:_inputStream.streamError];//读取错误
}
//如果读取的不等于最大的,说明读完了,跳出循环
if (bytes_read != bufferSize) {
break;
}
};
[self _pumpScanner]; //消费数据
break;
}
6.5.3 取出消费者进行消费
size_t curSize = _readBuffer.length - _readBufferOffset;//当前buffer剩余未读的数据大小
if (!curSize) {
return didWork;
}
SRIOConsumer *consumer = [_consumers objectAtIndex:0];//从消费池中取出第一个待消费者进行消费
size_t bytesNeeded = consumer.bytesNeeded;
size_t foundSize = 0;
if (consumer.consumer) {
NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO];
foundSize = consumer.consumer(tempView);//得到完整的握手响应数据大小
} else {
assert(consumer.bytesNeeded);
if (curSize >= bytesNeeded) {
foundSize = bytesNeeded;
} else if (consumer.readToCurrentFrame) {
foundSize = curSize;
}
}
NSData *slice = nil;
if (consumer.readToCurrentFrame || foundSize) {
NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
slice = [_readBuffer subdataWithRange:sliceRange];//截取数据
_readBufferOffset += foundSize;
//重置_readBufferOffset
if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) {
_readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset];
_readBufferOffset = 0;
}
if (consumer.unmaskBytes) {
NSMutableData *mutableSlice = [slice mutableCopy];
NSUInteger len = mutableSlice.length;
uint8_t *bytes = mutableSlice.mutableBytes;
for (NSUInteger i = 0; i < len; i++) {
bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)];
_currentReadMaskOffset += 1;
}
slice = mutableSlice;
}
if (consumer.readToCurrentFrame) {
[_currentFrameData appendData:slice];
_readOpCount += 1;
if (_currentFrameOpcode == SROpCodeTextFrame) {
// Validate UTF8 stuff.
size_t currentDataSize = _currentFrameData.length;
if (_currentFrameOpcode == SROpCodeTextFrame && currentDataSize > 0) {
// TODO: Optimize the crap out of this. Don't really have to copy all the data each time
size_t scanSize = currentDataSize - _currentStringScanPosition;
NSData *scan_data = [_currentFrameData subdataWithRange:NSMakeRange(_currentStringScanPosition, scanSize)];
int32_t valid_utf8_size = validate_dispatch_data_partial_string(scan_data);
if (valid_utf8_size == -1) {
[self closeWithCode:SRStatusCodeInvalidUTF8 reason:@"Text frames must be valid UTF-8"];
dispatch_async(_workQueue, ^{
[self closeConnection];
});
return didWork;
} else {
_currentStringScanPosition += valid_utf8_size;
}
}
}
consumer.bytesNeeded -= foundSize;
if (consumer.bytesNeeded == 0) {
[_consumers removeObjectAtIndex:0];
consumer.handler(self, nil);
[_consumerPool returnConsumer:consumer];
didWork = YES;
}
} else if (foundSize) {
[_consumers removeObjectAtIndex:0];
consumer.handler(self, slice);//回调到上层
[_consumerPool returnConsumer:consumer];
didWork = YES;
}
6.5.4 校验服务端握手请求
NSInteger responseCode = CFHTTPMessageGetResponseStatusCode(_receivedHTTPHeaders);
if (responseCode >= 400) {
SRFastLog(@"Request failed with response code %d", responseCode);
[self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2132 userInfo:@{NSLocalizedDescriptionKey:[NSString stringWithFormat:@"received bad response code from server %ld", (long)responseCode], SRHTTPResponseErrorKey:@(responseCode)}]];
return;
}
/*
如果客户端收到的Sec-WebSocket-Acceptheader字段或者Sec-WebSocket-Acceptheader字段不等于通过Sec-WebSocket-Key字段的值(作为一个字符串,而不是base64解码后)和"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"串联起来,忽略所有前后空格进行base64 SHA-1编码的值,那么客户端必须关闭连接。
*/
if(![self _checkHandshake:_receivedHTTPHeaders]) {
[self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@"Invalid Sec-WebSocket-Accept response"] forKey:NSLocalizedDescriptionKey]]];
return;
}
NSString *negotiatedProtocol = CFBridgingRelease(CFHTTPMessageCopyHeaderFieldValue(_receivedHTTPHeaders, CFSTR("Sec-WebSocket-Protocol")));
if (negotiatedProtocol) {
// Make sure we requested the protocol
if ([_requestedProtocols indexOfObject:negotiatedProtocol] == NSNotFound) {
[self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@"Server specified Sec-WebSocket-Protocol that wasn't requested"] forKey:NSLocalizedDescriptionKey]]];
return;
}
_protocol = negotiatedProtocol;
}
7 建立连接
校验服务端握手请求成功后,服务端和客户端就可以相互发送数据了
//如果服务端的响应通过了上述的验证过程,那么WebSocket就已经建立连接了,并且WebSocket的连接状态也到了OPEN状态
self.readyState = SR_OPEN; // 建立连接
// 添加待消费者,处理从服务端传过来的数据帧
if (!_didFail) {
[self _readFrameNew];
}
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) {
[self.delegate webSocketDidOpen:self]; //告知代理websocket连接成功
};
}];
三. 数据帧格式
在WebSocket协议中,数据是通过一系列数据帧来进行传输的。
格式如下:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
FIN: 1 bit
表示这是消息的最后一个片段。第一个片段也有可能是最后一个片段。
RSV1,RSV2,RSV3: 每个1 bit
必须设置为0,除非扩展了非0值含义的扩展。如果收到了一个非0值但是没有扩展任何非0值的含义,接收终端必须断开WebSocket连接。
Opcode: 4 bit
定义“有效负载数据”的解释。如果收到一个未知的操作码,接收终端必须断开WebSocket连接。下面的值是被定义过的。
%x0 表示一个持续帧
%x1 表示一个文本帧
%x2 表示一个二进制帧
%x3-7 预留给以后的非控制帧
%x8 表示一个连接关闭包
%x9 表示一个ping包
%xA 表示一个pong包
%xB-F 预留给以后的控制帧
Mask: 1 bit
mask标志位,定义“有效负载数据”是否添加掩码。如果设置为1,那么掩码的键值存在于Masking-Key中,根据5.3节描述,这个一般用于解码“有效负载数据”。所有的从客户端发送到服务端的帧都需要设置这个bit位为1。
Payload length: 7 bits, 7+16 bits, or 7+64 bits
以字节为单位的“有效负载数据”长度,如果值为0-125,那么就表示负载数据的长度。如果是126,那么接下来的2个bytes解释为16bit的无符号整形作为负载数据的长度。如果是127,那么接下来的8个bytes解释为一个64bit的无符号整形(最高位的bit必须为0)作为负载数据的长度。多字节长度量以网络字节顺序表示(译注:应该是指大端序和小端序)。在所有的示例中,长度值必须使用最小字节数来进行编码,例如:长度为124字节的字符串不可用使用序列126,0,124进行编码。有效负载长度是指“扩展数据”+“应用数据”的长度。“扩展数据”的长度可能为0,那么有效负载长度就是“应用数据”的长度。
Masking-Key: 0 or 4 bytes
所有从客户端发往服务端的数据帧都已经与一个包含在这一帧中的32 bit的掩码进行过了运算。如果mask标志位(1 bit)为1,那么这个字段存在,如果标志位为0,那么这个字段不存在。在5.3节中会介绍更多关于客户端到服务端增加掩码的信息。
Payload data: (x+y) bytes
“有效负载数据”是指“扩展数据”和“应用数据”。
Extension data: x bytes
除非协商过扩展,否则“扩展数据”长度为0 bytes。在握手协议中,任何扩展都必须指定“扩展数据”的长度,这个长度如何进行计算,以及这个扩展如何使用。如果存在扩展,那么这个“扩展数据”包含在总的有效负载长度中。
Application data: y bytes
任意的“应用数据”,占用“扩展数据”后面的剩余所有字段。“应用数据”的长度等于有效负载长度减去“扩展应用”长度。
基础数据帧协议通过ABNF进行了正式的定义。需要重点知道的是,这些数据都是二进制的,而不是ASCII字符。例如,长度为1 bit的字段的值为%x0 / %x1代表的是一个值为0/1的单独的bit,而不是一整个字节(8 bit)来代表ASCII编码的字符“0”和“1”。一个长度为4 bit的范围是%x0-F的字段值代表的是4个bit,而不是字节(8 bit)对应的ASCII码的值。不要指定字符编码:“规则解析为一组最终的值,有时候是字符。在ABNF中,字符仅仅是一个非负的数字。在特定的上下文中,会根据特定的值的映射(编码)编码集(例如ASCII)”。在这里,指定的编码类型是将每个字段编码为特定的bits数组的二进制编码的最终数据。
四. 客户端向服务端发送数据
客户端发送的ping心跳包、字符串、二进制数据:
- 都会调用_sendFrameWithOpcode方法准备要发送的数据帧,包括设置数据帧的头信息,数据添加掩码
- 把数据帧写入_outputBuffer上
- _outputStream发送数据到服务端
typedef enum {
SROpCodeTextFrame = 0x1,//数据帧
SROpCodeBinaryFrame = 0x2,//数据帧
// 3-7 reserved.
SROpCodeConnectionClose = 0x8, //控制帧
SROpCodePing = 0x9,//控制帧
SROpCodePong = 0xA,//控制帧
// B-F reserved.
} SROpCode;
//数据帧的帧头,总共是2个字节
typedef struct {
BOOL fin; //等于1. 表示这是消息的最后一个片段。第一个片段也有可能是最后一个片段。
// BOOL rsv1;
// BOOL rsv2;
// BOOL rsv3;
uint8_t opcode;
BOOL masked; //从客户端发送到服务端的帧都需要设置这个bit位为1
uint64_t payload_length; //Payload length: 7 bits, 7+16 bits, or 7+64 bits
} frame_header;
//按WebSocket协议设置数据帧的前2个字节共计16位,拼接上加掩码的data
- (void)_sendFrameWithOpcode:(SROpCode)opcode data:(id)data; {
[self assertOnWorkQueue];
if (nil == data) {
return;
}
NSAssert([data isKindOfClass:[NSData class]] || [data isKindOfClass:[NSString class]], @"NSString or NSData");
//获取payload len
size_t payloadLength = [data isKindOfClass:[NSString class]] ? [(NSString *)data lengthOfBytesUsingEncoding:NSUTF8StringEncoding] : [data length];
NSMutableData *frame = [[NSMutableData alloc] initWithLength:payloadLength + SRFrameHeaderOverhead];
if (!frame) {
[self closeWithCode:SRStatusCodeMessageTooBig reason:@"Message too big"];
return;
}
uint8_t *frame_buffer = (uint8_t *)[frame mutableBytes];
//2个十六进制是一个字节,总共8位
//数据帧头部的第一个字节:包含了fin和opcode
frame_buffer[0] = SRFinMask | opcode; //第一个字节
BOOL useMask = YES;
#ifdef NOMASK
useMask = NO;
#endif
if (useMask) {
// set the mask and header
frame_buffer[1] |= SRMaskMask; //数据帧头部的mask值,客户端发送到服务端的必须置为1
}
size_t frame_buffer_size = 2; //数据帧头部总共为2个字节
const uint8_t *unmasked_payload = NULL;
if ([data isKindOfClass:[NSData class]]) {
unmasked_payload = (uint8_t *)[data bytes]; // 要发送的数据的长度
} else if ([data isKindOfClass:[NSString class]]) {
unmasked_payload = (const uint8_t *)[data UTF8String];
} else {
return;
}
//数据帧头部的Payload len设置
//多字节长度量以网络字节顺序表示(译注:应该是指大端序和小端序) EndianU16_BtoN 和 EndianU64_BtoN 处理大小端模式的
if (payloadLength < 126) { //有效负载数据长度:如果值为0-125字节,那么就表示负载数据的长度
frame_buffer[1] |= payloadLength;
} else if (payloadLength <= UINT16_MAX) {
frame_buffer[1] |= 126; //如果是126,那么接下来的2个bytes解释为16bit的无符号整形作为负载数据的长度。
*((uint16_t *)(frame_buffer + frame_buffer_size)) = EndianU16_BtoN((uint16_t)payloadLength);
frame_buffer_size += sizeof(uint16_t);
} else {
frame_buffer[1] |= 127;//如果是127,那么接下来的8个bytes解释为一个64bit的无符号整形(最高位的bit必须为0)作为负载数据的长度。
*((uint64_t *)(frame_buffer + frame_buffer_size)) = EndianU64_BtoN((uint64_t)payloadLength);
frame_buffer_size += sizeof(uint64_t);
}
if (!useMask) {
for (size_t i = 0; i < payloadLength; i++) {
frame_buffer[frame_buffer_size] = unmasked_payload[i];
frame_buffer_size += 1;
}
} else {// 所有从客户端发往服务端的数据帧都已经与一个包含在这一帧中的32 bit的掩码进行过了运算
//Masking-Key:掩码的key,占用4个字节
uint8_t *mask_key = frame_buffer + frame_buffer_size;
SecRandomCopyBytes(kSecRandomDefault, sizeof(uint32_t), (uint8_t *)mask_key);
frame_buffer_size += sizeof(uint32_t);
//发送的数据按位与mask-key进行异或运算
// TODO: could probably optimize this with SIMD
for (size_t i = 0; i < payloadLength; i++) {
frame_buffer[frame_buffer_size] = unmasked_payload[i] ^ mask_key[i % sizeof(uint32_t)];//异或运算
frame_buffer_size += 1;
}
}
assert(frame_buffer_size <= [frame length]);
frame.length = frame_buffer_size;
//写数据
[self _writeData:frame];
}
五. 服务端向客户端发送数据
- 在建立连接的时候, 调用了_readFrameNew方法.这个方法的作用就是添加了一个待消费者到消费池
//如果服务端的响应通过了上述的验证过程,那么WebSocket就已经建立连接了,并且WebSocket的连接状态也到了OPEN状态
self.readyState = SR_OPEN; // 建立连接
// 添加待消费者,处理从服务端传过来的数据帧
if (!_didFail) {
[self _readFrameNew];
}
- 服务端返回数据时,会调用流代理方法
- eventCode等于NSStreamEventHasBytesAvailable时,把数据拼接到_readBuffer上
- 调用方法[self _pumpScanner]从消费池取出待消费对象处理数据帧
5.先解析数据帧的前2个字节,也就是数据帧的头.
__block frame_header header = {0};
const uint8_t *headerBuffer = data.bytes;
assert(data.length >= 2);
//判断第一帧 FIN
if (headerBuffer[0] & SRRsvMask) {
[self _closeWithProtocolError:@"Server used RSV bits"];
return;
}
//得到Qpcode
uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
//判断帧类型,是否是指定的控制帧
BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
//如果不是指定帧,而且receivedOpcode不等于0,而且_currentFrameCount消息帧大于0,错误关闭
if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
[self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
return;
}
// 没消息
if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
[self _closeWithProtocolError:@"cannot continue a message"];
return;
}
//正常读取
//得到opcode
header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
//得到fin
header.fin = !!(SRFinMask & headerBuffer[0]);
//得到Mask
header.masked = !!(SRMaskMask & headerBuffer[1]);
//得到数据长度
header.payload_length = SRPayloadLenMask & headerBuffer[1];
headerBuffer = NULL;
//客户端不能接受掩码的数据
if (header.masked) {
[self _closeWithProtocolError:@"Client must receive unmasked data"];
}
size_t extra_bytes_needed = header.masked ? sizeof(_currentReadMaskKey) : 0;
//得到长度
/*
[Payload len]小于126,用[Payload len]来读后续的Payload数据;
[Payload len]大于等于126,用[Extended payload length]来读后续的Payload数据;
当Payload len小于126时,Payload len所代表的数值也就是数据长度.
当Payload len大于等于126时,我们就需要读Payload len后面的Extended payload length,Extended payload length所代表的数值也就是数据长度.
*/
if (header.payload_length == 126) {
extra_bytes_needed += sizeof(uint16_t);
} else if (header.payload_length == 127) {
extra_bytes_needed += sizeof(uint64_t);
}
6.根据数据帧头信息,获取了要截取的数据大小,添加待消费者到消费池, 然后消费者消费
[self _addConsumerWithDataLength:(size_t)frame_header.payload_length callback:^(SRWebSocket *self, NSData *newData) {
if (isControlFrame) { //如果是控制帧
[self _handleFrameWithData:newData opCode:frame_header.opcode];
} else {
if (frame_header.fin) {//消息帧的最后一个片段
[self _handleFrameWithData:self->_currentFrameData opCode:frame_header.opcode];
} else {//消息帧的非最后一个片段
// TODO add assert that opcode is not a control;
[self _readFrameContinue]; //继续生产待消费对象
}
}
} readToCurrentFrame:!isControlFrame unmaskBytes:frame_header.masked];
7.如果是上述消息帧的最后一个片段和控制帧,继续生产待消费对象。等待下次服务端给客户端发消息的时候用
BOOL isControlFrame = (opcode == SROpCodePing || opcode == SROpCodePong || opcode == SROpCodeConnectionClose);
if (!isControlFrame) {
[self _readFrameNew];
} else {
dispatch_async(_workQueue, ^{
[self _readFrameContinue];
});
}
- 根据opcode 进行不同的处理
8.1 如果是SROpCodeTextFrame或SROpCodeBinaryFrame,说明收到了服务端的message
[self _performDelegateBlock:^{
[self.delegate webSocket:self didReceiveMessage:message];
}];
8.2 如果是SROpCodePing,说明收到了服务端的心跳消息,客户端应该响应pong
// Need to pingpong this off _callbackQueue first to make sure messages happen in order
[self _performDelegateBlock:^{
dispatch_async(_workQueue, ^{
[self _sendFrameWithOpcode:SROpCodePong data:pingData];
});
}];
8.3 如果是SROpCodePong,说明客户端发送的心跳消息服务端响应了,WebSocket连接正常
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didReceivePong:)]) {
[self.delegate webSocket:self didReceivePong:pongData];
}
}];
8.4 如果是SROpCodeConnectionClose,说明客户端收到了服务端的关闭帧,客户端作关闭连接的处理
- (void)handleCloseWithData:(NSData *)data;
Tips: 纯属个人记录总结,方便日后个人查阅,如有错误,欢迎指出