知识点:NSInputStream、NSOutputStream、NSStreamDelegate、CFStreamCreatePairWithSocketToHost、scheduleInRunLoop、SHA1
/* From RFC:
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-------+-+-------------+-------------------------------+
 |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
 |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
 |N|V|V|V|       |S|             |   (if payload len==126/127)   |
 | |1|2|3|       |K|             |                               |
 +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 |     Extended payload length continued, if payload len == 127  |
 + - - - - - - - - - - - - - - - +-------------------------------+
 |                               |Masking-key, if MASK set to 1  |
 +-------------------------------+-------------------------------+
 | Masking-key (continued)       |          Payload Data         |
 +-------------------------------- - - - - - - - - - - - - - - - +
 :                     Payload Data continued ...                :
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
 |                     Payload Data continued ...                |
 +---------------------------------------------------------------+
*/
- 把 NSData 或 NSString 先做 SHA1 散列、再转换成 Base64 字符串时,有一个很清晰明了的命名:`stringBySHA1ThenBase64Encoding`。这个命名把返回值及所做的操作(先做 SHA1 散列,然后再转换成 Base64 字符串)说得一清二楚,以后多借鉴这种简单而高效的命名方式。
- SHA 即 Secure Hash Algorithm(安全散列算法),是由美国国家安全局(NSA)设计、美国国家标准与技术研究院(NIST)发布的一系列密码散列函数。注意它是单向散列,而不是加密。这里直接调用系统(CommonCrypto)提供的方法:
extern unsigned char *CC_SHA1(const void *data, CC_LONG len, unsigned char *md)
SocketRocket的实现方式如下:
/// Hashes `length` bytes starting at `bytes` with SHA-1 and returns the
/// 20-byte digest rendered as a Base64 string.
///
/// @param bytes  Start of the buffer to hash.
/// @param length Number of bytes to hash; must fit in 32 bits because it is
///               narrowed to CC_LONG for CC_SHA1.
/// @return The Base64 text of the SHA-1 digest.
static NSString *newSHA1String(const char *bytes, size_t length) {
    assert(length >= 0);
    assert(length <= UINT32_MAX);

    uint8_t digest[CC_SHA1_DIGEST_LENGTH];
    CC_SHA1(bytes, (CC_LONG)length, digest);
    NSData *digestData = [NSData dataWithBytes:digest length:sizeof(digest)];

    // Fall back to the deprecated encoder only when the modern selector is
    // not available on the running system.
    BOOL hasModernEncoder = [digestData respondsToSelector:@selector(base64EncodedStringWithOptions:)];
    if (!hasModernEncoder) {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
        return [digestData base64Encoding];
#pragma clang diagnostic pop
    }

    return [digestData base64EncodedStringWithOptions:0];
}
@implementation NSData (SRWebSocket)

/// Returns the Base64 text of the SHA-1 digest of the receiver's bytes.
- (NSString *)stringBySHA1ThenBase64Encoding
{
    const char *rawBytes = self.bytes;
    NSUInteger byteCount = self.length;
    return newSHA1String(rawBytes, byteCount);
}

@end
@implementation NSString (SRWebSocket)

/// Returns the Base64 text of the SHA-1 digest of the receiver's UTF-8 bytes.
- (NSString *)stringBySHA1ThenBase64Encoding;
{
    // -length counts UTF-16 code units, which is smaller than the UTF-8 byte
    // count for any non-ASCII text. Passing it as the buffer length would hash
    // only a prefix of the UTF8String buffer, so ask for the real byte count.
    NSUInteger utf8ByteCount = [self lengthOfBytesUsingEncoding:NSUTF8StringEncoding];
    return newSHA1String(self.UTF8String, utf8ByteCount);
}
@end
创建SocketRocket的所有方法最后都调用这一个方法
/// Initializes the socket from a URL request.
///
/// Stores the request and its URL, the certificate-trust flag and a copy of
/// the requested sub-protocols, then runs the shared setup in -_SR_commonInit.
///
/// @param request   Request whose URL identifies the endpoint; the URL must be non-nil.
/// @param protocols Requested sub-protocols; copied.
/// @param allowsUntrustedSSLCertificates Stored as-is for later use.
- (id)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates
{
    self = [super init];
    if (!self) {
        return self;
    }

    assert(request.URL);

    _urlRequest = request;
    _url = request.URL;
    _requestedProtocols = [protocols copy];
    _allowsUntrustedSSLCertificates = allowsUntrustedSSLCertificates;

    [self _SR_commonInit];
    return self;
}
这个公共初始化方法会检查 url 的 scheme,只支持 ws、wss、http、https;如果是 wss 或 https,则标记为安全连接
/// Shared setup run by the initializer: validates the URL scheme, records
/// whether the connection is secure, creates the queues and buffers, and
/// finally creates the socket streams.
- (void)_SR_commonInit
{
    NSString *scheme = _url.scheme.lowercaseString;
    BOOL isPlainScheme = [scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"];
    BOOL isSecureScheme = [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"];
    assert(isPlainScheme || isSecureScheme);

    if (isSecureScheme) {
        _secure = YES;
    }

    _readyState = SR_CONNECTING;
    _consumerStopped = YES;
    _webSocketVersion = 13;

    // Serial queue on which the socket does its work. It is tagged with a
    // queue-specific value keyed by `self` so code can check it is running on it.
    _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);

    // Delegate callbacks go to the main queue by default.
    _delegateDispatchQueue = dispatch_get_main_queue();
    sr_dispatch_retain(_delegateDispatchQueue);

    _readBuffer = [[NSMutableData alloc] init];
    _outputBuffer = [[NSMutableData alloc] init];
    _currentFrameData = [[NSMutableData alloc] init];

    _consumers = [[NSMutableArray alloc] init];
    _consumerPool = [[SRIOConsumerPool alloc] init];
    _scheduledRunloops = [[NSMutableSet alloc] init];

    [self _initializeStreams];
}
- 如果没有设置端口,则设置默认的80或安全请求的443端口
- 使用 `CFStreamCreatePairWithSocketToHost` 创建 socket 连接,同时得到 NSInputStream 和 NSOutputStream
/// Creates the socket stream pair for the URL's host and port and makes this
/// object the delegate of both streams.
///
/// When the URL carries no port, 443 is used for secure connections and 80
/// otherwise.
- (void)_initializeStreams
{
    assert(_url.port.unsignedIntValue <= UINT32_MAX);

    uint32_t port = _url.port.unsignedIntValue;
    if (port == 0) {
        port = _secure ? 443 : 80;
    }

    CFReadStreamRef cfInput = NULL;
    CFWriteStreamRef cfOutput = NULL;
    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)_url.host, port, &cfInput, &cfOutput);

    // Hand ownership of both CF streams over to ARC.
    _outputStream = CFBridgingRelease(cfOutput);
    _inputStream = CFBridgingRelease(cfInput);

    _inputStream.delegate = self;
    _outputStream.delegate = self;
}
- 开启连接:如果超过超时时间后状态仍是"连接中",则说明连接超时,执行出错回调
- 对安全连接进行设置
- 如果还没有为 outputStream 和 inputStream 设置过 runloop,则使用默认的 runloop
/// Starts connecting. May only be called while the socket is still in the
/// SR_CONNECTING state.
///
/// The socket retains itself, and if the request has a positive timeout a
/// check is queued on the main queue that fails the socket with code 504 when
/// it is still connecting once the timeout has elapsed.
- (void)open
{
    assert(_url);
    NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once");

    _selfRetain = self;

    NSTimeInterval connectTimeout = _urlRequest.timeoutInterval;
    if (connectTimeout > 0) {
        dispatch_time_t deadline = dispatch_time(DISPATCH_TIME_NOW, connectTimeout * NSEC_PER_SEC);
        dispatch_after(deadline, dispatch_get_main_queue(), ^{
            if (self.readyState != SR_CONNECTING) {
                return;
            }
            NSDictionary *info = @{NSLocalizedDescriptionKey: @"Timeout Connecting to Server"};
            NSError *timeoutError = [NSError errorWithDomain:@"com.squareup.SocketRocket" code:504 userInfo:info];
            [self _failWithError:timeoutError];
        });
    }

    [self openConnection];
}
/// Applies the secure-stream options, schedules the streams on the network
/// run loop if no run loop has been registered yet, and opens both streams.
- (void)openConnection
{
    [self _updateSecureStreamOptions];

    BOOL nothingScheduledYet = (_scheduledRunloops.count == 0);
    if (nothingScheduledYet) {
        NSRunLoop *networkRunLoop = [NSRunLoop SR_networkRunLoop];
        [self scheduleInRunLoop:networkRunLoop forMode:NSDefaultRunLoopMode];
    }

    [_outputStream open];
    [_inputStream open];
}
#pragma mark 设置NSStream工作所在的runloop及mode
/// Schedules both streams on the given run loop and mode, and records the
/// (run loop, mode) pair in the set of scheduled run loops.
- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode
{
    [_outputStream scheduleInRunLoop:aRunLoop forMode:mode];
    [_inputStream scheduleInRunLoop:aRunLoop forMode:mode];

    NSArray *scheduledPair = @[aRunLoop, mode];
    [_scheduledRunloops addObject:scheduledPair];
}
/// Removes both streams from the given run loop and mode, and drops the
/// matching (run loop, mode) pair from the set of scheduled run loops.
- (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode
{
    [_outputStream removeFromRunLoop:aRunLoop forMode:mode];
    [_inputStream removeFromRunLoop:aRunLoop forMode:mode];

    NSArray *scheduledPair = @[aRunLoop, mode];
    [_scheduledRunloops removeObject:scheduledPair];
}
- 发送数据
- 建立连接时发送的数据是放在初始化的 request 中的;之后发送数据(包括心跳包 ping)使用 send 方法
/// Queues a message for sending on the work queue.
///
/// @param data An NSString (sent as a text frame, UTF-8 encoded), an NSData
///             (sent as a binary frame) or nil. Any other type trips an
///             assertion on the work queue.
- (void)send:(id)data
{
    NSAssert(self.readyState != SR_CONNECTING, @"Invalid State: Cannot call send: until connection is open");

    // TODO: maybe not copy this for performance
    id message = [data copy];

    dispatch_async(_workQueue, ^{
        if (message == nil) {
            [self _sendFrameWithOpcode:SROpCodeTextFrame data:message];
            return;
        }
        if ([message isKindOfClass:[NSString class]]) {
            NSData *utf8Payload = [(NSString *)message dataUsingEncoding:NSUTF8StringEncoding];
            [self _sendFrameWithOpcode:SROpCodeTextFrame data:utf8Payload];
            return;
        }
        if ([message isKindOfClass:[NSData class]]) {
            [self _sendFrameWithOpcode:SROpCodeBinaryFrame data:message];
            return;
        }
        assert(NO);
    });
}
将数据按照 WebSocket 协议封装成数据帧(frame)
// Extra bytes reserved in the frame buffer on top of the payload, so the
// header can be written before its exact size is known.
static const size_t SRFrameHeaderOverhead = 32;

/// Builds a single final (FIN set) frame carrying `data` and passes it to
/// -_writeData:. Must be called on the work queue.
///
/// Layout written into the buffer: byte 0 is FIN | opcode; byte 1 is the mask
/// bit plus the 7-bit length (or the marker 126 / 127); then an optional 16-
/// or 64-bit big-endian extended length; then, when masking is enabled, a
/// 4-byte random mask key; then the payload (XOR-ed with the key if masked).
///
/// @param opcode Frame opcode placed in the low bits of the first byte.
/// @param data   NSData or NSString payload. nil makes the method return
///               without sending anything.
-(void)_sendFrameWithOpcode:(SROpCode)opcode data:(id)data;
{
    [self assertOnWorkQueue];

    if (nil == data) {
        return;
    }

    NSAssert([data isKindOfClass:[NSData class]] || [data isKindOfClass:[NSString class]], @"NSString or NSData");

    size_t payloadLength = [data isKindOfClass:[NSString class]] ? [(NSString *)data lengthOfBytesUsingEncoding:NSUTF8StringEncoding] : [data length];

    // initWithLength: zero-fills the buffer, which the `|=` writes to byte 1
    // below rely on.
    NSMutableData *frame = [[NSMutableData alloc] initWithLength:payloadLength + SRFrameHeaderOverhead];
    if (!frame) {
        [self closeWithCode:SRStatusCodeMessageTooBig reason:@"Message too big"];
        return;
    }
    uint8_t *frame_buffer = (uint8_t *)[frame mutableBytes];

    // set fin
    frame_buffer[0] = SRFinMask | opcode;

    BOOL useMask = YES;
#ifdef NOMASK
    useMask = NO;
#endif

    if (useMask) {
        // set the mask and header
        frame_buffer[1] |= SRMaskMask;
    }

    // Number of bytes written so far; the two fixed header bytes come first.
    size_t frame_buffer_size = 2;

    const uint8_t *unmasked_payload = NULL;
    if ([data isKindOfClass:[NSData class]]) {
        unmasked_payload = (uint8_t *)[data bytes];
    } else if ([data isKindOfClass:[NSString class]]) {
        unmasked_payload = (const uint8_t *)[data UTF8String];
    } else {
        return;
    }

    if (payloadLength < 126) {
        frame_buffer[1] |= payloadLength;
    } else if (payloadLength <= UINT16_MAX) {
        frame_buffer[1] |= 126;
        // Write the 16-bit length most-significant byte first (network order).
        // Storing byte by byte avoids a type-punned store through a cast
        // pointer and does not depend on the host's endianness.
        frame_buffer[frame_buffer_size++] = (uint8_t)((payloadLength >> 8) & 0xFF);
        frame_buffer[frame_buffer_size++] = (uint8_t)(payloadLength & 0xFF);
    } else {
        frame_buffer[1] |= 127;
        // Same for the 64-bit length. Offset 2 is not 8-byte aligned, so a
        // direct uint64_t store here would be a misaligned access.
        uint64_t extendedLength = (uint64_t)payloadLength;
        for (int shift = 56; shift >= 0; shift -= 8) {
            frame_buffer[frame_buffer_size++] = (uint8_t)((extendedLength >> shift) & 0xFF);
        }
    }

    if (!useMask) {
        for (size_t i = 0; i < payloadLength; i++) {
            frame_buffer[frame_buffer_size] = unmasked_payload[i];
            frame_buffer_size += 1;
        }
    } else {
        // The mask key lives in the frame itself, directly after the length.
        // NOTE(review): the result of SecRandomCopyBytes is not checked; on
        // failure the key bytes would stay zero from the zero-filled buffer.
        uint8_t *mask_key = frame_buffer + frame_buffer_size;
        SecRandomCopyBytes(kSecRandomDefault, sizeof(uint32_t), (uint8_t *)mask_key);
        frame_buffer_size += sizeof(uint32_t);

        // TODO: could probably optimize this with SIMD
        for (size_t i = 0; i < payloadLength; i++) {
            frame_buffer[frame_buffer_size] = unmasked_payload[i] ^ mask_key[i % sizeof(uint32_t)];
            frame_buffer_size += 1;
        }
    }

    // Trim the over-allocated buffer down to the bytes actually written.
    assert(frame_buffer_size <= [frame length]);
    frame.length = frame_buffer_size;

    [self _writeData:frame];
}
- 发送数据
- 往后台发送数据总共有4个地方:
1、是发送一般数据或心跳包
2、调用close时也是给后台发送数据
3、4都是在Input/outputStream的代理回调时
- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode
- (void)safeHandleEvent:(NSStreamEvent)eventCode stream:(NSStream *)aStream
NSStreamEventOpenCompleted/NSStreamEventHasSpaceAvailable
/// Appends `data` to the output buffer and tries to flush it to the stream.
/// Must be called on the work queue. Data is dropped once the socket has been
/// flagged to close when writing finishes.
- (void)_writeData:(NSData *)data
{
    [self assertOnWorkQueue];

    if (!_closeWhenFinishedWriting) {
        [_outputBuffer appendData:data];
        [self _pumpWriting];
    }
}
/// Writes as much of the pending output buffer as the stream accepts, then,
/// if a close was requested and nothing is left to send, closes both streams,
/// notifies the delegate (unless the socket failed) and schedules cleanup.
/// Must be called on the work queue.
- (void)_pumpWriting
{
    [self assertOnWorkQueue];

    NSUInteger bufferedLength = _outputBuffer.length;
    if (bufferedLength - _outputBufferOffset > 0 && _outputStream.hasSpaceAvailable) {
        const uint8_t *unsentBytes = (const uint8_t *)_outputBuffer.bytes + _outputBufferOffset;
        NSInteger written = [_outputStream write:unsentBytes maxLength:bufferedLength - _outputBufferOffset];
        if (written == -1) {
            NSDictionary *info = @{NSLocalizedDescriptionKey: @"Error writing to stream"};
            [self _failWithError:[NSError errorWithDomain:SRWebSocketErrorDomain code:2145 userInfo:info]];
            return;
        }

        _outputBufferOffset += written;

        // Once more than 4096 bytes and more than half of the buffer have been
        // sent, replace the buffer with one holding only the unsent tail.
        BOOL shouldCompact = _outputBufferOffset > 4096 && _outputBufferOffset > (_outputBuffer.length >> 1);
        if (shouldCompact) {
            _outputBuffer = [[NSMutableData alloc] initWithBytes:(char *)_outputBuffer.bytes + _outputBufferOffset
                                                          length:_outputBuffer.length - _outputBufferOffset];
            _outputBufferOffset = 0;
        }
    }

    // Everything below only applies when a close is pending, has not been
    // carried out yet, all buffered data is flushed and the input stream is
    // neither unopened nor already closed.
    if (!_closeWhenFinishedWriting) {
        return;
    }
    if (_outputBuffer.length - _outputBufferOffset != 0) {
        return;
    }
    NSStreamStatus inputStatus = _inputStream.streamStatus;
    if (inputStatus == NSStreamStatusNotOpen || inputStatus == NSStreamStatusClosed) {
        return;
    }
    if (_sentClose) {
        return;
    }

    _sentClose = YES;

    @synchronized(self) {
        [_outputStream close];
        [_inputStream close];

        // Iterate over a copy because unscheduling mutates the set.
        for (NSArray *scheduledPair in [_scheduledRunloops copy]) {
            [self unscheduleFromRunLoop:scheduledPair[0] forMode:scheduledPair[1]];
        }
    }

    if (!_failed) {
        [self _performDelegateBlock:^{
            if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {
                [self.delegate webSocket:self didCloseWithCode:_closeCode reason:_closeReason wasClean:YES];
            }
        }];
    }

    [self _scheduleCleanup];
}
- 清空数据
/// Schedules -_cleanupSelfReference: to run once on the network run loop.
/// Repeated calls after the first are ignored.
- (void)_scheduleCleanup
{
    @synchronized(self) {
        if (!_cleanupScheduled) {
            _cleanupScheduled = YES;

            // The stream delegates are cleared on the same run loop the streams
            // use, to avoid races between handleEvent and SRWebSocket's dealloc.
            NSTimer *cleanupTimer = [NSTimer timerWithTimeInterval:(0.0f)
                                                            target:self
                                                          selector:@selector(_cleanupSelfReference:)
                                                          userInfo:nil
                                                           repeats:NO];
            [[NSRunLoop SR_networkRunLoop] addTimer:cleanupTimer forMode:NSDefaultRunLoopMode];
        }
    }
}
/// Timer callback that detaches and closes both streams, then clears the
/// socket's self-retain on the work queue.
///
/// @param timer The timer that fired; unused.
- (void)_cleanupSelfReference:(NSTimer *)timer
{
    @synchronized(self) {
        // Stop receiving stream events before closing.
        [_inputStream setDelegate:nil];
        [_outputStream setDelegate:nil];

        // Close both streams immediately.
        [_inputStream close];
        [_outputStream close];
    }

    // Clear the self-retain on the work queue, like other state changes.
    dispatch_async(_workQueue, ^{
        _selfRetain = nil;
    });
}