SRWebSocket源码解析

WebSocket协议

中文翻译的WebSocket协议

SRWebSocket

一. 初始化

- (void)_SR_commonInit
{
    NSString *scheme = _url.scheme.lowercaseString;
    // 只支持ws wss http https 四种协议
    assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
    //wss https 是安全型的协议
    if([schemeisEqualToString:@"wss"] || [schemeisEqualToString:@"https"]) {
        _secure=YES;
    }

    _readyState = SR_CONNECTING;  //Socket默认状态
    _consumerStopped = YES;

    //客户端选择的版本13
    _webSocketVersion = 13;

    //初始化工作的队列,串行
    _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    // Going to set a specific on the queue so we can validate we're on the work queue 队列设置一个标识
    dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);
    _delegateDispatchQueue = dispatch_get_main_queue();
    sr_dispatch_retain(_delegateDispatchQueue);

    _readBuffer = [[NSMutableData alloc] init]; //读buffer
    _outputBuffer = [[NSMutableData alloc] init]; // 写buffer
    _currentFrameData = [[NSMutableData alloc] init];//当前数据帧
    _consumers = [[NSMutableArray alloc] init];//消费者数组
    _consumerPool = [[SRIOConsumerPool alloc] init];//消费者池,管理消费者数组
    _scheduledRunloops = [[NSMutableSet alloc] init];//注册的runloop,放到集合里

    // 初始化流对象
    [self _initializeStreams];
}

// 初始化流对象
- (void)_initializeStreams;
{
    assert(_url.port.unsignedIntValue <= UINT32_MAX);
    uint32_t port = _url.port.unsignedIntValue;//取端口值
    
    //默认端口号:http 80 https 443;
    if (port == 0) {
        if (!_secure) {
            port = 80;
        } else {
            port = 443;
        }
    }

    /*这里为什么要用CFStream创建NSInputStream和NSOutputStream ?
     NSStream类不支持连接到远程主机,CFStream支持.两者可以转换。
    CFStreamCreatePairWithSocketToHost函数并传递主机名和端口号,来获 取一个CFReadStreamRef和一个CFWriteStreamRef来进行通信,然后我们可以将它们转换为NSInputStream和NSOutputStream对象来处理。*/
    NSString *host = _url.host;
    CFReadStreamRef readStream = NULL;
    CFWriteStreamRef writeStream = NULL;
    
    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);
    
    //把C语言的输入输出流转化成OC对象
    //CFBridgingRelease 作用:非OC指针指向OC
    _outputStream = CFBridgingRelease(writeStream);
    _inputStream = CFBridgingRelease(readStream);
    
    _inputStream.delegate = self;
    _outputStream.delegate = self;
}

二. 建立连接

  1. 开启一个常驻子线程,子线程里创建runloop。
  2. 输入流和输出流注册到runloop里。runloop一直跑圈监听流事件
  3. 打开流对象
- (void)openConnection;
{
    /*
     1> 如果是wss,https: _outputStream进行SSL配置
     2> _outputStream 和 _inputStream 进行networkServiceType配置
     */
    [self _updateSecureStreamOptions];
    
    if (!_scheduledRunloops.count) {
        //SR_networkRunLoop会开启一个带runloop的常驻子线程,流对象注册到runloop
        [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
    }
   
    [_outputStream open];
    [_inputStream open]; //open后会调用代理方法
}
  1. 流对象的回调事件:
  • (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
 case NSStreamEventOpenCompleted: { // open成功
                SRFastLog(@"NSStreamEventOpenCompleted %@", aStream);
                if (self.readyState >= SR_CLOSING) {
                    return;
                }
                assert(_readBuffer);
                
                // didConnect fires after certificate verification if we're using pinned certificates.
                BOOL usingPinnedCerts = [[_urlRequest SR_SSLPinnedCertificates] count] > 0;
                if ((!_secure || !usingPinnedCerts) && self.readyState == SR_CONNECTING && aStream == _inputStream) {
                    [self didConnect]; //发送握手协议
                }
                [self _pumpWriting];
                [self _pumpScanner];
                break;
            }
  1. 流对象open成功后,发送握手数据包,这个数据包由一个HTTP升级请求构成,包含一系列必须的和可选的header字段

5.1 构建request ,设置各种请求头

- (void)didConnect;
{//流打开成功后的操作,开始发送http请求建立连接
    SRFastLog(@"Connected");
    //创建一个http消息对象
    CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);
    
    // Set host first so it defaults
    //设置head
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));
    //Sec-WebSocket-Key的值必须是由一个随机生成的16字节的随机数通过base64编码得到的。每一个连接都必须随机的选择随机数。
    NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
    SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);
    
    if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
        _secKey = [keyBytes base64EncodedStringWithOptions:0];
    } else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
        _secKey = [keyBytes base64Encoding];
#pragma clang diagnostic pop
    }
    
    assert([_secKey length] == 24);

    // Apply cookies if any have been provided
    //同步cookies
    NSDictionary * cookies = [NSHTTPCookie requestHeaderFieldsWithCookies:[self requestCookies]];
    for (NSString * cookieKey in cookies) { //拿到cookie值
        NSString * cookieValue = [cookies objectForKey:cookieKey];
        if ([cookieKey length] && [cookieValue length]) {
            CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)cookieKey, (__bridge CFStringRef)cookieValue);//设置到request的 head里
        }
    }
 
    // set header for http basic auth
     //设置http的基础auth,用户名密码认证
    if (_url.user.length && _url.password.length) {
        NSData *userAndPassword = [[NSString stringWithFormat:@"%@:%@", _url.user, _url.password] dataUsingEncoding:NSUTF8StringEncoding];
        NSString *userAndPasswordBase64Encoded;
        if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
            userAndPasswordBase64Encoded = [userAndPassword base64EncodedStringWithOptions:0];
        } else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
            userAndPasswordBase64Encoded = [userAndPassword base64Encoding];
#pragma clang diagnostic pop
        }
        //编码后用户名密码
        _basicAuthorizationString = [NSString stringWithFormat:@"Basic %@", userAndPasswordBase64Encoded];
        CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Authorization"), (__bridge CFStringRef)_basicAuthorizationString); //设置head Authorization
    }
    
    //web socket规范header,各种请求头
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Upgrade"), CFSTR("websocket"));
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Connection"), CFSTR("Upgrade"));
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Version"), (__bridge CFStringRef)[NSString stringWithFormat:@"%ld", (long)_webSocketVersion]);
    
    //设置Origin header
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Origin"), (__bridge CFStringRef)_url.SR_origin);
    
    //用户初始化的协议数组,可以约束websocket的一些行为
    if (_requestedProtocols) {
        CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Protocol"), (__bridge CFStringRef)[_requestedProtocols componentsJoinedByString:@", "]);
    }

    //同步_urlRequest的header
    [_urlRequest.allHTTPHeaderFields enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
        CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)key, (__bridge CFStringRef)obj);
    }];
  
    //返回一个序列化 , CFBridgingRelease和 __bridge transfer一个意思, CFHTTPMessageCopySerializedMessage copy一份新的并且序列化,返回CFDataRef
    NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));
    
    CFRelease(request);

     //用outputStream 发送http请求
    [self _writeData:message];
    
     //校验服务端请求
    [self _readHTTPHeader];
}

5.2 把发送的数据拼接到_outputBuffer

 [_outputBuffer appendData:data]; // 拼接到buffer上

5.3 输出流发送http请求

- (void)_pumpWriting;
{//有可能会调用多次
    [self assertOnWorkQueue];
    
    NSUInteger dataLength = _outputBuffer.length; //当前已写的数据大小
    //(buffer的总长度 - 已经读到的位置),也就是剩余未读的字节数
    // 有剩余字节数 && 接收者能被写入
    if (dataLength - _outputBufferOffset > 0 && _outputStream.hasSpaceAvailable) {
        
        //向服务端发送http请求
        /*
         1> - (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)len;
         方法意思:读取len长度的内容到buffer中,返回值为读取的实际长度,所以最大为len
         2> bytes方法指向接收者管理的一段连续的内存,是一个指针
         
         _outputBuffer.bytes指向的是_outputBuffer的内存首地址。可以理解为数据的第0个位置
         _outputBuffer.bytes + _outputBufferOffset ,从距离首地址_outputBufferOffset的位置处
         write:_outputBuffer.bytes + _outputBufferOffset: 表示从距离首地址_outputBufferOffset的位置处开始写数据
         maxLength: 限定长度是为了保证取到的数据正确。不至于取多。因为bytes指向的是一块内存区域
         */
        
        //把_outputBuffer对象上数据写入到_outputStream对象上
        NSInteger bytesWritten = [_outputStream write:_outputBuffer.bytes + _outputBufferOffset maxLength:dataLength - _outputBufferOffset];
        if (bytesWritten == -1) { // -1 说明写入发生了错误
            [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2145 userInfo:[NSDictionary dictionaryWithObject:@"Error writing to stream" forKey:NSLocalizedDescriptionKey]]];
             return;
        }
        
        _outputBufferOffset += bytesWritten; //当前写的位置
        
        //超过4KB && 写的位置大于总数据长度的一半
        // 重新生成buffer,重置偏移量
        if (_outputBufferOffset > 4096 && _outputBufferOffset > (_outputBuffer.length >> 1)) {
            _outputBuffer = [[NSMutableData alloc] initWithBytes:(char *)_outputBuffer.bytes + _outputBufferOffset length:_outputBuffer.length - _outputBufferOffset];
            _outputBufferOffset = 0;
        }
    }
    ......
    ......
}
  1. 校验服务端请求
    6.1 生产者-消费者设计模式
- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
{    
    [self assertOnWorkQueue];
    //生产: 添加待消费对象到消费池里
    [_consumers addObject:[_consumerPool consumerWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
    //消费:从数组里取出消费对象进行处理
    [self _pumpScanner];
}

6.2 待消费对象: 主要管理了两个block,在被消费的时候进行回调处理

typedef size_t (^stream_scanner)(NSData *collected_data);

typedef void (^data_callback)(SRWebSocket *webSocket,  NSData *data);

6.3 消费池对待消费对象的管理:
6.3.1 如果消费池里有,就复用。消费池里没有,就创建待消费对象

SRIOConsumer *consumer = nil;
    if (_bufferedConsumers.count) {
        consumer = [_bufferedConsumers lastObject];
        [_bufferedConsumers removeLastObject];
    } else {
        consumer = [[SRIOConsumer alloc] init];
    }

6.3.2 待消费者被消费后,根据条件对待消费者进行回收

- (void)returnConsumer:(SRIOConsumer *)consumer;
{
   //_poolSize 初始化给定值为8
    if (_bufferedConsumers.count < _poolSize) { 
        [_bufferedConsumers addObject:consumer];
    }
}

6.4 输出流发送http请求后,随即创建待消费对象,传入一个处理服务端握手消息的block

- (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data_callback)dataHandler;
{
     // 处理服务端握手消息的block做的事情:为了得到完整的http响应头部,不粘包。返回实际包的长度,方便上层截取。
       stream_scanner consumer = ^size_t(NSData *data) {
        __block size_t found_size = 0;
        __block size_t match_count = 0;
        
        size_t size = data.length;//得到数据长度
        const unsigned char *buffer = data.bytes;//得到数据指针,字符数组
        for (size_t i = 0; i < size; i++ ) {
            //扫描buffer
            if (((const unsigned char *)buffer)[i] == ((const unsigned char *)bytes)[match_count]) {
                match_count += 1;//匹配数+1
                 //读到\r\n\r\n标识符为止,得到完整的http响应
                if (match_count == length) { //length == 4
                    found_size = i + 1;//读取数据长度等于 i+ 1
                    break;
                }
            } else {
                match_count = 0;
            }
        }
        return found_size;//返回要读取数据的长度,没匹配成功就是0
    };
    
    //验证服务端http响应的时候,才传consumer
   //生产待消费对象
    [self _addConsumerWithScanner:consumer callback:dataHandler];
}

6.5 服务端响应
6.5.1 校验证书

   //  如果是wss或https,而且_pinnedCertFound 为NO,而且事件类型是有可读数据未读,或者事件类型是还有空余空间可写
    if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
        
        // 本地如果有证书
        NSArray *sslCerts = [_urlRequest SR_SSLPinnedCertificates];
        if (sslCerts) {
            
            // 获取服务端的SSL证书
            SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
            if (secTrust) {
                NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
                for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
                    SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
                    NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));
                    
                    for (id ref in sslCerts) {
                        SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
                        NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));
                        
                        if ([trustedCertData isEqualToData:certData]) {//证书校验
                            _pinnedCertFound = YES;
                            break;
                        }
                    }
                }
            }
          //如果为NO,SSL认证失败,会断开连接,
            if (!_pinnedCertFound) {
                dispatch_async(_workQueue, ^{
                    NSDictionary *userInfo = @{ NSLocalizedDescriptionKey : @"Invalid server cert" };
                    [weakSelf _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:userInfo]];
                });
                return;
            } else if (aStream == _outputStream) {//如果流是输出流,则打开流成功
                dispatch_async(_workQueue, ^{
                    [self didConnect];//流打开成功后的操作,开始发送http请求建立连接
                });
            }
        }
    }

6.5.2 拼接服务端返回的数据到_readBuffer上

  case NSStreamEventHasBytesAvailable: {
                SRFastLog(@"NSStreamEventHasBytesAvailable %@", aStream);
                const int bufferSize = 2048;
                uint8_t buffer[bufferSize];
                
                while (_inputStream.hasBytesAvailable) { //从服务端返回的数据
                     //读取数据,一次读2048
                    NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
                    
                    if (bytes_read > 0) { //拼接数据
                        [_readBuffer appendBytes:buffer length:bytes_read];
                    } else if (bytes_read < 0) {
                        [self _failWithError:_inputStream.streamError];//读取错误
                    }
                     //如果读取的不等于最大的,说明读完了,跳出循环
                    if (bytes_read != bufferSize) {
                        break;
                    }
                };
                [self _pumpScanner]; //消费数据
                break;
            }

6.5.3 取出消费者进行消费

size_t curSize = _readBuffer.length - _readBufferOffset;//当前buffer剩余未读的数据大小
    if (!curSize) {
        return didWork;
    }
    
    SRIOConsumer *consumer = [_consumers objectAtIndex:0];//从消费池中取出第一个待消费者进行消费
    
    size_t bytesNeeded = consumer.bytesNeeded;
    
    size_t foundSize = 0;
    if (consumer.consumer) {
        NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO];  
        foundSize = consumer.consumer(tempView);//得到完整的握手响应数据大小
    } else {
        assert(consumer.bytesNeeded);
        if (curSize >= bytesNeeded) {
            foundSize = bytesNeeded;
        } else if (consumer.readToCurrentFrame) {
            foundSize = curSize;
        }
    }
    
    NSData *slice = nil;
    if (consumer.readToCurrentFrame || foundSize) {
        NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
        slice = [_readBuffer subdataWithRange:sliceRange];//截取数据
        
        _readBufferOffset += foundSize;
        
        //重置_readBufferOffset
        if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) {
            _readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset];
            _readBufferOffset = 0;
        }
        
        if (consumer.unmaskBytes) {
            NSMutableData *mutableSlice = [slice mutableCopy];
            
            NSUInteger len = mutableSlice.length;
            uint8_t *bytes = mutableSlice.mutableBytes;
            
            for (NSUInteger i = 0; i < len; i++) {
                bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)];
                _currentReadMaskOffset += 1;
            }
            
            slice = mutableSlice;
        }
        
        if (consumer.readToCurrentFrame) {
            [_currentFrameData appendData:slice];
            
            _readOpCount += 1;
            
            if (_currentFrameOpcode == SROpCodeTextFrame) {
                // Validate UTF8 stuff.
                size_t currentDataSize = _currentFrameData.length;
                if (_currentFrameOpcode == SROpCodeTextFrame && currentDataSize > 0) {
                    // TODO: Optimize the crap out of this.  Don't really have to copy all the data each time
                    
                    size_t scanSize = currentDataSize - _currentStringScanPosition;
                    
                    NSData *scan_data = [_currentFrameData subdataWithRange:NSMakeRange(_currentStringScanPosition, scanSize)];
                    int32_t valid_utf8_size = validate_dispatch_data_partial_string(scan_data);
                    
                    if (valid_utf8_size == -1) {
                        [self closeWithCode:SRStatusCodeInvalidUTF8 reason:@"Text frames must be valid UTF-8"];
                        dispatch_async(_workQueue, ^{
                            [self closeConnection];
                        });
                        return didWork;
                    } else {
                        _currentStringScanPosition += valid_utf8_size;
                    }
                } 
                
            }
            
            consumer.bytesNeeded -= foundSize;
            
            if (consumer.bytesNeeded == 0) {
                [_consumers removeObjectAtIndex:0];
                consumer.handler(self, nil);
                [_consumerPool returnConsumer:consumer];
                didWork = YES;
            }
        } else if (foundSize) {
            [_consumers removeObjectAtIndex:0];
            consumer.handler(self, slice);//回调到上层
            [_consumerPool returnConsumer:consumer];
            didWork = YES;
        }

6.5.4 校验服务端握手请求

    NSInteger responseCode = CFHTTPMessageGetResponseStatusCode(_receivedHTTPHeaders);
    
    if (responseCode >= 400) {
        SRFastLog(@"Request failed with response code %d", responseCode);
        [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2132 userInfo:@{NSLocalizedDescriptionKey:[NSString stringWithFormat:@"received bad response code from server %ld", (long)responseCode], SRHTTPResponseErrorKey:@(responseCode)}]];
        return;
    }

    /*
     如果客户端收到的Sec-WebSocket-Acceptheader字段或者Sec-WebSocket-Acceptheader字段不等于通过Sec-WebSocket-Key字段的值(作为一个字符串,而不是base64解码后)和"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"串联起来,忽略所有前后空格进行base64 SHA-1编码的值,那么客户端必须关闭连接。
     */
    if(![self _checkHandshake:_receivedHTTPHeaders]) {
        [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@"Invalid Sec-WebSocket-Accept response"] forKey:NSLocalizedDescriptionKey]]];
        return;
    }
    
    NSString *negotiatedProtocol = CFBridgingRelease(CFHTTPMessageCopyHeaderFieldValue(_receivedHTTPHeaders, CFSTR("Sec-WebSocket-Protocol")));
    if (negotiatedProtocol) {
        // Make sure we requested the protocol
        if ([_requestedProtocols indexOfObject:negotiatedProtocol] == NSNotFound) {
            [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2133 userInfo:[NSDictionary dictionaryWithObject:[NSString stringWithFormat:@"Server specified Sec-WebSocket-Protocol that wasn't requested"] forKey:NSLocalizedDescriptionKey]]];
            return;
        }
        
        _protocol = negotiatedProtocol;
    }

7 建立连接
校验服务端握手请求成功后,服务端和客户端就可以相互发送数据了

 //如果服务端的响应通过了上述的验证过程,那么WebSocket就已经建立连接了,并且WebSocket的连接状态也到了OPEN状态
    self.readyState = SR_OPEN; // 建立连接
    
    // 添加待消费者,处理从服务端传过来的数据帧
    if (!_didFail) {
        [self _readFrameNew];
    }

    [self _performDelegateBlock:^{
        if ([self.delegate respondsToSelector:@selector(webSocketDidOpen:)]) {
            [self.delegate webSocketDidOpen:self]; //告知代理websocket连接成功
        };
    }];

三. 数据帧格式

在WebSocket协议中,数据是通过一系列数据帧来进行传输的。
格式如下:

      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-------+-+-------------+-------------------------------+
     |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
     |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
     |N|V|V|V|       |S|             |   (if payload len==126/127)   |
     | |1|2|3|       |K|             |                               |
     +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
     |     Extended payload length continued, if payload len == 127  |
     + - - - - - - - - - - - - - - - +-------------------------------+
     |                               |Masking-key, if MASK set to 1  |
     +-------------------------------+-------------------------------+
     | Masking-key (continued)       |          Payload Data         |
     +-------------------------------- - - - - - - - - - - - - - - - +
     :                     Payload Data continued ...                :
     + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
     |                     Payload Data continued ...                |
     +---------------------------------------------------------------+

FIN: 1 bit

​ 表示这是消息的最后一个片段。第一个片段也有可能是最后一个片段。

RSV1,RSV2,RSV3: 每个1 bit

​ 必须设置为0,除非扩展了非0值含义的扩展。如果收到了一个非0值但是没有扩展任何非0值的含义,接收终端必须断开WebSocket连接。

Opcode: 4 bit

​ 定义“有效负载数据”的解释。如果收到一个未知的操作码,接收终端必须断开WebSocket连接。下面的值是被定义过的。

​ %x0 表示一个持续帧

​ %x1 表示一个文本帧

​ %x2 表示一个二进制帧

​ %x3-7 预留给以后的非控制帧

​ %x8 表示一个连接关闭包

​ %x9 表示一个ping包

​ %xA 表示一个pong包

​ %xB-F 预留给以后的控制帧

Mask: 1 bit

​ mask标志位,定义“有效负载数据”是否添加掩码。如果设置为1,那么掩码的键值存在于Masking-Key中,根据5.3节描述,这个一般用于解码“有效负载数据”。所有的从客户端发送到服务端的帧都需要设置这个bit位为1。

Payload length: 7 bits, 7+16 bits, or 7+64 bits

​ 以字节为单位的“有效负载数据”长度,如果值为0-125,那么就表示负载数据的长度。如果是126,那么接下来的2个bytes解释为16bit的无符号整形作为负载数据的长度。如果是127,那么接下来的8个bytes解释为一个64bit的无符号整形(最高位的bit必须为0)作为负载数据的长度。多字节长度量以网络字节顺序表示(译注:应该是指大端序和小端序)。在所有的示例中,长度值必须使用最小字节数来进行编码,例如:长度为124字节的字符串不可用使用序列126,0,124进行编码。有效负载长度是指“扩展数据”+“应用数据”的长度。“扩展数据”的长度可能为0,那么有效负载长度就是“应用数据”的长度。

Masking-Key: 0 or 4 bytes

​ 所有从客户端发往服务端的数据帧都已经与一个包含在这一帧中的32 bit的掩码进行过了运算。如果mask标志位(1 bit)为1,那么这个字段存在,如果标志位为0,那么这个字段不存在。在5.3节中会介绍更多关于客户端到服务端增加掩码的信息。

Payload data: (x+y) bytes

​ “有效负载数据”是指“扩展数据”和“应用数据”。

Extension data: x bytes

​ 除非协商过扩展,否则“扩展数据”长度为0 bytes。在握手协议中,任何扩展都必须指定“扩展数据”的长度,这个长度如何进行计算,以及这个扩展如何使用。如果存在扩展,那么这个“扩展数据”包含在总的有效负载长度中。

Application data: y bytes

​ 任意的“应用数据”,占用“扩展数据”后面的剩余所有字段。“应用数据”的长度等于有效负载长度减去“扩展应用”长度。

基础数据帧协议通过ABNF进行了正式的定义。需要重点知道的是,这些数据都是二进制的,而不是ASCII字符。例如,长度为1 bit的字段的值为%x0 / %x1代表的是一个值为0/1的单独的bit,而不是一整个字节(8 bit)来代表ASCII编码的字符“0”和“1”。一个长度为4 bit的范围是%x0-F的字段值代表的是4个bit,而不是字节(8 bit)对应的ASCII码的值。不要指定字符编码:“规则解析为一组最终的值,有时候是字符。在ABNF中,字符仅仅是一个非负的数字。在特定的上下文中,会根据特定的值的映射(编码)编码集(例如ASCII)”。在这里,指定的编码类型是将每个字段编码为特定的bits数组的二进制编码的最终数据。

四. 客户端向服务端发送数据

客户端发送的ping心跳包、字符串、二进制数据:

  1. 都会调用_sendFrameWithOpcode方法准备要发送的数据帧,包括设置数据帧的头信息,数据添加掩码
  2. 把数据帧写入_outputBuffer上
  3. _outputStream发送数据到服务端
typedef enum  {
    SROpCodeTextFrame = 0x1,//数据帧
    SROpCodeBinaryFrame = 0x2,//数据帧
    // 3-7 reserved.
    SROpCodeConnectionClose = 0x8, //控制帧
    SROpCodePing = 0x9,//控制帧
    SROpCodePong = 0xA,//控制帧
    // B-F reserved.
} SROpCode;

//数据帧的帧头,总共是2个字节
typedef struct {
    BOOL fin; //等于1. 表示这是消息的最后一个片段。第一个片段也有可能是最后一个片段。
//  BOOL rsv1;
//  BOOL rsv2;
//  BOOL rsv3;
    uint8_t opcode;
    BOOL masked; //从客户端发送到服务端的帧都需要设置这个bit位为1
    uint64_t payload_length; //Payload length: 7 bits, 7+16 bits, or 7+64 bits
} frame_header;

//按WebSocket协议设置数据帧的前2个字节共计16位,拼接上加掩码的data
- (void)_sendFrameWithOpcode:(SROpCode)opcode data:(id)data; {
    [self assertOnWorkQueue];
    
    if (nil == data) {
        return;
    }
    
    NSAssert([data isKindOfClass:[NSData class]] || [data isKindOfClass:[NSString class]], @"NSString or NSData");
    //获取payload len
    size_t payloadLength = [data isKindOfClass:[NSString class]] ? [(NSString *)data lengthOfBytesUsingEncoding:NSUTF8StringEncoding] : [data length];
        
    NSMutableData *frame = [[NSMutableData alloc] initWithLength:payloadLength + SRFrameHeaderOverhead];
    if (!frame) {
        [self closeWithCode:SRStatusCodeMessageTooBig reason:@"Message too big"];
        return;
    }
    uint8_t *frame_buffer = (uint8_t *)[frame mutableBytes];
    
    //2个十六进制是一个字节,总共8位
    //数据帧头部的第一个字节:包含了fin和opcode
    frame_buffer[0] = SRFinMask | opcode; //第一个字节
    
    BOOL useMask = YES;
#ifdef NOMASK
    useMask = NO;
#endif
    
    if (useMask) {
    // set the mask and header
        frame_buffer[1] |= SRMaskMask; //数据帧头部的mask值,客户端发送到服务端的必须置为1
    }
    
    size_t frame_buffer_size = 2; //数据帧头部总共为2个字节
    
    const uint8_t *unmasked_payload = NULL;
    if ([data isKindOfClass:[NSData class]]) {
        unmasked_payload = (uint8_t *)[data bytes]; // 要发送的数据的长度
    } else if ([data isKindOfClass:[NSString class]]) {
        unmasked_payload =  (const uint8_t *)[data UTF8String];
    } else {
        return;
    }
    //数据帧头部的Payload len设置
    //多字节长度量以网络字节顺序表示(译注:应该是指大端序和小端序) EndianU16_BtoN 和 EndianU64_BtoN 处理大小端模式的
    if (payloadLength < 126) { //有效负载数据长度:如果值为0-125字节,那么就表示负载数据的长度
        frame_buffer[1] |= payloadLength;
    } else if (payloadLength <= UINT16_MAX) {
        frame_buffer[1] |= 126; //如果是126,那么接下来的2个bytes解释为16bit的无符号整形作为负载数据的长度。
        *((uint16_t *)(frame_buffer + frame_buffer_size)) = EndianU16_BtoN((uint16_t)payloadLength);
        frame_buffer_size += sizeof(uint16_t);
    } else {
        frame_buffer[1] |= 127;//如果是127,那么接下来的8个bytes解释为一个64bit的无符号整形(最高位的bit必须为0)作为负载数据的长度。
        *((uint64_t *)(frame_buffer + frame_buffer_size)) = EndianU64_BtoN((uint64_t)payloadLength);
        frame_buffer_size += sizeof(uint64_t);
    }
        
    if (!useMask) {
        for (size_t i = 0; i < payloadLength; i++) {
            frame_buffer[frame_buffer_size] = unmasked_payload[i];
            frame_buffer_size += 1;
        }
    } else {//​ 所有从客户端发往服务端的数据帧都已经与一个包含在这一帧中的32 bit的掩码进行过了运算
        //Masking-Key:掩码的key,占用4个字节
        uint8_t *mask_key = frame_buffer + frame_buffer_size;
        SecRandomCopyBytes(kSecRandomDefault, sizeof(uint32_t), (uint8_t *)mask_key);
        frame_buffer_size += sizeof(uint32_t);
        
        //发送的数据按位与mask-key进行异或运算
        // TODO: could probably optimize this with SIMD
        for (size_t i = 0; i < payloadLength; i++) {
            frame_buffer[frame_buffer_size] = unmasked_payload[i] ^ mask_key[i % sizeof(uint32_t)];//异或运算
            frame_buffer_size += 1;
        }
    }
    assert(frame_buffer_size <= [frame length]);
    frame.length = frame_buffer_size;
     //写数据
    [self _writeData:frame]; 
}

五. 服务端向客户端发送数据

  1. 在建立连接的时候, 调用了_readFrameNew方法.这个方法的作用就是添加了一个待消费者到消费池
 //如果服务端的响应通过了上述的验证过程,那么WebSocket就已经建立连接了,并且WebSocket的连接状态也到了OPEN状态
    self.readyState = SR_OPEN; // 建立连接
    
     // 添加待消费者,处理从服务端传过来的数据帧
     if (!_didFail) {
        [self _readFrameNew];
    }
  1. 服务端返回数据时,会调用流代理方法
  2. eventCode等于NSStreamEventHasBytesAvailable时,把数据拼接到_readBuffer上
  3. 调用方法[self _pumpScanner]从消费池取出待消费对象处理数据帧
    5.先解析数据帧的前2个字节,也就是数据帧的头.
        __block frame_header header = {0};
        
        const uint8_t *headerBuffer = data.bytes;
        assert(data.length >= 2);
        
         //判断第一帧 FIN
        if (headerBuffer[0] & SRRsvMask) {
            [self _closeWithProtocolError:@"Server used RSV bits"];
            return;
        }
        
        //得到Qpcode
        uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
        
        //判断帧类型,是否是指定的控制帧
        BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
        
        //如果不是指定帧,而且receivedOpcode不等于0,而且_currentFrameCount消息帧大于0,错误关闭
        if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
            [self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
            return;
        }
        
         // 没消息
        if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
            [self _closeWithProtocolError:@"cannot continue a message"];
            return;
        }
        //正常读取
        //得到opcode
        header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
        
         //得到fin
        header.fin = !!(SRFinMask & headerBuffer[0]);
        
        //得到Mask
        header.masked = !!(SRMaskMask & headerBuffer[1]);
          //得到数据长度
        header.payload_length = SRPayloadLenMask & headerBuffer[1];
        
        headerBuffer = NULL;
        
        //客户端不能接受掩码的数据
        if (header.masked) {
            [self _closeWithProtocolError:@"Client must receive unmasked data"];
        }
        
        size_t extra_bytes_needed = header.masked ? sizeof(_currentReadMaskKey) : 0;
        //得到长度
        /*
         [Payload len]小于126,用[Payload len]来读后续的Payload数据;
         [Payload len]大于等于126,用[Extended payload length]来读后续的Payload数据;
         当Payload len小于126时,Payload len所代表的数值也就是数据长度.
         当Payload len大于等于126时,我们就需要读Payload len后面的Extended payload length,Extended payload length所代表的数值也就是数据长度.
         */
        if (header.payload_length == 126) {
            extra_bytes_needed += sizeof(uint16_t);
        } else if (header.payload_length == 127) {
            extra_bytes_needed += sizeof(uint64_t);
        }

6.根据数据帧头信息,获取了要截取的数据大小,添加待消费者到消费池, 然后消费者消费

[self _addConsumerWithDataLength:(size_t)frame_header.payload_length callback:^(SRWebSocket *self, NSData *newData) {
            if (isControlFrame) { //如果是控制帧
                [self _handleFrameWithData:newData opCode:frame_header.opcode];
            } else {
                if (frame_header.fin) {//消息帧的最后一个片段
                    [self _handleFrameWithData:self->_currentFrameData opCode:frame_header.opcode];
                } else {//消息帧的非最后一个片段
                    // TODO add assert that opcode is not a control;
                    [self _readFrameContinue]; //继续生产待消费对象
                }
                
            }
        } readToCurrentFrame:!isControlFrame unmaskBytes:frame_header.masked];

7.如果是上述消息帧的最后一个片段和控制帧,继续生产待消费对象。等待下次服务端给客户端发消息的时候用

BOOL isControlFrame = (opcode == SROpCodePing || opcode == SROpCodePong || opcode == SROpCodeConnectionClose);
    if (!isControlFrame) {
        [self _readFrameNew];
    } else {
        dispatch_async(_workQueue, ^{
            [self _readFrameContinue];
        });
    }
  1. 根据opcode 进行不同的处理
    8.1 如果是SROpCodeTextFrame或SROpCodeBinaryFrame,说明收到了服务端的message
[self _performDelegateBlock:^{
        [self.delegate webSocket:self didReceiveMessage:message];
    }];

8.2 如果是SROpCodePing,说明收到了服务端的心跳消息,客户端应该响应pong

 // Need to pingpong this off _callbackQueue first to make sure messages happen in order
    [self _performDelegateBlock:^{
        dispatch_async(_workQueue, ^{
            [self _sendFrameWithOpcode:SROpCodePong data:pingData];
        });
    }];

8.3 如果是SROpCodePong,说明客户端发送的心跳消息服务端响应了,WebSocket连接正常

[self _performDelegateBlock:^{
        if ([self.delegate respondsToSelector:@selector(webSocket:didReceivePong:)]) {
            [self.delegate webSocket:self didReceivePong:pongData];
        }
    }];

8.4 如果是SROpCodeConnectionClose,说明客户端收到了服务端的关闭帧,客户端作关闭连接的处理

- (void)handleCloseWithData:(NSData *)data;

Tips: 纯属个人记录总结,方便日后个人查阅,如有错误,欢迎指出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。