原创:知识进阶型文章
创作不易,请珍惜,之后会持续更新,不断完善
个人比较喜欢做笔记和写总结,毕竟好记性不如烂笔头哈哈,这些文章记录了我的IOS成长历程,希望能与大家一起进步
温馨提示:由于简书不支持目录跳转,大家可通过command + F 输入目录标题后迅速寻找到你所需要的内容
目录
- 一、GCDAsyncSocket的成员变量
- 二、创建socket
- 1、提供给外界调用创建socket的接口方法
- 2、层级调用
- 3、最终调用的初始化方法
- 4、在懒加载方法中进行一系列配置
- 三、连接socket
- 1、连接socket方法的具体实现
- 2、包裹在自动释放池中的GCDBlock中的具体内容
- 3、创建全局队列并异步执行代码块中的内容
- 4、实现在连接之前的接口检查方法
- 5、根据interface 得到IPV4和IPV6地址
- 四、连接socket最终调用的方法
- 1、lookup方法的内部实现
- 2、connectWithAddress4方法的内部实现
- 3、创建Socket
- 4、连接Socket的究极方法
- 5、连接成功后的回调:设置一些连接成功的状态
- 6、注册Stream的回调和读stream的回调
- 7、stream与runloop
- 五、连接到服务器后开始读取数据
- 1、用偏移量 maxLength 读取数据
- 2、让读任务离队,开始执行这条读任务
- 3、可能开启TLS
- 4、读取数据
- Demo
- 参考文献
一、GCDAsyncSocket的成员变量
@implementation GCDAsyncSocket
{
...
}
标识当前socket的状态
uint32_t flags;
enum GCDAsyncSocketFlags
{
kSocketStarted = 1 << 0, // If set, socket has been started (accepting/connecting)
kConnected
...
}
ipv4和ipv6的配置
uint16_t config;
enum GCDAsyncSocketConfig
{
kIPv4Disabled = 1 << 0, // If set, IPv4 is disabled
kIPv6Disabled = 1 << 1, // If set, IPv6 is disabled
kPreferIPv6 = 1 << 2, // If set, IPv6 is preferred over IPv4
kAllowHalfDuplexConnection = 1 << 3, // If set, the socket will stay open even if the read stream closes
};
GCDAsyncSocketDelegate属性
__weak id<GCDAsyncSocketDelegate> delegate;// 当连接socket、读写数据、关闭socket的时候进行回调代理属性
dispatch_queue_t delegateQueue;// 代理回调的queue
三种Socket类型对应三种地址
int socket4FD;// 本地IPV4Socket
int socket6FD;// 本地IPV6Socket
int socketUN;// unix域的套接字,用于进程之间通讯
NSURL *socketUrl;// 服务端url
int stateIndex;// 状态Index
NSData * connectInterface4;// 本机的IPV4地址
NSData * connectInterface6;// 本机的IPV6地址
NSData * connectInterfaceUN;// 本机unix域地址
这个类对Socket的操作都在这个串行queue中
dispatch_queue_t socketQueue;
接收源数据的回调事件
dispatch_source_t accept4Source;
dispatch_source_t accept6Source;
dispatch_source_t acceptUNSource;
链接、读写timer用的是GCD定时器
dispatch_source_t connectTimer;
dispatch_source_t readSource;
dispatch_source_t writeSource;
dispatch_source_t readTimer;
dispatch_source_t writeTimer;
connectTimer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, socketQueue);
dispatch_source_set_event_handler(connectTimer, ^{ @autoreleasepool {
[strongSelf doConnectTimeout];
}});
读写数据包数组:类似queue,先读进去的先拿出来(FIFO)最大限制为5个包
NSMutableArray *readQueue;
NSMutableArray *writeQueue;
当前正在读写数据包
GCDAsyncReadPacket *currentRead;
GCDAsyncWritePacket *currentWrite;
当前socket未获取完的数据大小
unsigned long socketFDBytesAvailable;
全局公用的提前缓冲区:读取的数据会填充到Buffer中,填充满了后就会取出Buffer中的数据进行代理回调
GCDAsyncSocketPreBuffer *preBuffer;
读写数据流
CFStreamClientContext streamContext;
CFReadStreamRef readStream;// 读的数据流
CFWriteStreamRef writeStream;// 写的数据流
SSL认证
SSLContextRef sslContext;// SSL上下文,用来做SSL认证
GCDAsyncSocketPreBuffer *sslPreBuffer;// 全局公用的SSL的提前缓冲区
size_t sslWriteCachedLength;
OSStatus sslErrCode;// 记录SSL读取数据错误
OSStatus lastSSLHandshakeError;// 记录SSL握手的错误
socket队列的标识key
void *IsOnSocketQueueOrTargetQueueKey;
// 将socketQueue作上标记方便取出
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);
// 当前线程根据这个标识判断是不是在这个队列
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
return delegate;
}
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
{
return delegateQueue;
}
连接备选服务端地址的延时 (另一个IPV4或IPV6)
NSTimeInterval alternateAddressDelay;
alternateAddressDelay = 0.3;// 默认0.3秒
// 用socket和address去连接服务器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];
// 如果有备选地址
if (alternateAddress)
{
// 延迟去连接备选的地址
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
[self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
});
}
二、创建socket
self.socket = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:dispatch_get_global_queue(0, 0)];
1、提供给外界调用创建socket的接口方法
- (instancetype)init;
- (instancetype)initWithSocketQueue:(nullable dispatch_queue_t)sq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq socketQueue:(nullable dispatch_queue_t)sq;
2、层级调用
- (id)init
{
return [self initWithDelegate:nil delegateQueue:NULL socketQueue:NULL];
}
- (id)initWithSocketQueue:(dispatch_queue_t)sq
{
return [self initWithDelegate:nil delegateQueue:NULL socketQueue:sq];
}
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq
{
return [self initWithDelegate:aDelegate delegateQueue:dq socketQueue:NULL];
}
3、最终调用的初始化方法
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq socketQueue:(dispatch_queue_t)sq
{
if((self = [super init]))
{
...
}
return self;
}
❶ 创建socket,属性值先都置为 -1
socket4FD = SOCKET_NULL;// 本机的ipv4
socket6FD = SOCKET_NULL;
socketUN = SOCKET_NULL;
socketUrl = nil;
stateIndex = 0;// 状态Index
❷ 如果scoketQueue存在却是global的则报错,没有的话创建一个scoketQueue(名字为:GCDAsyncSocket,NULL表示是串行队列)
if (sq)
{
// 断言必须要一个非并行queue
NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
@"The given socketQueue parameter must not be a concurrent queue.");
socketQueue = sq;
}
else
{
socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL);
}
❸ 给当前队里加一个标识
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);
❹ 初始化读写数组,限制大小为5
readQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentRead = nil;
writeQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentWrite = nil;
❺ 设置缓冲区大小为4kb
preBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];
❻ 设置连接备选服务端地址的延时为0.3秒
alternateAddressDelay = 0.3;
4、在懒加载方法中进行一系列配置
delegate
- (id)delegate
{
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 当前队列已经是同步串行
{
return delegate;
}
else// 否则让其处于同步串行
{
__block id result;
// 同步串行队列:保证socket顺序安全,因为需要先建立链接才能读写数据,存在顺序性
dispatch_sync(socketQueue, ^{
result = self->delegate;
});
return result;
}
}
delegateQueue
- (dispatch_queue_t)delegateQueue
{
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))// 当前代理队列已经是同步串行
{
return delegateQueue;
}
else// 否则让其处于同步串行
{
__block dispatch_queue_t result;
// 保证代理数据回调顺序安全
dispatch_sync(socketQueue, ^{
result = self->delegateQueue;
});
return result;
}
}
三、连接socket
[self.socket connectToHost:@"127.0.0.1" onPort:8090 withTimeout:-1 error:&error];
1、连接socket方法的具体实现
- (BOOL)connectToHost:(NSString *)inHost
onPort:(uint16_t)port
viaInterface:(NSString *)inInterface
withTimeout:(NSTimeInterval)timeout
error:(NSError **)errPtr
{
...
}
❶ 拿到host和interface,通过copy防止外界传入值被修改
很可能给我们的服务端的参数是一个可变字符串,所以我们需要copy
,在Block
里同步执行,这样就不需要担心它被改变。
NSString *host = [inHost copy];
NSString *interface = [inInterface copy];
❷ 包裹在自动释放池中的GCDBlock
- 大量临时变量(
connect
: 重连) - 自定义线程管理 (
NSOperation
)
dispatch_block_t block = ^{ @autoreleasepool {
...
}
❸ 在socketQueue中执行这个Block,否则同步的调起这个queue去执行
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();// 在socketQueue中执行这个Block
else
dispatch_sync(socketQueue, block);// 否则同步的调起这个queue去执行
❹ 如果发生了错误,则将错误进行赋值
__block NSError *preConnectErr = nil;// 错误信息
...
f (errPtr) *errPtr = preConnectErr;
❺ 把连接是否成功的result返回
__block BOOL result = NO;// 返回结果
...
return result;
2、包裹在自动释放池中的GCDBlock中的具体内容
dispatch_block_t block = ^{ @autoreleasepool {
...
}
❶ 对host服务器地址有效性进行校验
return_from_block
这个宏其实就是return
。host
长度为0本来应该直接返回 但是目前在GCDBlock
中具有异步性即编译器还没有识别到return
的时候就直接跳过去了顺序执行下面内容,所以采用在预编译期执行的宏定义这种方式防止此类错误的发生。其实这种情况很少发生,因为系统对return
已经进行过优化防止此类情况发生。
#define return_from_block return
if ([host length] == 0)
{
NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string.";
preConnectErr = [self badParamError:msg];
return_from_block;
}
❷ 在连接之前的接口检查,一般我们传nil
// interface表示本机的IP端口
if (![self preConnectWithInterface:interface error:&preConnectErr])
{
return_from_block;
}
❸ flags做或等运算:将flags标识为开始Socket连接
self->flags |= kSocketStarted;
❹ 创建全局队列并异步执行
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(...);
❺ 启连接超时并返回YES
[self startConnectTimeout:timeout];
result = YES;
3、创建全局队列并异步执行代码块中的内容
dispatch_async(globalConcurrentQueue, ^{ @autoreleasepool {
}});
❶ 获取server地址数组(包含IPV4 IPV6的地址)
NSError *lookupErr = nil;// 查找错误
// lookupHost方法用来获取服务器地址,内部实现和获取本地地址大同小异,不再赘述
NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr];
❷ 如果有错
if (lookupErr)// 如果有错
{
// 使用scocketQueue
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
// 一些错误处理,比如清空一些数据等等
[strongSelf lookup:aStateIndex didFail:lookupErr];
}});
}
❸ 没有发生错误的话则遍历地址数组得到IPV4和IPV6地址
NSData *address4 = nil;
NSData *address6 = nil;
for (NSData *address in addresses)// 遍历地址数组
{
// 判断是否address4为空且address为IPV4
if (!address4 && [[self class] isIPv4Address:address])
{
address4 = address;
}
// 判断是否address6为空且address为IPV6
else if (!address6 && [[self class] isIPv6Address:address])
{
address6 = address;
}
}
❹ 在socketQueue同步队列中异步去发起连接以保证顺序执行且不堵塞线程
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
[strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});
4、实现在连接之前的接口检查方法
- (BOOL)preConnectWithInterface:(NSString *)interface error:(NSError **)errPtr
{
...
return YES
}
❶ 先断言,如果当前的queue不是初始化的quueue,直接报错
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
❷ 进行一系列有效性 判断
&
表示位与运算,因为枚举是用左位移<<
运算定义的,所以可以用&
来判断config
包不包含某个枚举。因为一个值可能包含好几个枚举值,所以这时候不能用==
来判断,只能用&
来判断。
if (delegate == nil)// 无代理
if (delegateQueue == NULL) // 没有代理queue
if (![self isDisconnected])// 当前不是非连接状态(即已经连接了所以没必要再进行连接)
// 是否支持IPV4 IPV6
BOOL isIPv4Disabled = (config & kIPv4Disabled) ? YES : NO;
BOOL isIPv6Disabled = (config & kIPv6Disabled) ? YES : NO;
if (isIPv4Disabled && isIPv6Disabled) // 是否都不支持
{
if (errPtr)
{
NSString *msg = @"Both IPv4 and IPv6 have been disabled. Must enable at least one protocol first.";
*errPtr = [self badConfigError:msg];
}
return NO;
}
❸ 如果有interface本机地址(一般传入的是nil)
if (interface)
{
NSMutableData *interface4 = nil;
NSMutableData *interface6 = nil;
// 得到本机的IPV4和IPV6地址
[self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:0];
// 如果两者都为nil
if ((interface4 == nil) && (interface6 == nil))
if (isIPv4Disabled && (interface6 == nil))
if (isIPv6Disabled && (interface4 == nil))
// 如果都没问题则赋值到成员变量中
connectInterface4 = interface4;
connectInterface6 = interface6;
}
5、根据interface 得到IPV4和IPV6地址
- (void)getInterfaceAddress4:(NSMutableData **)interfaceAddr4Ptr
address6:(NSMutableData **)interfaceAddr6Ptr
fromDescription:(NSString *)interfaceDescription
port:(uint16_t)port
{
NSMutableData *addr4 = nil;
NSMutableData *addr6 = nil;
NSString *interface = nil;
...
}
❶ 先用:分割(比如https://www.127.0.0.1)
NSArray *components = [interfaceDescription componentsSeparatedByString:@":"];
if ([components count] > 0)
{
NSString *temp = [components objectAtIndex:0];
if ([temp length] > 0)
{
interface = temp;
}
}
❷ 拿到port端口
if ([components count] > 1 && port == 0)
{
// 将一个字符串根据base参数转成长整型,如base值为10则采用10进制
NSString *temp = [components objectAtIndex:1];
long portL = strtol([temp UTF8String], NULL, 10);
// UINT16_MAX:65535最大端口号
if (portL > 0 && portL <= UINT16_MAX)
{
port = (uint16_t)portL;
}
}
❸ interface为空则自己创建一个interface
if (interface == nil)
{
// 生成一个地址结构体
struct sockaddr_in sockaddr4;
// memset的作用是在一段内存块中填充某个给定的值,它是对较大的结构体或数组进行清零操作的一种最快方法
memset(&sockaddr4, 0, sizeof(sockaddr4));
sockaddr4.sin_len = sizeof(sockaddr4);// 结构体长度
sockaddr4.sin_family = AF_INET;
sockaddr4.sin_port = htons(port);// 端口号
sockaddr4.sin_addr.s_addr = htonl(INADDR_ANY);// 0.0.0.0表示不确定地址或者任意地址
// sockaddr6同上
...
// 把这两个结构体转成data
addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
❹ 如果是localhost、loopback(回环地址)就赋值为127.0.0.1
else if ([interface isEqualToString:@"localhost"] || [interface isEqualToString:@"loopback"])
{
sockaddr4.sin_addr.s_addr = htonl(INADDR_LOOPBACK);// 127.0.0.1
// sockaddr6同上
...
addr4 = [NSMutableData dataWithBytes:&sockaddr4 length:sizeof(sockaddr4)];
addr6 = [NSMutableData dataWithBytes:&sockaddr6 length:sizeof(sockaddr6)];
}
❺ 非localhost、loopback则去获取本机IP,看是否和传进来Interface是同名或者同IP,相同才给赋端口号,把数据封装进Data,否则为nil
else
{
const char *iface = [interface UTF8String];// 转成cString
struct ifaddrs *addrs;// 定义结构体指针,这个指针是本地IP
const struct ifaddrs *cursor;
// 获取到本机IP,为0说明成功了
if ((getifaddrs(&addrs) == 0))
{
cursor = addrs;// 赋值
while (cursor != NULL)// 如果IP不为空,则循环链表去设置
{
// 如果 addr4 IPV4地址为空,而且地址类型为IPV4
if ((addr4 == nil) && (cursor->ifa_addr->sa_family == AF_INET))
{
...
}
else if ((addr6 == nil) && (cursor->ifa_addr->sa_family == AF_INET6))// IPV6同上
{
...
}
// 指向链表下一个addr
cursor = cursor->ifa_next;
}
}
}
❻ 如果 addr4 IPV4地址为空,而且地址类型为IPV4
// IPv4
struct sockaddr_in nativeAddr4;
// memcpy内存copy函数,把src开始到size的字节数copy到dest中
memcpy(&nativeAddr4, cursor->ifa_addr, sizeof(nativeAddr4));
// 比较两个字符串是否相同:本机的IP名和接口interface是否相同
if (strcmp(cursor->ifa_name, iface) == 0)
{
// 相同则赋值 port
nativeAddr4.sin_port = htons(port);
// 用data封号IPV4地址
addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
}
else// 本机IP名和interface不相同
{
// 声明一个IP 16位的数组
char ip[INET_ADDRSTRLEN];
// 这里是转成了10进制(因为获取到的是二进制IP)
const char *conversion = inet_ntop(AF_INET, &nativeAddr4.sin_addr, ip, sizeof(ip));
// 如果conversion不为空说明转换成功,比较转换后的IP和interface是否相同
if ((conversion != NULL) && (strcmp(ip, iface) == 0))
{
// 相同则赋值 port
nativeAddr4.sin_port = htons(port);
addr4 = [NSMutableData dataWithBytes:&nativeAddr4 length:sizeof(nativeAddr4)];
}
}
❼ 如果这两个二级指针存在,则取成一级指针,把addr4赋值给它
if (interfaceAddr4Ptr) *interfaceAddr4Ptr = addr4;
if (interfaceAddr6Ptr) *interfaceAddr6Ptr = addr6;
四、连接socket最终调用的方法
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
[strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});
1、lookup方法的内部实现
- (void)lookup:(int)aStateIndex didSucceedWithAddress4:(NSData *)address4 address6:(NSData *)address6
{
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
// 至少有一个server服务器地址
NSAssert(address4 || address6, @"Expected at least one valid address");
// 如果状态不一致,说明断开连接
if (aStateIndex != stateIndex)
{
LogInfo(@"Ignoring lookupDidSucceed, already disconnected");
return;
}
// 有效性检查
if (isIPv4Disabled && (address6 == nil))
if (isIPv6Disabled && (address4 == nil))
// 调用连接方法,如果失败,则返回错误信息
NSError *err = nil;
if (![self connectWithAddress4:address4 address6:address6 error:&err])
{
[self closeWithError:err];
}
}
2、connectWithAddress4方法的内部实现
- (BOOL)connectWithAddress4:(NSData *)address4 address6:(NSData *)address6 error:(NSError **)errPtr
{
...
return YES;
}
❶ 决定socket类型
// 判断是否倾向于IPV6
BOOL preferIPv6 = (config & kPreferIPv6) ? YES : NO;
// 如果有IPV4地址,创建IPV4 Socket
if (address4)
{
socket4FD = [self createSocket:AF_INET connectInterface:connectInterface4 errPtr:errPtr];
}
// 如果有IPV6地址,创建IPV6 Socket
if (address6)
{
socket6FD = [self createSocket:AF_INET6 connectInterface:connectInterface6 errPtr:errPtr];
}
// 如果都为空,直接返回
if (socket4FD == SOCKET_NULL && socket6FD == SOCKET_NULL)
{
return NO;
}
❷ 主选与备选
int socketFD, alternateSocketFD;// 主选socketFD,备选alternateSocketFD
NSData *address, *alternateAddress;// 主选地址和备选地址
// 主选IPV6
if ((preferIPv6 && socket6FD != SOCKET_NULL) || socket4FD == SOCKET_NULL)
{
socketFD = socket6FD;
alternateSocketFD = socket4FD;
address = address6;
alternateAddress = address4;
}
// 主选IPV4
else
{
socketFD = socket4FD;
alternateSocketFD = socket6FD;
address = address4;
alternateAddress = address6;
}
// 拿到当前状态
int aStateIndex = stateIndex;
❸ 用socket和address去连接服务器
[self connectSocket:socketFD address:address stateIndex:aStateIndex];
❹ 如果有备选地址则延迟去连接备选的地址
if (alternateAddress)
{
// 延迟去连接备选的地址
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(alternateAddressDelay * NSEC_PER_SEC)), socketQueue, ^{
[self connectSocket:alternateSocketFD address:alternateAddress stateIndex:aStateIndex];
});
}
3、创建Socket
- (int)createSocket:(int)family connectInterface:(NSData *)connectInterface errPtr:(NSError **)errPtr
{
// 用SOCK_STREAM TCP流创建socket
int socketFD = socket(family, SOCK_STREAM, 0);
// 如果创建失败
if (socketFD == SOCKET_NULL)
{
if (errPtr)
*errPtr = [self errorWithErrno:errno reason:@"Error in socket() function"];
return socketFD;
}
// 和connectInterface绑定
if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
{
// 绑定失败,直接关闭返回
[self closeSocket:socketFD];
return SOCKET_NULL;
}
return socketFD;
}
4、连接Socket的究极方法
- (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
{
...
}
❶ 发现已连接则关闭连接直接返回
if (self.isConnected)
{
[self closeSocket:socketFD];
return;
}
❷ 在全局Queue中开启新线程进行连接
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalConcurrentQueue, ^{
...
});
❸ 调用connect方法,该函数会阻塞线程,所以要通过异步方式执行并开启新线程
客户端向特定网络地址的服务器发送连接请求,连接成功返回0,失败返回 -1。
int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
❹ 连接成功
// 在socketQueue中开辟线程,即在同步队列中异步执行
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
if (result == 0)// 连接成功
{
// 关闭掉另一个没用的socket
[self closeUnusedSocket:socketFD];
// 调用didConnect:生成stream并改变状态
[strongSelf didConnect:aStateIndex];
}
else// 连接失败
{
...
}
}});
❺ 连接失败
// 关闭当前socket
[strongSelf closeSocket:socketFD];
// 返回连接错误的error
if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
{
NSError *error = [strongSelf errorWithErrno:err reason:@"Error in connect() function"];
[strongSelf didNotConnect:aStateIndex error:error];
}
5、连接成功后的回调:设置一些连接成功的状态
- (void)didConnect:(int)aStateIndex
{
...
}
❶ 调用回调前的准备工作
// 状态不同
if (aStateIndex != stateIndex)
// 将kConnected合并到当前flag中
flags |= kConnected;
// 停止超时连接
[self endConnectTimeout];
// 创建个Block来初始化Stream
dispatch_block_t SetupStreamsPart1 = ^{
// 创建读写stream失败,则关闭并报对应错误
if (![self createReadAndWriteStream])
// 参数NO表示有可读bytes的时候不会调用回调函数
if (![self registerForStreamCallbacksIncludingReadWrite:NO])
};
// 创建个Block来设置stream
dispatch_block_t SetupStreamsPart2 = ^{
if (![self addStreamsToRunLoop])// 如果加到runloop上失败
if (![self openStreams])// 打开读写stream
};
// 拿到server端的host和port
NSString *host = [self connectedHost];
uint16_t port = [self connectedPort];
__strong id<GCDAsyncSocketDelegate> theDelegate = delegate;// 拿到代理
❷ 调用回调函数
// 代理队列和Host不为nil且响应didConnectToHost代理方法
if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
{
// 调用初始化stream
SetupStreamsPart1();
dispatch_async(delegateQueue, ^{ @autoreleasepool {
// 到代理队列调用连接成功的代理方法
[theDelegate socket:self didConnectToHost:host port:port];
dispatch_async(self->socketQueue, ^{ @autoreleasepool {
// 然后回到socketQueue中去执行设置stream
SetupStreamsPart2();
}});
}});
}
❸ 初始化读写source并开始下一个任务
// 初始化读写source
[self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
// 开始下一个任务
[self maybeDequeueRead];
[self maybeDequeueWrite];
6、注册Stream的回调和读stream的回调
a、注册Stream的回调
- (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
{
// 设置一个CF的flag:一种是错误发生的时候,一种是stream事件结束
CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
if (includeReadWrite)// 如果包含读写
readStreamEvents |= kCFStreamEventHasBytesAvailable;// 仍然有Bytes要读
// 给读stream设置客户端,会在之前设置的那些标记下回调函数CFReadStreamCallback
if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))// 客户端stream上下文对象
{
return NO;
}
// 写的flag同上
}
b、读stream的回调
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
// 得到触发回调的sokcet
GCDAsyncSocket *asyncSocket = (__bridge GCDAsyncSocket *)pInfo;
switch(type)
{
case kCFStreamEventHasBytesAvailable:// 如果是可读数据的回调
{
// 在socketQueue中调用
dispatch_async(asyncSocket->socketQueue, ^{ @autoreleasepool {
asyncSocket->flags |= kSecureSocketHasBytesAvailable;
[asyncSocket doReadData];// 去读取数据
}});
break;
}
...
}
}
7、客户端的runloop绑定stream
让stream
随着runloop
不停转动,只要stream
流一发生改变就能立刻更新数据。
- (void)didConnect:(int)aStateIndex
{
// 创建个Block来设置stream
dispatch_block_t SetupStreamsPart2 = ^{
if (![self addStreamsToRunLoop])// 如果加到runloop上失败
}
a、把stream添加到runloop上
- (BOOL)addStreamsToRunLoop
{
// 判断flag里是否包含kAddedStreamsToRunLoop,没添加过则添加
if (!(flags & kAddedStreamsToRunLoop))
{
[[self class] startCFStreamThreadIfNeeded];
// 在开启的线程中去执行(阻塞式的)
dispatch_sync(cfstreamThreadSetupQueue, ^{
[[self class] performSelector:@selector(scheduleCFStreams:)
onThread:cfstreamThread
withObject:self
waitUntilDone:YES];
});
// 添加标识
flags |= kAddedStreamsToRunLoop;
}
return YES;
}
b、注册CFStream
+ (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
{
// 获取到runloop
CFRunLoopRef runLoop = CFRunLoopGetCurrent();
// 如果有readStream
if (asyncSocket->readStream)
// 注册readStream在runloop的kCFRunLoopDefaultMode上
CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
// 同上
if (asyncSocket->writeStream)
CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
}
五、连接到服务器后开始读取数据
// 已经连接到服务器
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(nonnull NSString *)host port:(uint16_t)port
{
NSLog(@"连接成功,主机:%@,端口:%d",host,port);
[self.socket readDataWithTimeout:-1 tag:10086];
}
1、用偏移量 maxLength 读取数据
- (void)readDataWithTimeout:(NSTimeInterval)timeout tag:(long)tag
{
[self readDataWithTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
buffer:(NSMutableData *)buffer
bufferOffset:(NSUInteger)offset
tag:(long)tag
{
[self readDataWithTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
}
- (void)readDataWithTimeout:(NSTimeInterval)timeout
buffer:(NSMutableData *)buffer
bufferOffset:(NSUInteger)offset
maxLength:(NSUInteger)length
tag:(long)tag
{
// 初始化packet(读取的数据都是一个个的包)
GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
// 往读的队列添加任务,任务是包的形式
[self->readQueue addObject:packet];
// 让读任务离队,开始执行这条读任务
[self maybeDequeueRead];
}
2、让读任务离队,开始执行这条读任务
- (void)maybeDequeueRead
{
...
}
a、如果当前读的包为空,而且flag为已连接
if ((currentRead == nil) && (flags & kConnected))
❶ 从readQueue中拿到第一个写入的数据
// 如果读的queue大于0 (里面装的是我们封装的GCDAsyncReadPacket数据包)
if ([readQueue count] > 0)
{
// 从readQueue中拿到第一个写入的数据
currentRead = [readQueue objectAtIndex:0];
// 移除第一个写入的数据
[readQueue removeObjectAtIndex:0];
...
}
❷ GCDAsyncSpecialPacket这种类型会做TLS认证
// 我们的数据包是否是GCDAsyncSpecialPacket这种类型,这个包里装了TLS的一些设置
if ([currentRead isKindOfClass:[GCDAsyncSpecialPacket class]])
{
// 如果是这种类型的数据,那么我们就标记flag为正在读取TLS
flags |= kStartingReadTLS;
// 只有读写都开启了TLS才会做TLS认证
[self maybeStartTLS];
}
❸ 不是特殊包类型则直接读取数据
else
{
// 设置读的任务超时,每次延时的时候还会调用 [self doReadData];
[self setupReadTimerWithTimeout:currentRead->timeout];
// 读取数据
[self doReadData];
}
b、读的队列没有数据,标记flag为读了没有数据则断开连接状态
else if (flags & kDisconnectAfterReads)
{
// 如果标记为写然后断开连接
if (flags & kDisconnectAfterWrites)
{
// 如果写的队列为0,而且写为空
if (([writeQueue count] == 0) && (currentWrite == nil))
{
[self closeWithError:nil];// 断开连接
}
}
else
{
[self closeWithError:nil];// 断开连接
}
}
c、如果有安全socket
else if (flags & kSocketSecure)
{
// 把加密数据从进程缓存区中读取到prebuffer里
[self flushSSLBuffers];
// 如果可读字节数为0
if ([preBuffer availableBytes] == 0)
{
// 重新恢复读的source。因为每次开始读数据的时候,都会挂起读的source
[self resumeReadSource];
}
}
3、可能开启TLS
- (void)maybeStartTLS
{
// 只有读和写TLS都开启
if ((flags & kStartingReadTLS) && (flags & kStartingWriteTLS))
{
...
}
}
a、如果是用CFStream的,则安全传输为NO
// 是否需要安全传输
BOOL useSecureTransport = YES;
// 拿到当前读的数据
GCDAsyncSpecialPacket *tlsPacket = (GCDAsyncSpecialPacket *)currentRead;
// 得到设置字典
NSDictionary *tlsSettings = @{};
if (tlsPacket)
{
tlsSettings = tlsPacket->tlsSettings;
}
// 拿到Key为CFStreamTLS的value
NSNumber *value = [tlsSettings objectForKey:GCDAsyncSocketUseCFStreamForTLS];
// 如果是用CFStream的,则安全传输为NO
if (value && [value boolValue])
useSecureTransport = NO;
b、如果使用安全通道则开启TLS
// 如果使用安全通道
if (useSecureTransport)
{
// 开启TLS
[self ssl_startTLS];
}
c、CFStream形式的Tls
else
{
[self cf_startTLS];
}
4、读取数据
static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
{
[asyncSocket doReadData];// 去读取数据
}
- (void)doReadData
{
...
}
a、STEP 1 - READ FROM PREBUFFER
// 当前总读的数据量
NSUInteger totalBytesReadForCurrentRead = 0;
// 读取数据,直接读到界限
bytesToCopy = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
// 或者读到指定长度或者数据包的长度为止
bytesToCopy = [currentRead readLengthForNonTermWithHint:[preBuffer availableBytes]];
// 从上两步拿到我们需要读的长度,去看看有没有空间去存储
[currentRead ensureCapacityForAdditionalDataOfLength:bytesToCopy];
// 拿到我们需要追加数据的指针位置
uint8_t *buffer = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset +
currentRead->bytesDone;
// 从prebuffer处复制过来数据,bytesToCopy长度
memcpy(buffer, [preBuffer readBuffer], bytesToCopy);
// 从preBuffer移除掉已经复制的数据
[preBuffer didRead:bytesToCopy];
// 已读的数据加上
currentRead->bytesDone += bytesToCopy;
// 当前已读的数据加上
totalBytesReadForCurrentRead += bytesToCopy;
// 如果已读 == 需要读的长度,说明已经读完
done = (currentRead->bytesDone == currentRead->readLength);
b、STEP 2 - READ FROM SOCKET(从socket中去读取)
// 循环去读
do
{
// 用ssl方式去读取数据
result = SSLRead(sslContext, loop_buffer, loop_bytesToRead, &loop_bytesRead);
// 读了的大小加进度
bytesRead += loop_bytesRead;
}
// 如果没出错,且读的大小小于需要读的大小,就一直循环
while ((result == noErr) && (bytesRead < bytesToRead));
c、开始读取数据,最普通的形式 read
ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
// 加上读的数量
currentRead->bytesDone += bytesRead;
// 把这一次读的数量加上来
totalBytesReadForCurrentRead += bytesRead;
// 判断是否已读完
done = (currentRead->bytesDone == currentRead->readLength);
//检查是否读完
if (done)
{
//完成这次数据的读取
[self completeCurrentRead];
}
//如果响应读数据进度的代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didReadPartialDataOfLength:tag:)])
{
//代理queue中回调出去
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didReadPartialDataOfLength:totalBytesReadForCurrentRead tag:theReadTag];
}});
}
Demo
Demo在我的Github上,欢迎下载。
SourceCodeAnalysisDemo