前言
直播迎来了又一个爆发,抖音淘宝的直播带货,公司也搞了一个直播功能,用的阿里云的SDK,推流都是他们内部实现的,因此项目上线后空闲之余,研究了一下直播推流的流程,这里记录一下,后边会补一篇文章来记录集成阿里云直播的实现及走过的坑
推流思路概述
推流整个过程可以看做现实生活中水泵到水龙头的过程,首先得建立水泵站和安装水龙头,再建立管道,然后安装连接起来,这时候水泵 -> 管道 -> 水龙头就可以传输, 这时候就需要引入水,最后打开水泵和水龙头,整个推流的过程就完成了
流程
1. 初始化NSStream, 建立Socket流 (建立水泵站和安装水龙头)
2. RTMP三次握手 (建立管道)
3. NSConnect建立连接 (安装连接)
4. 基于RTMP的message格式创建数据流 (引入水)
5. publish推流 (打开水泵和水龙头)
代码分析
-
建立Socket流
如果你已经知道了一个主机的DNS名字或者IP地址,使用 Core Foundation来读取或者发送数据流通过 CFStreamCreatePairWithSocketToHost函数。你可以充分利用CFStream和NSStream的无缝转换。把CFReadStreamRef和CFWriteStreamRef对象转换为NSInputStream和 NSOutputStream对象。当你得到输出路和输入流以后,在没有用arc的情况下你必须马上占有他们,把他们引用到一个NSInputStream和NSOutputStream对象,并且设置他们的代理对象(这个对象需要实现NSStreamDelegate协议)。通过调用open方法在当前运行时循环上执行他们
- (void)connectToServer:(NSString *)host port:(UInt32)port;{
if (self.streamStatus > 0) {
[self close];
}
//输入流,用来读取数据
CFReadStreamRef readStream;
//输出流,用来发送数据
CFWriteStreamRef writeStream;
if (port <= 0) {
//RTMP默认端口,1935
port = 1935;
}
//建立socket连接
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);
//注意__bridge_transfer,转移对象的内存管理权
_inputStream = (__bridge_transfer NSInputStream *)readStream;
_outputStream = (__bridge_transfer NSOutputStream *)writeStream;
//设置代理
_outputStream.delegate = self;
_inputStream.delegate = self;
[_outputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSRunLoopCommonModes];
[_inputStream scheduleInRunLoop:[NSRunLoop mainRunLoop] forMode:NSRunLoopCommonModes];
//打开输入流
[_inputStream open];
//打开输出流
[_outputStream open];
}
-
RTMP三次握手
RTMP(Real Time Messaging Protocol)是被Flash用于对象,视频,音频的传输.这个协议建立在TCP协议或者轮询HTTP协议之上,因此也需要通过三次握手来连接,客户端要向服务器发送C0, C1, C2(按序)三个chunk,服务器向客户端发送S0, S1,S2(按序)三个chunk,然后才能进行有效的信息传输。RTMP协议本身并没有规定这6个Message的具体传输顺序,但RTMP协议的实现者需要保证这几点
- 握手从客户端发送C0和C1块开始
- 客户端要等收到S1之后才能发送C2
- 客户端要等收到S2之后才能发送其他信息(控制信息和真实音视频等数据)
- 服务端要等到收到C0之后发送S1
- 服务端必须等到收到C1之后才能发送S2
- 服务端必须等到收到C2之后才能发送其他信息(控制信息和真实音视频等数据)
理论状态如图所示
Uninitialized:在此阶段发送协议版本。客户端和服务器都没有初始化。客户端以数据包C0的形式发送协议版本。如果服务器支持该版本,它将发送S0和S1作为响应。否则,服务器将通过采取适当的操作进行响应。在RTMP中,此操作正在终止连接。
Version Sent:在Uninitialized状态之后,客户端和服务器都处于Version Sent状态。客户端在等待包S1,服务器在等待包C1。当接收到等待的包时,客户端发送包C2,服务器发送包S2。然后状态变为Ack Sent。
Ack Sent:客户端和服务器分别等待S2和C2。
Handshake Done:客户机和服务器处于可以交换消息的状态
项目实际中的开发顺序
1. Client -> Server,C0 + C1
2. Server -> Client,S0 + S1 + S2
3. Client -> Server,C2
- 客户端发送 C0 + C1 , 其中C0为一个字节0x03, C1为1536个字节
- (void)handshake0 {
self.rtmpStatus = SGRtmpSessionStatusHandshake0;
//c0
char c0Byte = 0x03;
NSData *c0 = [NSData dataWithBytes:&c0Byte length:1];
[self writeData:c0];
[self handshake1];
}
- (void)handshake1 {
//c1
uint8_t *c1Bytes = (uint8_t *)malloc(1536);
memset(c1Bytes, 0, 4 + 4);
NSData *c1 = [NSData dataWithBytes:c1Bytes length:1536];
free(c1Bytes);
[self writeData:c1];
}
在C0中,该字段标识客户端请求的RTMP版本。在S0中,该字段标识由服务器选择的RTMP版本。本规范定义的版本是3。0-2是早期专有产品使用,现已弃用;4-31预留用于未来使用;32-255不允许使用(以区分RTMP和文本协议,因为它总是以一个可打印字符为开始)。当识别不出客户端请求的版本时,服务器应该使用3来响应。客户端可以选择降级到版本3,或者放弃握手
在C1中,包含time (4个字节)、zero(四个字节)、 randomData(1528字节)
- 服务端收到C0和C1发送 S0 + S1 + S2, 客户端接收到S1后Copy一份
case SGRtmpSessionStatusHandshake0:{
uint8_t s0;
[data getBytes:&s0 length:1];
if (s0 == 0x03) {//s0
self.rtmpStatus = SGRtmpSessionStatusHandshake1;
if (data.length > 1) {//后面还有数据,但不确定长度
data = [data subdataWithRange:NSMakeRange(1, data.length -1)];
self.handshake = data.mutableCopy;
} else {
break;
}
} else {
NSLog(@"握手失败");
break;
}
}
case SGRtmpSessionStatusHandshake1:{
if (self.handshake.length >= kRTMPSignatureSize) {//s1
[self handshake1];
if (self.handshake.length > kRTMPSignatureSize) {//>
NSData *subData = [self.handshake subdataWithRange:NSMakeRange(kRTMPSignatureSize, self.handshake.length - kRTMPSignatureSize)];
self.handshake = subData.mutableCopy;
} else {// =
self.handshake = [NSMutableData data];
break;
}
} else {// <
break;
}
}
case SGRtmpSessionStatusHandshake2:{//s2
if (data.length >= kRTMPSignatureSize) {
NSLog(@"握手完成");
self.rtmpStatus = SGRtmpSessionStatusHandshakeComplete;
[self sendConnectPacket];
}
break;
}
}
3.客户端发送C2
C2和S2数据包长度均为1536个八位的字节,几乎分别类似于S1和C1的原样返回, 包含time、time2、randomData(1528字节)
- (void)handshake2 {
self.rtmpStatus = SGRtmpSessionStatusHandshake2;
NSData *s1 = [self.handshake subdataWithRange:NSMakeRange(0, kRTMPSignatureSize)];
//c2
uint8_t *s1Bytes = (uint8_t *)s1.bytes;
memset(s1Bytes + 4, 0, 4);
NSData *c2 = [NSData dataWithBytes:s1Bytes length:s1.length];
[self writeData:c2];
}
-
NSConnect建立连接
1.客户端发送命令消息中的“连接”(connect)到服务器,请求与一个服务应用实例建立连接。
2.服务端收到connect命令后,服务器会发送协议消息"Window Acknowledgement size"消息到客户端,服务端同时连接到connect中请求的application
3.服务端发送协议消息"Set Peer BandWidth"到客户端
4.客户端在处理完服务端发来的"Set Peer BandWidth"消息后,向服务端发送"Window Acknowledgment"消息
5.服务端向客户端发送一条用户控制消息(Stream Begin), 如果连接成功,服务端向客户端发送_result消息,否则发送_error消息。
如图所示,客户端需要做的操作为向服务端发送connect消息和发送Window Acknowledgment消息
1. 握手完成后,客户端就会向服务端发送connect消息 , connect消息的格式按照RTMP Header + RTMP Body的格式组织
RTMP Header:
Type ID (消息的类型Id,1个字节) + StreamID (消息的流ID)
RTMP Body:
字段 | 类型 | 说明 |
---|---|---|
Command Name(命令名字) | String | 命令的名字,如”connect” |
Transaction ID(事务ID) | Number | 恒为1 |
Command Object(命令包含的参数对象) | Object | 键值对集合表示的命令参数 |
Optional User Arguments(额外的用户参数) | Object | 用户自定义的额外信息 |
- (void)sendConnectPacket {
// RTMP Header
RTMPChunk_0 metadata = {0};
metadata.msg_stream_id = SGStreamIDInvoke;
metadata.msg_type_id = SGMSGTypeID_INVOKE;
// RTMP Body = Command Name + Transaction ID + Command Object
NSString *url;
NSMutableData *buff = [NSMutableData data];
if (_url.port > 0) {
url = [NSString stringWithFormat:@"%@://%@:%zd/%@",_url.scheme,_url.host,_url.port,_url.app];
}else{
url = [NSString stringWithFormat:@"%@://%@/%@",_url.scheme,_url.host,_url.app];
}
// Command Name
[buff appendString:@"connect"];
// Transaction ID
[buff appendDouble:++_numOfInvokes];
self.trackedCommands[@(_numOfInvokes)] = @"connect";
// Command Object
[buff appendByte:kAMFObject];
[buff putKey:@"app" stringValue:_url.app]; // 客户端要链接到rtmp服务器的应用程序
[buff putKey:@"type" stringValue:@"nonprivate"]; // flashVer表示flash播放器的版本号
[buff putKey:@"tcUrl" stringValue:url]; // 服务器的URL地址
[buff putKey:@"fpad" boolValue:NO]; // 是否使用代理
[buff putKey:@"capabilities" doubleValue:15.]; // 值为15,即capabilities的object类型
[buff putKey:@"audioCodecs" doubleValue:10.]; // audioCodes表示支持的音频编码格式
[buff putKey:@"videoCodecs" doubleValue:7.]; // videoCodecs表示支持的视频频编码格式
[buff putKey:@"videoFunction" doubleValue:1.]; // videoFunction的值为1表示客户端可以执行精确到帧的搜索
[buff appendByte16:0];
[buff appendByte:kAMFObjectEnd];
metadata.msg_length.data = (int)buff.length;
[self sendPacket:buff :metadata];
}
抓包格式如图所示:
2. 客户端发送Window Acknowledgment消息
Window Acknowledgement Size 是设置接收端消息窗口大小,一般是2500000字节,即告诉客户端你在收到我设置的窗口大小的这么多数据之后给我返回一个ACK消息,告诉我你收到了这么多消息。在实际做推流的时候推流端要接收很少的服务器数据,远远到达不了窗口大小,所以基本不用考虑这点。而对于服务器返回的ACK消息一般也不做处理,我们默认服务器都已经收到了这么多消息,项目中这一步给省略掉了,不过可以通过文中顶部的阿里云sdk推流的抓包可以看到,阿里云是做了发送 Window Acknowledgement Size 5000000消息到客户端
-
基于RTMP的message格式创建数据流
RTMP的Message格式有很多写的非常详细的文章,这里就不再赘述,对于RTMP格式还不是很清晰的同学可以看一下这篇文章(带你吃透RTMP),createStream的流程如下所示:
1.客户端发送命令消息中releaseStream命令到服务器端
2.客户端发送命令消息中FCPublish命令到服务器端
3.客户端发送命令消息中的“创建流”(createStream)命令到服务器端。
4.服务器端接收到“创建流”命令后,发送命令消息中的“结果”(_result),通知客户端流的状态。
if ([command isEqualToString:@"_result"]) {
if ([trackedCommand isEqualToString:@"connect"]) {
[self sendReleaseStream];
[self sendFCPublish];
[self sendCreateStream];
}
}
- (void)sendReleaseStream {
RTMPChunk_0 metadata = {0};
metadata.msg_stream_id = SGStreamIDInvoke;
metadata.msg_type_id = SGMSGTypeID_NOTIFY;
NSMutableData *buff = [NSMutableData data];
[buff appendString:@"releaseStream"];
[buff appendDouble:++_numOfInvokes];
self.trackedCommands[@(_numOfInvokes)] = @"releaseStream";
[buff appendByte:kAMFNull];
[buff appendString:_url.playPath];
metadata.msg_length.data = (int)buff.length;
[self sendPacket:buff :metadata];
}
- (void)sendFCPublish {
RTMPChunk_0 metadata = {0};
metadata.msg_stream_id = SGStreamIDInvoke;
metadata.msg_type_id = SGMSGTypeID_NOTIFY;
NSMutableData *buff = [NSMutableData data];
[buff appendString:@"FCPublish"];
[buff appendDouble:(++_numOfInvokes)];
self.trackedCommands[@(_numOfInvokes)] = @"FCPublish";
[buff appendByte:kAMFNull];
[buff appendString:_url.playPath];
metadata.msg_length.data = (int)buff.length;
[self sendPacket:buff :metadata];
}
- (void)sendCreateStream {
RTMPChunk_0 metadata = {0};
metadata.msg_stream_id = SGStreamIDInvoke;
metadata.msg_type_id = SGMSGTypeID_INVOKE;
NSMutableData *buff = [NSMutableData data];
[buff appendString:@"createStream"];
self.trackedCommands[@(++_numOfInvokes)] = @"createStream";
[buff appendDouble:_numOfInvokes];
[buff appendByte:kAMFNull];
metadata.msg_length.data = (int)buff.length;
[self sendPacket:buff :metadata];
}
上面三种命令的格式参考向服务端发送connect消息的格式,Command Name 命令做对应的修改, Command Object没有command相关的信息,使用Null类型表示, 经过releaseStream,FCPublish ,createStream消息之后,得到了_result消息之后,接下来客户端就可以发起publish消息
-
publish推流
推流端使用publish消息向rtmp服务器端发布一个命名的流,发布之后,任意客户端都可以以该名称请求视频、音频和数据,我们首先来看一下publish消息的组织结构:
字段 | 类型 | 说明 |
---|---|---|
Command Name(命令名字) | String | 命令的名字,如"publish" |
Transaction ID(事务ID) | Number | 恒为0 |
Command Object(命令包含的参数对象) | Object | NULL,对onSatus命令来说不需要这个字段 |
Publishing Name(推流的名称) | String | 流名称 |
Publishing Type(推流类型) | String | "live"、"record"、"append"中的一种。live表示该推流文件不会在服务器端存储;record表示该推流的文件会在服务器应用程序下的子目录下保存以便后续播放,如果文件已经存在的话删除原来所有的内容重新写入;append也会将推流数据保存在服务器端,如果文件不存在的话就会建立一个新文件写入,如果对应该流的文件已经存在的话保存原来的数据,在文件末尾接着写入 |
if ([command isEqualToString:@"_result"]) {
if ([trackedCommand isEqualToString:@"createStream"]) {
if (p[10] || p[19] != 0x05 || p[20]) {
NSLog(@"RTMP: Unexpected reply on connect()\n");
} else {
_streamID = [NSMutableData getDouble:p+21];
}
[self sendPublish];
self.rtmpStatus = SGRtmpSessionStatusReady;
}
}
- (void)sendPublish {
RTMPChunk_0 metadata = {0};
metadata.msg_stream_id = SGStreamIDAudio;
metadata.msg_type_id = SGMSGTypeID_INVOKE;
NSMutableData *buff = [NSMutableData data];
[buff appendString:@"publish"];
[buff appendDouble:++_numOfInvokes];
self.trackedCommands[@(_numOfInvokes)] = @"publish";
[buff appendByte:kAMFNull];
// 流名称
[buff appendString:_url.playPath];
// 推流类型
[buff appendString:@"live"];
metadata.msg_length.data = (int)buff.length;
[self sendPacket:buff :metadata];
}
客户端发送publish消息给rtmp服务端后,服务端会向客户端反馈一条onStatus消息, 这时整个推流就结束了,下面就可以开始传递音视频的数据, sendPacket:(NSData *)data 这个函数是将数据按照message格式来进行处理,因为消息发送都是调用的这个函数,由于文章篇幅问题,所以将这个方法贴在这里,有疑惑的同学可以先看一下前边贴的RTMP格式讲解的文章
- (void)sendPacket:(NSData *)data :(RTMPChunk_0)metadata {
SGFrame *frame = [[SGFrame alloc] init];
frame.data = data;
frame.timestamp = metadata.timestamp.data;
frame.msgLength = metadata.msg_length.data;
frame.msgTypeId = metadata.msg_type_id;//消息类型
frame.msgStreamId = metadata.msg_stream_id;//消息流id
[self sendBuffer:frame];
}
/**
* Chunk Basic Header: HeaderType+ChannelID组成 1个字节
* >HeaderType(前两bit): 00->12字节 01->8字节
* >ChannelID(后6个bit): 02->Ping和ByteRead通道 03->Invoke通道 connect() publish()和自己写的NetConnection.Call() 04->Audio和Vidio通道
*
* 12字节举例
* Chunk Message Header: timestamp + message_length+message_typ + msg_stream_id
*
*/
- (void)sendBuffer:(SGFrame *)frame{
dispatch_sync(_packetQueue, ^{
uint64_t ts = frame.timestamp; // 时间戳
int streamId = frame.msgStreamId; // msg_stream_id
NSLog(@"streamId------%d",streamId);
NSNumber *preTimestamp = self.preChunk[@(streamId)]; // 上一帧的时间戳
uint8_t *chunk;
int offset = 0; // 消息长度
if (preTimestamp == nil) {//第一帧,音频或者视频
chunk = malloc(12);
chunk[0] = RTMP_CHUNK_TYPE_0/*0x00*/ | (streamId & 0x1F); //前两个字节 00 表示12字节
offset += 1;
memcpy(chunk+offset, [NSMutableData be24:(uint32_t)ts], 3);
offset += 3;//时间戳3个字节
memcpy(chunk+offset, [NSMutableData be24:frame.msgLength], 3);
offset += 3;//消息长度3个字节
int msgTypeId = frame.msgTypeId;//一个字节的消息类型
memcpy(chunk+offset, &msgTypeId, 1);
offset += 1;
memcpy(chunk+offset, (uint8_t *)&(_streamID), sizeof(_streamID)); // 占用4个字节
offset += sizeof(_streamID);
} else { //不是第一帧
chunk = malloc(8);
chunk[0] = RTMP_CHUNK_TYPE_1/*0x40*/ | (streamId & 0x1F);//前两个字节01表示8字节
offset += 1;
char *temp = [NSMutableData be24:(uint32_t)(ts - preTimestamp.integerValue)];
memcpy(chunk+offset, temp, 3);
offset += 3;
memcpy(chunk+offset, [NSMutableData be24:frame.msgLength], 3);
offset += 3;
int msgTypeId = frame.msgTypeId;
memcpy(chunk+offset, &msgTypeId, 1);
offset += 1;
}
self.preChunk[@(streamId)] = @(ts);
uint8_t *bufferData = (uint8_t *)frame.data.bytes;
uint8_t *outp = (uint8_t *)malloc(frame.data.length + 64);
memcpy(outp, chunk, offset);
free(chunk);
NSUInteger total = frame.data.length;
NSInteger step = MIN(total, _outChunkSize);
memcpy(outp+offset, bufferData, step);
offset += step;
total -= step;
bufferData += step;
while (total > 0) {
step = MIN(total, _outChunkSize);
bufferData[-1] = RTMP_CHUNK_TYPE_3/*0xC0*/ | (streamId & 0x1F);//11表示一个字节,直接跳过这个字节;
memcpy(outp+offset, bufferData - 1, step + 1);
offset += step + 1;
total -= step;
bufferData += step;
}
NSData *tosend = [NSData dataWithBytes:outp length:offset];
free(outp);
[self writeData:tosend];
});
}
结尾
感谢大家的观看,推流整个流程,每一个环节都有很多细节需要思考,由于篇幅较长,呈现的内容也很多,需要有耐心的多读几遍,希望看完你会有所收获,可能有些地方说的不严谨,还望大家多多指正哈,一起学习,这里附上查阅学习的文章
手撕Rtmp协议
带你吃透RTMP
iOS不用任何第三方,写一个简单的RTMP直播推流器
特别感谢以上分享文章的朋友