IOS源码解析:GCDAsySocket

原创:知识进阶型文章
创作不易,请珍惜,之后会持续更新,不断完善
个人比较喜欢做笔记和写总结,毕竟好记性不如烂笔头哈哈,这些文章记录了我的IOS成长历程,希望能与大家一起进步
温馨提示:由于简书不支持目录跳转,大家可通过command + F 输入目录标题后迅速寻找到你所需要的内容

目录

  • 一、GCDAsyncSocket的成员变量
  • 二、创建socket
    • 1、提供给外界调用创建socket的接口方法
    • 2、层级调用
    • 3、最终调用的初始化方法
    • 4、在懒加载方法中进行一系列配置
  • 三、连接socket
    • 1、连接socket方法的具体实现
    • 2、包裹在自动释放池中的GCDBlock中的具体内容
    • 3、创建全局队列并异步执行代码块中的内容
    • 4、实现在连接之前的接口检查方法
    • 5、根据interface 得到IPV4和IPV6地址
  • 四、连接socket最终调用的方法
    • 1、lookup方法的内部实现
    • 2、connectWithAddress4方法的内部实现
    • 3、创建Socket
    • 4、连接Socket的究极方法
    • 5、连接成功后的回调:设置一些连接成功的状态
    • 6、注册Stream的回调和读stream的回调
    • 7、stream与runloop
  • 五、连接到服务器后开始读取数据
    • 1、用偏移量 maxLength 读取数据
    • 2、让读任务离队,开始执行这条读任务
    • 3、可能开启TLS
    • 4、读取数据
  • Demo
  • 参考文献

一、GCDAsyncSocket的成员变量

@implementation GCDAsyncSocket
{
    ...
}
标识当前socket的状态
uint32_t flags;

enum GCDAsyncSocketFlags
{
    kSocketStarted                 = 1 <<  0,  // If set, socket has been started (accepting/connecting)
    kConnected        
        ...
}
ipv4和ipv6的配置
uint16_t config;

enum GCDAsyncSocketConfig
{
    kIPv4Disabled              = 1 << 0,  // If set, IPv4 is disabled
    kIPv6Disabled              = 1 << 1,  // If set, IPv6 is disabled
    kPreferIPv6                = 1 << 2,  // If set, IPv6 is preferred over IPv4
    kAllowHalfDuplexConnection = 1 << 3,  // If set, the socket will stay open even if the read stream closes
};
GCDAsyncSocketDelegate属性
__weak id<GCDAsyncSocketDelegate> delegate;// 当连接socket、读写数据、关闭socket的时候进行回调代理属性
dispatch_queue_t delegateQueue;// 代理回调的queue
三种Socket类型对应三种地址
int socket4FD;// 本地IPV4Socket
int socket6FD;// 本地IPV6Socket
int socketUN;// unix域的套接字,用于进程之间通讯
NSURL *socketUrl;// 服务端url
int stateIndex;// 状态Index
NSData * connectInterface4;// 本机的IPV4地址
NSData * connectInterface6;// 本机的IPV6地址
NSData * connectInterfaceUN;// 本机unix域地址
这个类对Socket的操作都在这个串行queue中
dispatch_queue_t socketQueue;
接收源数据的回调事件
dispatch_source_t accept4Source;
dispatch_source_t accept6Source;
dispatch_source_t acceptUNSource;
链接、读写timer用的是GCD定时器
dispatch_source_t connectTimer;
dispatch_source_t readSource;
dispatch_source_t writeSource;
dispatch_source_t readTimer;
dispatch_source_t writeTimer;

connectTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
dispatch_source_set_event_handler(connectTimer, ^{ @autoreleasepool {
    [strongSelf doConnectTimeout];
}});
读写数据包数组:类似queue,先读进去的先拿出来(FIFO)最大限制为5个包
NSMutableArray *readQueue;
NSMutableArray *writeQueue;
当前正在读写数据包
GCDAsyncReadPacket *currentRead;
GCDAsyncWritePacket *currentWrite;
当前socket未获取完的数据大小
unsigned long socketFDBytesAvailable;
全局公用的提前缓冲区:读取的数据会填充到Buffer中,填充满了后就会取出Buffer中的数据进行代理回调
GCDAsyncSocketPreBuffer *preBuffer;
读写数据流
CFStreamClientContext streamContext;
CFReadStreamRef readStream;// 读的数据流
CFWriteStreamRef writeStream;// 写的数据流
SSL认证
SSLContextRef sslContext;// SSL上下文,用来做SSL认证
GCDAsyncSocketPreBuffer *sslPreBuffer;// 全局公用的SSL的提前缓冲区
size_t sslWriteCachedLength;
OSStatus sslErrCode;// 记录SSL读取数据错误
OSStatus lastSSLHandshakeError;// 记录SSL握手的错误
socket队列的标识key
void *IsOnSocketQueueOrTargetQueueKey;

// 将socketQueue作上标记方便取出
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);

// 当前线程根据这个标识判断是不是在这个队列
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
    return delegate;
}

if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
    return delegateQueue;
}
连接备选服务端地址的延时 (另一个IPV4或IPV6)
NSTimeInterval alternateAddressDelay;

alternateAddressDelay = 0.3;// 默认0.3秒

// 用socket和address去连接服务器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];

// 如果有备选地址
if (alternateAddress)
{
    // 延迟去连接备选的地址
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
        [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
    });
}

二、创建socket

self.socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_global_queue(0, 0)];

1、提供给外界调用创建socket的接口方法

- (instancetype)init;
- (instancetype)initWithSocketQueue:(nullable dispatch_queue_t)sq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq socketQueue:(nullable dispatch_queue_t)sq;

2、层级调用

- (id)init
{
    return [self initWithDelegate:nil delegateQueue:NULL socketQueue:NULL];
}
- (id)initWithSocketQueue:(dispatch_queue_t)sq
{
    return [self initWithDelegate:nil delegateQueue:NULL socketQueue:sq];
}
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq
{
    return [self initWithDelegate:aDelegate delegateQueue:dq socketQueue:NULL];
}

3、最终调用的初始化方法

- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq socketQueue:(dispatch_queue_t)sq
{
    if((self = [super init]))
    {
            ...
    }
    return self;
}
❶ 创建socket,属性值先都置为 -1
socket4FD = SOCKET_NULL;// 本机的ipv4
socket6FD = SOCKET_NULL;
socketUN = SOCKET_NULL;
socketUrl = nil;
stateIndex = 0;// 状态Index
❷ 如果scoketQueue存在却是global的则报错,没有的话创建一个scoketQueue(名字为:GCDAsyncSocket,NULL表示是串行队列)
if (sq)
{
    // 断言必须要一个非并行queue
    NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
             @"The given socketQueue parameter must not be a concurrent queue.");
    
    socketQueue = sq;
}
else
{
    socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL);
}
❸ 给当前队里加一个标识
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);
❹ 初始化读写数组,限制大小为5
readQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentRead = nil;

writeQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentWrite = nil;
❺ 设置缓冲区大小为4kb
preBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];
❻ 设置连接备选服务端地址的延时为0.3秒
alternateAddressDelay = 0.3;

4、在懒加载方法中进行一系列配置

delegate
- (id)delegate
{
    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 当前队列已经是同步串行
    {
        return delegate;
    }
    else// 否则让其处于同步串行
    {
        __block id result;
        
        // 同步串行队列:保证socket顺序安全,因为需要先建立链接才能读写数据,存在顺序性
        dispatch_sync(socketQueue, ^{
            result = self->delegate;
        });
        
        return result;
    }
}
delegateQueue
- (dispatch_queue_t)delegateQueue
{
    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 当前代理队列已经是同步串行
    {
        return delegateQueue;
    }
    else// 否则让其处于同步串行
    {
        __block dispatch_queue_t result;
        
        // 保证代理数据回调顺序安全
        dispatch_sync(socketQueue, ^{
            result = self->delegateQueue;
        });
        
        return result;
    }
}

三、连接socket

[self.socket connectToHost:@"127.0.0.1" onPort:8090 withTimeout:-1 error:&error];

1、连接socket方法的具体实现

- (BOOL)connectToHost:(NSString *)inHost
               onPort:(uint16_t)port
         viaInterface:(NSString *)inInterface
          withTimeout:(NSTimeInterval)timeout
                error:(NSError **)errPtr
{
    ...
}
❶ 拿到host和interface,通过copy防止外界传入值被修改

很可能给我们的服务端的参数是一个可变字符串,所以我们需要copy,在Block里同步执行,这样就不需要担心它被改变。

NSString *host = [inHost copy];
NSString *interface = [inInterface copy];
❷ 包裹在自动释放池中的GCDBlock
  • 大量临时变量(connect : 重连)
  • 自定义线程管理 (NSOperation)
dispatch_block_t block = ^{ @autoreleasepool {
    ...
}
❸ 在socketQueue中执行这个Block,否则同步的调起这个queue去执行
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
    block();// 在socketQueue中执行这个Block
else
    dispatch_sync(socketQueue, block);// 否则同步的调起这个queue去执行
❹ 如果发生了错误,则将错误进行赋值
__block NSError *preConnectErr = nil;// 错误信息
...
f (errPtr) *errPtr = preConnectErr;
❺ 把连接是否成功的result返回
__block BOOL result = NO;// 返回结果
...
return result;

2、包裹在自动释放池中的GCDBlock中的具体内容

dispatch_block_t block = ^{ @autoreleasepool {
    ...
}
❶ 对host服务器地址有效性进行校验

return_from_block这个宏其实就是returnhost长度为0本来应该直接返回 但是目前在GCDBlock中具有异步性即编译器还没有识别到return的时候就直接跳过去了顺序执行下面内容,所以采用在预编译期执行的宏定义这种方式防止此类错误的发生。其实这种情况很少发生,因为系统对return已经进行过优化防止此类情况发生。

#define return_from_block  return

if ([host length] == 0)
{
    NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string.";
    preConnectErr = [self badParamError:msg];
    
    return_from_block;
}
❷ 在连接之前的接口检查,一般我们传nil
// interface表示本机的IP端口
if (![self preConnectWithInterface:interface error:&preConnectErr])
{
    return_from_block;
}
❸ flags做或等运算:将flags标识为开始Socket连接
self->flags |= kSocketStarted;
❹ 创建全局队列并异步执行
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(...);
❺ 启连接超时并返回YES
[self startConnectTimeout:timeout];

result = YES;

3、创建全局队列并异步执行代码块中的内容

dispatch_async(globalConcurrentQueue, ^{ @autoreleasepool {
}});
❶ 获取server地址数组(包含IPV4 IPV6的地址)
NSError *lookupErr = nil;// 查找错误
// lookupHost方法用来获取服务器地址,内部实现和获取本地地址大同小异,不再赘述
NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr];
❷ 如果有错
if (lookupErr)// 如果有错
{
    // 使用scocketQueue
    dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
        // 一些错误处理,比如清空一些数据等等
        [strongSelf lookup:aStateIndex didFail:lookupErr];
    }});
}
❸ 没有发生错误的话则遍历地址数组得到IPV4和IPV6地址
NSData *address4 = nil;
NSData *address6 = nil;

for (NSData *address in addresses)// 遍历地址数组
{
    // 判断是否address4为空且address为IPV4
    if (!address4 && [[self class] isIPv4Address:address])
    {
        address4 = address;
    }
    // 判断是否address6为空且address为IPV6
    else if (!address6 && [[self class] isIPv6Address:address])
    {
        address6 = address;
    }
}
❹ 在socketQueue同步队列中异步去发起连接以保证顺序执行且不堵塞线程
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});

4、实现在连接之前的接口检查方法

- (BOOL)preConnectWithInterface:(NSString *)interface error:(NSError **)errPtr
{
    ...
    return YES
}
❶ 先断言,如果当前的queue不是初始化的quueue,直接报错
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
❷ 进行一系列有效性 判断

&表示位与运算,因为枚举是用左位移<<运算定义的,所以可以用&来判断config包不包含某个枚举。因为一个值可能包含好几个枚举值,所以这时候不能用==来判断,只能用&来判断。

if (delegate == nil)// 无代理
if (delegateQueue == NULL) // 没有代理queue
if (![self isDisconnected])// 当前不是非连接状态(即已经连接了所以没必要再进行连接)

// 是否支持IPV4 IPV6
BOOL isIPv4Disabled = (config & kIPv4Disabled) ? YES : NO;
BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO;

if (isIPv4Disabled && isIPv6Disabled) // 是否都不支持
{
    if (errPtr)
    {
        NSString *msg = @"Both IPv4 and IPv6 have been disabled. Must enable at least one protocol first.";
        *errPtr = [self badConfigError:msg];
    }
    return NO;
}
❸ 如果有interface本机地址(一般传入的是nil)
if (interface)
{
    NSMutableData *interface4 = nil;
    NSMutableData *interface6 = nil;
    
    // 得到本机的IPV4和IPV6地址
    [self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:0];
    
    // 如果两者都为nil
    if ((interface4 == nil) && (interface6 == nil))
    if (isIPv4Disabled && (interface6 == nil))
    if (isIPv6Disabled && (interface4 == nil))
    
    // 如果都没问题则赋值到成员变量中
    connectInterface4 = interface4;
    connectInterface6 = interface6;
}

5、根据interface 得到IPV4和IPV6地址

- (void)getInterfaceAddress4:(NSMutableData **)interfaceAddr4Ptr
                    address6:(NSMutableData **)interfaceAddr6Ptr
             fromDescription:(NSString *)interfaceDescription
                        port:(uint16_t)port
{
    NSMutableData *addr4 = nil;
    NSMutableData *addr6 = nil;
    NSString *interface = nil;
    ...
}
❶ 先用:分割(比如https://www.127.0.0.1
NSArray *components = [interfaceDescription componentsSeparatedByString:@":"];
if ([components count] > 0)
{
    NSString *temp = [components objectAtIndex:0];
    if ([temp length] > 0)
    {
        interface = temp;
    }
}
❷ 拿到port端口
if ([components count] > 1 && port == 0)
{
    // 将一个字符串根据base参数转成长整型,如base值为10则采用10进制
    NSString *temp = [components objectAtIndex:1];
    long portL = strtol([temp UTF8String], NULL, 10);
    
    // UINT16_MAX:65535最大端口号
    if (portL > 0 && portL <= UINT16_MAX)
    {
        port = (uint16_t)portL;
    }
}
❸ interface为空则自己创建一个interface
if (interface == nil)
{
    // 生成一个地址结构体
    struct sockaddr_in sockaddr4;
    // memset的作用是在一段内存块中填充某个给定的值,它是对较大的结构体或数组进行清零操作的一种最快方法
    memset(&sockaddr4, 0, sizeof(sockaddr4));
    
    sockaddr4.sin_len         = sizeof(sockaddr4);// 结构体长度
    sockaddr4.sin_family      = AF_INET;
    sockaddr4.sin_port        = htons(port);// 端口号
    sockaddr4.sin_addr.s_addr = htonl(INADDR_ANY);// 0.0.0.0表示不确定地址或者任意地址
    
    // sockaddr6同上
    ...
    
    // 把这两个结构体转成data
    addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
    addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
❹ 如果是localhost、loopback(回环地址)就赋值为127.0.0.1
else if ([interface isEqualToString:@"localhost"] || [interface isEqualToString:@"loopback"])
{
    sockaddr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);// 127.0.0.1
    // sockaddr6同上
    ...
    addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
    addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
❺ 非localhost、loopback则去获取本机IP,看是否和传进来Interface是同名或者同IP,相同才给赋端口号,把数据封装进Data,否则为nil
else
{
    const char *iface = [interface UTF8String];// 转成cString
    
    struct ifaddrs *addrs;// 定义结构体指针,这个指针是本地IP
    const struct ifaddrs *cursor;
    
    // 获取到本机IP,为0说明成功了
    if ((getifaddrs(&addrs) == 0))
    {
        cursor = addrs;// 赋值
        while (cursor != NULL)// 如果IP不为空,则循环链表去设置
        {
            // 如果 addr4 IPV4地址为空,而且地址类型为IPV4
            if ((addr4 == nil) && (cursor->ifa_addr->sa_family == AF_INET))
            {
                ...
            }
            else if ((addr6 == nil) && (cursor->ifa_addr->sa_family == AF_INET6))// IPV6同上
            {
                ...
            }
            // 指向链表下一个addr
            cursor = cursor->ifa_next;
        }
    }
}
❻ 如果 addr4 IPV4地址为空,而且地址类型为IPV4
// IPv4
struct sockaddr_in nativeAddr4;
// memcpy内存copy函数,把src开始到size的字节数copy到dest中
memcpy(&nativeAddr4, cursor->ifa_addr, sizeof(nativeAddr4));

// 比较两个字符串是否相同:本机的IP名和接口interface是否相同
if (strcmp(cursor->ifa_name, iface) == 0)
{
    // 相同则赋值 port
    nativeAddr4.sin_port = htons(port);
    // 用data封号IPV4地址
    addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
}
else// 本机IP名和interface不相同
{
    // 声明一个IP 16位的数组
    char ip[INET_ADDRSTRLEN];
    
    // 这里是转成了10进制(因为获取到的是二进制IP)
    const char *conversion = inet_ntop(AF_INET, &nativeAddr4.sin_addr, ip, sizeof(ip));
    
    // 如果conversion不为空说明转换成功,比较转换后的IP和interface是否相同
    if ((conversion != NULL) && (strcmp(ip, iface) == 0))
    {
        // 相同则赋值 port
        nativeAddr4.sin_port = htons(port);
        
        addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
    }
}
❼ 如果这两个二级指针存在,则取成一级指针,把addr4赋值给它
if (interfaceAddr4Ptr) *interfaceAddr4Ptr = addr4;
if (interfaceAddr6Ptr) *interfaceAddr6Ptr = addr6;

四、连接socket最终调用的方法

dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    [strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});

1、lookup方法的内部实现

- (void)lookup:(int)aStateIndex didSucceedWithAddress4:(NSData *)address4 address6:(NSData *)address6
{
    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    // 至少有一个server服务器地址
    NSAssert(address4 || address6, @"Expected at least one valid address");
    
    // 如果状态不一致,说明断开连接
    if (aStateIndex != stateIndex)
    {
        LogInfo(@"Ignoring lookupDidSucceed, already disconnected");
        return;
    }
    
    // 有效性检查
    if (isIPv4Disabled && (address6 == nil))
    if (isIPv6Disabled && (address4 == nil))

    // 调用连接方法,如果失败,则返回错误信息
    NSError *err = nil;
    if (![self connectWithAddress4:address4 address6:address6 error:&err])
    {
        [self closeWithError:err];
    }
}

2、connectWithAddress4方法的内部实现

- (BOOL)connectWithAddress4:(NSData *)address4 address6:(NSData *)address6 error:(NSError **)errPtr
{
    ...
    return YES;
}
❶ 决定socket类型
// 判断是否倾向于IPV6
BOOL preferIPv6 = (config & kPreferIPv6) ? YES : NO;

// 如果有IPV4地址,创建IPV4 Socket
if (address4)
{
    socket4FD = [self createSocket:AF_INET connectInterface:connectInterface4 errPtr:errPtr];
}

// 如果有IPV6地址,创建IPV6 Socket
if (address6)
{
    socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr];
}

// 如果都为空,直接返回
if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL)
{
    return NO;
}
❷ 主选与备选
int socketFD, alternateSocketFD;// 主选socketFD,备选alternateSocketFD
NSData *address, *alternateAddress;// 主选地址和备选地址

// 主选IPV6
if ((preferIPv6 && socket6FD != SOCKET_NULL) || socket4FD == SOCKET_NULL)
{
    socketFD = socket6FD;
    alternateSocketFD = socket4FD;
    address = address6;
    alternateAddress = address4;
}
// 主选IPV4
else
{
    socketFD = socket4FD;
    alternateSocketFD = socket6FD;
    address = address4;
    alternateAddress = address6;
}
// 拿到当前状态
int aStateIndex = stateIndex;
❸ 用socket和address去连接服务器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];
❹ 如果有备选地址则延迟去连接备选的地址
if (alternateAddress)
{
    // 延迟去连接备选的地址
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
        [self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
    });
}

3、创建Socket

- (int)createSocket:(int)family connectInterface:(NSData *)connectInterface errPtr:(NSError **)errPtr
{
    // 用SOCK_STREAM TCP流创建socket
    int socketFD = socket(family, SOCK_STREAM, 0);
    
    // 如果创建失败
    if (socketFD == SOCKET_NULL)
    {
        if (errPtr)
            *errPtr = [self errorWithErrno:errno reason:@"Error in socket() function"];
        
        return socketFD;
    }
    
    // 和connectInterface绑定
    if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
    {
        // 绑定失败,直接关闭返回
        [self closeSocket:socketFD];
        
        return SOCKET_NULL;
    }

    return socketFD;
}

4、连接Socket的究极方法

- (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
{
    ...
}
❶ 发现已连接则关闭连接直接返回
if (self.isConnected)
{
    [self closeSocket:socketFD];
    return;
}
❷ 在全局Queue中开启新线程进行连接
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalConcurrentQueue, ^{
    ...
});
❸ 调用connect方法,该函数会阻塞线程,所以要通过异步方式执行并开启新线程

客户端向特定网络地址的服务器发送连接请求,连接成功返回0,失败返回 -1。

int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
❹ 连接成功
// 在socketQueue中开辟线程,即在同步队列中异步执行
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
    if (result == 0)// 连接成功
    {
        // 关闭掉另一个没用的socket
        [self closeUnusedSocket:socketFD];
        
        // 调用didConnect:生成stream并改变状态
        [strongSelf didConnect:aStateIndex];
    }
    else// 连接失败
    {
        ...
    }
}});
❺ 连接失败
// 关闭当前socket
[strongSelf closeSocket:socketFD];

// 返回连接错误的error
if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
{
    NSError *error = [strongSelf errorWithErrno:err reason:@"Error in connect() function"];
    [strongSelf didNotConnect:aStateIndex error:error];
}

5、连接成功后的回调:设置一些连接成功的状态

- (void)didConnect:(int)aStateIndex
{
    ...
}
❶ 调用回调前的准备工作
// 状态不同
if (aStateIndex != stateIndex)

// 将kConnected合并到当前flag中
flags |= kConnected;

// 停止超时连接
[self endConnectTimeout];

// 创建个Block来初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{
    // 创建读写stream失败,则关闭并报对应错误
    if (![self createReadAndWriteStream])
    
    // 参数NO表示有可读bytes的时候不会调用回调函数
    if (![self registerForStreamCallbacksIncludingReadWrite:NO])
};

// 创建个Block来设置stream
dispatch_block_t SetupStreamsPart2 = ^{
    if (![self addStreamsToRunLoop])// 如果加到runloop上失败
    if (![self openStreams])// 打开读写stream
};

// 拿到server端的host和port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
__strong id<GCDAsyncSocketDelegate> theDelegate = delegate;// 拿到代理
❷ 调用回调函数
// 代理队列和Host不为nil且响应didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
    // 调用初始化stream
    SetupStreamsPart1();
    
    dispatch_async(delegateQueue, ^{ @autoreleasepool {
        // 到代理队列调用连接成功的代理方法
        [theDelegate socket:self didConnectToHost:host port:port];
        
        dispatch_async(self->socketQueue, ^{ @autoreleasepool {
            // 然后回到socketQueue中去执行设置stream
            SetupStreamsPart2();
        }});
    }});
}
❸ 初始化读写source并开始下一个任务
// 初始化读写source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];

// 开始下一个任务
[self maybeDequeueRead];
[self maybeDequeueWrite];

6、注册Stream的回调和读stream的回调

a、注册Stream的回调
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
    // 设置一个CF的flag:一种是错误发生的时候,一种是stream事件结束
    CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
    if (includeReadWrite)// 如果包含读写
        readStreamEvents |= kCFStreamEventHasBytesAvailable;// 仍然有Bytes要读
    
    // 给读stream设置客户端,会在之前设置的那些标记下回调函数CFReadStreamCallback
    if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))// 客户端stream上下文对象
    {
        return NO;
    }
    
    // 写的flag同上
}
b、读stream的回调
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
    // 得到触发回调的sokcet
    GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)pInfo;
    
    switch(type)
    {
        case kCFStreamEventHasBytesAvailable:// 如果是可读数据的回调
        {
            // 在socketQueue中调用
            dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
                asyncSocket->flags |= kSecureSocketHasBytesAvailable;
                [asyncSocket doReadData];// 去读取数据
            }});
            
            break;
        }
        ...
    }
}

7、客户端的runloop绑定stream

stream随着runloop不停转动,只要stream流一发生改变就能立刻更新数据。

- (void)didConnect:(int)aStateIndex
{
    // 创建个Block来设置stream
    dispatch_block_t SetupStreamsPart2 = ^{
        if (![self addStreamsToRunLoop])// 如果加到runloop上失败
}
a、把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
    // 判断flag里是否包含kAddedStreamsToRunLoop,没添加过则添加
    if (!(flags & kAddedStreamsToRunLoop))
    {
        [[self class] startCFStreamThreadIfNeeded];
        
        // 在开启的线程中去执行(阻塞式的)
        dispatch_sync(cfstreamThreadSetupQueue, ^{
            [[self class] performSelector:@selector(scheduleCFStreams:)
                                 onThread:cfstreamThread
                               withObject:self
                            waitUntilDone:YES];
        });
        // 添加标识
        flags |= kAddedStreamsToRunLoop;
    }
    
    return YES;
}
b、注册CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
    // 获取到runloop
    CFRunLoopRef runLoop = CFRunLoopGetCurrent();
    
    // 如果有readStream
    if (asyncSocket->readStream)
        // 注册readStream在runloop的kCFRunLoopDefaultMode上
        CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
    // 同上
    if (asyncSocket->writeStream)
        CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}

五、连接到服务器后开始读取数据

// 已经连接到服务器
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(nonnull NSString *)host port:(uint16_t)port
{
    NSLog(@"连接成功,主机:%@,端口:%d",host,port);
    [self.socket readDataWithTimeout:-1 tag:10086];
}

1、用偏移量 maxLength 读取数据

- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag
{
    [self readDataWithTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
                     buffer:(NSMutableData *)buffer
               bufferOffset:(NSUInteger)offset
                        tag:(long)tag
{
    [self readDataWithTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
                     buffer:(NSMutableData *)buffer
               bufferOffset:(NSUInteger)offset
                  maxLength:(NSUInteger)length
                        tag:(long)tag
{
    // 初始化packet(读取的数据都是一个个的包)
    GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
                                  
    // 往读的队列添加任务,任务是包的形式
    [self->readQueue addObject:packet];
                                  
    // 让读任务离队,开始执行这条读任务
    [self maybeDequeueRead];
}

2、让读任务离队,开始执行这条读任务

- (void)maybeDequeueRead
{
    ...
}
a、如果当前读的包为空,而且flag为已连接
if ((currentRead == nil) && (flags & kConnected))
❶ 从readQueue中拿到第一个写入的数据
// 如果读的queue大于0 (里面装的是我们封装的GCDAsyncReadPacket数据包)
if ([readQueue count] > 0)
{
    // 从readQueue中拿到第一个写入的数据
    currentRead = [readQueue objectAtIndex:0];
    // 移除第一个写入的数据
    [readQueue removeObjectAtIndex:0];
    ...
}
❷ GCDAsyncSpecialPacket这种类型会做TLS认证
// 我们的数据包是否是GCDAsyncSpecialPacket这种类型,这个包里装了TLS的一些设置
if ([currentRead isKindOfClass:[GCDAsyncSpecialPacket class]])
{
    // 如果是这种类型的数据,那么我们就标记flag为正在读取TLS
    flags |= kStartingReadTLS;
    
    // 只有读写都开启了TLS才会做TLS认证
    [self maybeStartTLS];
}
❸ 不是特殊包类型则直接读取数据
else
{
    // 设置读的任务超时,每次延时的时候还会调用 [self doReadData];
    [self setupReadTimerWithTimeout:currentRead->timeout];
    
    // 读取数据
    [self doReadData];
}

b、读的队列没有数据,标记flag为读了没有数据则断开连接状态
else if (flags & kDisconnectAfterReads)
{
    // 如果标记为写然后断开连接
    if (flags & kDisconnectAfterWrites)
    {
        // 如果写的队列为0,而且写为空
        if (([writeQueue count] == 0) && (currentWrite == nil))
        {
            [self closeWithError:nil];// 断开连接
        }
    }
    else
    {
        [self closeWithError:nil];// 断开连接
    }
}

c、如果有安全socket
else if (flags & kSocketSecure)
{
    // 把加密数据从进程缓存区中读取到prebuffer里
    [self flushSSLBuffers];
    
    // 如果可读字节数为0
    if ([preBuffer availableBytes] == 0)
    {
        // 重新恢复读的source。因为每次开始读数据的时候,都会挂起读的source
        [self resumeReadSource];
    }
}

3、可能开启TLS

- (void)maybeStartTLS
{
    // 只有读和写TLS都开启
    if ((flags & kStartingReadTLS) && (flags & kStartingWriteTLS))
    {
            ...
    }
}
a、如果是用CFStream的,则安全传输为NO
// 是否需要安全传输
BOOL useSecureTransport = YES;

// 拿到当前读的数据
GCDAsyncSpecialPacket *tlsPacket = (GCDAsyncSpecialPacket *)currentRead;

// 得到设置字典
NSDictionary *tlsSettings = @{};
if (tlsPacket)
{
    tlsSettings = tlsPacket->tlsSettings;
}

// 拿到Key为CFStreamTLS的value
NSNumber *value = [tlsSettings objectForKey:GCDAsyncSocketUseCFStreamForTLS];
// 如果是用CFStream的,则安全传输为NO
if (value && [value boolValue])
    useSecureTransport = NO;
b、如果使用安全通道则开启TLS
// 如果使用安全通道
if (useSecureTransport)
{
    // 开启TLS
    [self ssl_startTLS];
}
c、CFStream形式的Tls
else
{
    [self cf_startTLS];
}

4、读取数据

static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
    [asyncSocket doReadData];// 去读取数据
}
- (void)doReadData
{
    ...
}
a、STEP 1 - READ FROM PREBUFFER
// 当前总读的数据量
NSUInteger totalBytesReadForCurrentRead = 0;

// 读取数据,直接读到界限
bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
// 或者读到指定长度或者数据包的长度为止
bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];

// 从上两步拿到我们需要读的长度,去看看有没有空间去存储
[currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];

// 拿到我们需要追加数据的指针位置
uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
                                                                  currentRead->bytesDone;

// 从prebuffer处复制过来数据,bytesToCopy长度
memcpy(buffer, [preBuffer readBuffer], bytesToCopy);

// 从preBuffer移除掉已经复制的数据
[preBuffer didRead:bytesToCopy];

// 已读的数据加上
currentRead->bytesDone += bytesToCopy;
// 当前已读的数据加上
totalBytesReadForCurrentRead += bytesToCopy;

// 如果已读 == 需要读的长度,说明已经读完
done = (currentRead->bytesDone == currentRead->readLength);
b、STEP 2 - READ FROM SOCKET(从socket中去读取)
// 循环去读
do
{
    // 用ssl方式去读取数据
    result = SSLRead(sslContext, loop_buffer, loop_bytesToRead, &loop_bytesRead);
    
    // 读了的大小加进度
    bytesRead += loop_bytesRead;
}
// 如果没出错,且读的大小小于需要读的大小,就一直循环
while ((result == noErr) && (bytesRead < bytesToRead));
c、开始读取数据,最普通的形式 read
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);

// 加上读的数量
currentRead->bytesDone += bytesRead;
// 把这一次读的数量加上来
totalBytesReadForCurrentRead += bytesRead;
// 判断是否已读完
done = (currentRead->bytesDone == currentRead->readLength);

//检查是否读完
if (done)
{
    //完成这次数据的读取
    [self completeCurrentRead];
}

//如果响应读数据进度的代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
{
    //代理queue中回调出去
    dispatch_async(delegateQueue, ^{ @autoreleasepool {
        [theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
    }});
}

Demo

Demo在我的Github上,欢迎下载。
SourceCodeAnalysisDemo

参考文献

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容