WebSocket介绍
WebSocket protocol 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex)。一开始的握手需要借助HTTP请求完成。
WebSocket和HTTP、FTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP/IP之上的。
连接的过程是:
首先,客户端发起http请求,http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等;
然后,服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据;
最后,客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。
MPWebsocket
mixpanel的websocket代码和facebook的socketRocket类似,而SRWebsocket代码也是开源的。。
它主要的功能api有:
- (void)webSocket:(MPWebSocket *)webSocket didReceiveMessage:(id)message;
- (void)webSocketDidOpen:(MPWebSocket *)webSocket;
- (void)webSocket:(MPWebSocket *)webSocket didFailWithError:(NSError *)error;
- (void)webSocket:(MPWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean;
使用起来比较简单,我们用这几个代理方法就能实现我们大部分的功能了。
初始化流程
包括对schem进行断言,只支持ws/wss/http/https四种。
当前socket状态,是正在连接,还是已连接、断开等等。
初始化工作队列,以及流回调线程等等。
初始化读写缓冲区:_readBuffer、_outputBuffer。
- (void)_MP_commonInit;
{
NSString *scheme = _url.scheme.lowercaseString;
assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
_secure = YES;
}
_readyState = MPWebSocketStateConnecting;
_consumerStopped = YES;
_webSocketVersion = 13;
_workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
// Going to set a specific on the queue so we can validate we're on the work queue
dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);
_delegateDispatchQueue = dispatch_get_main_queue();
mp_dispatch_retain(_delegateDispatchQueue);
_readBuffer = [[NSMutableData alloc] init];
_outputBuffer = [[NSMutableData alloc] init];
_currentFrameData = [[NSMutableData alloc] init];
_consumers = [NSMutableArray array];
_consumerPool = [[MPIOConsumerPool alloc] init];
_scheduledRunloops = [NSMutableSet set];
[self _initializeStreams];
// default handlers
}
开启连接 (给输入输出流绑定一个runloop)
- (void)_connect
{
if (!_scheduledRunloops.count) { // 判断有没有runloop,
[self scheduleInRunLoop:[NSRunLoop mp_networkRunLoop] forMode:NSDefaultRunLoopMode];
// 创建一个带有runloop的常驻线程,模式为 NSDefaultRunLoopMode
}
// 开启输入输出流
[_outputStream open];
[_inputStream open];
}
把socket放入run loop中 并打开流
- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
{
[_outputStream scheduleInRunLoop:aRunLoop forMode:mode];
[_inputStream scheduleInRunLoop:aRunLoop forMode:mode];
[_scheduledRunloops addObject:@[aRunLoop, mode]];
}
打开成功后会发送一个http请求 GET,作为第一次握手,WebSocket建立连接前,都会以http请求作为握手的方式,这个方法就是在构造http的请求头
- (void)didConnect
{
MPLogInfo(@"Connected");
CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);
// Set host first so it defaults
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));
// 生成密钥,
NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
int result = SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);
if (result != 0) {
MPLogError(@"Failed to generate random bytes with status: %d", result);
}
// 用base64转码
_secKey = [keyBytes base64EncodedStringWithOptions:NSDataBase64Encoding64CharacterLineLength];
// 断言编码后长度24
assert(_secKey.length == 24);
//web socket规范head
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Upgrade"), CFSTR("websocket"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Connection"), CFSTR("Upgrade"));
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Version"), (__bridge CFStringRef)[NSString stringWithFormat:@"%ld", (long)_webSocketVersion]);
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Origin"), (__bridge CFStringRef)_url.mp_origin);
if (_requestedProtocols) {//用户初始化的协议数组,可以约束websocket的一些行为
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Protocol"), (__bridge CFStringRef)[_requestedProtocols componentsJoinedByString:@", "]);
}
// 吧 _urlRequest中原有的head 设置到request中去
[_urlRequest.allHTTPHeaderFields enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {
CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)key, (__bridge CFStringRef)obj);
}];
//返回一个序列化 , CFBridgingRelease和 __bridge transfer一个意思, CFHTTPMessageCopySerializedMessage copy一份新的并且序列化,返回CFDataRef
NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));
//释放request
CFRelease(request);
//把这个request当成data去写
[self _writeData:message];
//读取http的头部
[self _readHTTPHeader];
}
处理消息帧方法
数据是通过CFStream流的方式回调回来的,每次拿到流数据,都是先放在数据缓冲区中,然后去读当前消息帧的头部,得到当前数据包的大小,然后再去创建消费者对象consumer,去读取缓冲区指定数据包大小的内容,读完才会回调给我们上层用户,所以,我们如果用SRWebSocket完全不需要考虑数据断包、粘包的问题,每次到达的数据,都是一条完整的数据
- (void)_readHTTPHeader;
{
if (_receivedHTTPHeaders == NULL) {
//序列化的http消息
_receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
}
//不停的add consumer去读数据
[self _readUntilHeaderCompleteWithCallback:^(MPWebSocket *websocket, NSData *data) {
//拼接数据,拼到头部
CFHTTPMessageAppendBytes(websocket->_receivedHTTPHeaders, (const UInt8 *)data.bytes, (CFIndex)data.length);
// 持续进行反序列化直到CFHTTPMessageIsHeaderComplete 返回TRUE。如果不去检测CFHTTPMessageIsHeaderComplete返回TRUE,这个消息可能不完整和不可靠。
//判断是否接受完
if (CFHTTPMessageIsHeaderComplete(websocket->_receivedHTTPHeaders)) {
MPLogDebug(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(websocket->_receivedHTTPHeaders)));
[websocket _HTTPHeadersDidFinish]; //
} else {
//没读完递归调
[websocket _readHTTPHeader];
}
}];
}
检查握手信息
//我们发出这个http请求后,得到服务端的响应头,去按照服务端的方式加密Sec-WebSocket-Key,判断与Sec-WebSocket-Accept是否相同,相同则表明握手成功,否则失败处理。
- (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage;
{
// 是不是允许的header
NSString *acceptHeader = CFBridgingRelease(CFHTTPMessageCopyHeaderFieldValue(httpMessage, CFSTR("Sec-WebSocket-Accept")));
// 为nil被服务器拒绝
if (acceptHeader == nil) {
return NO;
}
//得到
NSString *concatenatedString = [_secKey stringByAppendingString:MPWebSocketAppendToSecKeyString];
//期待accept的字符串
NSString *expectedAccept = [concatenatedString stringBySHA1ThenBase64Encoding];
//判断是否相同,相同就握手信息对了
return [acceptHeader isEqualToString:expectedAccept];
}
至此都成功的话,一个WebSocket连接建立完毕。
MPWebsocket基于NSStreamDelegate来完成数据流的读写
- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
{
if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
NSArray *sslCerts = [_urlRequest mp_SSLPinnedCertificates];
if (sslCerts) {
SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
if (secTrust) {
NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));
for (id ref in sslCerts) {
SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));
if ([trustedCertData isEqualToData:certData]) {
_pinnedCertFound = YES;
break;
}
}
}
}
if (!_pinnedCertFound) {
dispatch_async(_workQueue, ^{
[self _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:@{NSLocalizedDescriptionKey: [NSString stringWithFormat:@"Invalid server cert"]}]];
});
return;
}
}
}
dispatch_async(_workQueue, ^{
switch (eventCode) {
case NSStreamEventOpenCompleted: {
MPLogDebug(@"NSStreamEventOpenCompleted %@", aStream);
if (self.readyState >= MPWebSocketStateClosing) {
return;
}
assert(self->_readBuffer);
if (self.readyState == MPWebSocketStateConnecting && aStream == self->_inputStream) {
[self didConnect];
}
[self _pumpWriting];
[self _pumpScanner];
break;
}
case NSStreamEventErrorOccurred: {
MPLogError(@"NSStreamEventErrorOccurred %@ %@", aStream, [[aStream streamError] copy]);
/// TODO specify error better!
[self _failWithError:aStream.streamError];
self->_readBufferOffset = 0;
[self->_readBuffer setLength:0];
break;
}
case NSStreamEventEndEncountered: {
[self _pumpScanner];
MPLogDebug(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
} else {
if (self.readyState != MPWebSocketStateClosed) {
self.readyState = MPWebSocketStateClosed;
[self _scheduleCleanup];
}
if (!self->_sentClose && !self->_failed) {
self->_sentClose = YES;
// If we get closed in this state it's probably not clean because we should be sending this when we send messages
[self _performDelegateBlock:^{
if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
[self.delegate webSocket:self didCloseWithCode:MPStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];
}
}];
}
}
break;
}
case NSStreamEventHasBytesAvailable: {
MPLogDebug(@"NSStreamEventHasBytesAvailable %@", aStream);
const int bufferSize = 2048;
uint8_t buffer[bufferSize];
while (self->_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) {
[self->_readBuffer appendBytes:buffer length:(NSUInteger)bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:self->_inputStream.streamError];
}
if (bytes_read != bufferSize) {
break;
}
}
[self _pumpScanner];
break;
}
case NSStreamEventHasSpaceAvailable: {
MPLogDebug(@"NSStreamEventHasSpaceAvailable %@", aStream);
[self _pumpWriting];
break;
}
default:
MPLogDebug(@"(default) %@", aStream);
break;
}
});
}