Live555源码解析(3) - 服务开启,愿者上钩

上一篇Live555源码解析(2) - RTSP协议概述对RTSP进行了整体介绍,对会话交互过程及通常应用场景做了示例。接下来,我们就从媒体服务器的本职工作服务开始谈起。

1. 从服务器说起

要服务,就必须有服务器,有开放给外界客户端访问的地址和端口。先放源码:

TaskScheduler* scheduler = BasicTaskScheduler::createNew();
UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

UserAuthenticationDatabase* authDB = NULL;

portNumBits rtspServerPortNum = 554;
//@1.1 建立RTSP服务器
RTSPServer* rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
if(rtspServer == NULL)
{
    rtspServerPortNum = 8554;
    rtspServer = DynamicRTSPServer::createNew(*env, rtspServerPortNum, authDB);
}
if(rtspServer == NULL)
{
    *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
    exit(1);
}

@1.1 建立RTSP服务器

createNew()函数有三个参数,分别说明如下:

  • *env
    env对应的UsageEnvironment、BasicUsageEnvironment主要为后台运行机制,提供基础支持。具体已在本系列Live555源码解析(1) - Main 寻根问祖,留其筋骨中详细说明,这里不再赘述。

  • rtspServerPortNum
    RTSP默认知名端口为554,如被占用,则使用8554。如仍不能成功创建,则提示错误后退出程序

  • authDB
    authDB对应UserAuthenticationDatabase,默认情况下不会定义ACCESS_CONTROL,即关闭认证机制。因此此处不进行分析,后续如有需求,可专门整理下互联网认证机制概况。

本句代码中出现了两个class,分别为RTSPServerDynamicRTSPServer。第一眼看应该是继承关系,确实也是继承关系,只不过希望先留个更清晰的印象。

RTSPServer Class Hierarchy

2. DynamicRTSPServer::createNew()

DynamicRTSPServer* DynamicRTSPServer::createNew(UsageEnvironment& env, Port ourPort, UserAuthenticationDatabase* authDatabase, unsigned reclamationTestSeconds)
{
    //@2.1 创建Socket
    int ourSocket = setUpOurSocket(env, ourPort);
    if (ourSocket == -1) return NULL;

    //@2.2 DynamicRTSPServer构造函数
    return new DynamicRTSPServer(env, ourSocket, ourPort, authDatabase,     reclamationTestSeconds);
}

@2.1 创建Socket

由于RTSPServer、RTSPServerSupportingHTTPStreaming、DynamicRTSPServer均未实现setUpOurSocket()函数,因此这里实际调用的是父类GenericMediaServer中的setUpOurSocket()

int GenericMediaServer::setUpOurSocket(UsageEnvironment& env, Port& ourPort)
{
    int ourSocket = -1;
    do
    {
        //如果当前Socket已被本地服务器占用,则不允许重复使用
        NoReuse dummy(env);
        
        //@2.1.1 创建TCPSocket
        ourSocket = setupStreamSocket(env, ourPort);
        if(ourSocket < 0) break;
            
        //@2.1.2 调整Socket发送Buffer大小
        if(!increaseSendBufferTo(env, ourSocket, 50*1024)) break;

        //@2.1.3 切换Socket模式为LISTEN
        if(listen(ourSocket, LISTEN_BACKLOG_SIZE) < 0)
        {
            env.setResultErrMsg("listen() failed: ");
            break;
        }
            
        //@2.1.4 校验port值
        if(ourPort.num() == 0)
            if(!getSourcePort(env, ourSocket, ourPort)) break;
            return ourSocket;
    } while(0);
        
    //@2.1.5 异常情况,清场退出
    if(ourSocket != -1) ::closeSocket(ourSocket);
    return -1;
}

@2.1.1 创建TCPSocket

GroupsockHelper中定义了Socket的一些全局帮助函数,使用时inlcude "GroupsockHelper.hh"即可,如此处的setupStreamSocket。

int setupStreamSocket(UsageEnvironment& env, Port port, Boolean makeNonBlocking)
{
    //@2.1.1.1 winsock使用前初始化
    if(!initializeWinsockIfNecessary())
    {
        socketErr(env, "Failed to initialize 'winsock': ");
        return -1;
    }

    //@2.1.1.2 创建IPv4 Socket
    int newSocket = createSocket(SOCK_STREAM);
    if(newSocket < 0)
    {
        socketErr(env, "unable to create stream socket: ");
        return newSocket;
    }

    //@2.1.1.3 SO_REUSEADDR
    int reuseFlag = groupsockPriv(env)->reuseFlag;
    reclaimGroupsockPriv(env);
    if(setsockopt(newSocket, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuseFlag, sizeof  reuseFlag) < 0)
    {
        socketErr(env, "setsockopt(SO_REUSEADDR) error: ");
        closeSocket(newSocket);
        return -1;
    }

    //@2.1.1.4 SO_REUSEPORT
    if(setsockopt(newSocket, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuseFlag, sizeof reuseFlag) < 0)
    {
        socketErr(env, "setsockopt(SO_REUSEPORT) error: ");
        closeSocket(newSocket);
        return -1;
    }

    // 如已制定Port或Addr,则需显式bind()至创建的socket
    if(port.num() != 0 || ReceivingInterfaceAddr != INADDR_ANY)
    {
        MAKE_SOCKADDR_IN(name, ReceivingInterfaceAddr, port.num());
        if(bind(newSocket, (struct sockaddr*)&name, sizeof name) != 0)
        {
            char tmpBuffer[100];
            sprintf(tmpBuffer, "bind() error(port number: %d): ", ntohs(port.num());
            socketErr(env, tmpBuffer);
            closeSocket(newSocket);
            return -1;
        }
    }

    //@2.1.1.5 non-blocking
    if(makeNonBlocking)
    {
        if(!makeSocketNonBlocking(newSocket))
        {
            socketErr(env, "failed to make non-blocking: ");
            closeSocket(newSocket);
            return -1;
        }
    }
    return newSocket;
}

@2.1.1.1 winsock使用前初始化

winsock使用前必须调用初始化命令,通过WSAStartup()设置Socket最高版本。这里尝试2.2和1.1版本,成功即可。
源码位于groupsock/inet.c中。

@2.1.1.2 创建IPv4 Socket

调用socket()函数创建指定类型的IPv4 Socket,此处为SOCK_STREAM即TCP Socket。

sock = socket(AF_INET, type|SOCK_CLOEXEC, 0);

注意到此处尝试设置SOCK_CLOEXEC属性。

@2.1.1.3 SO_REUSEADDR

设置SO_REUSEADDR Flag,TCP的最主要目标是保证数据传输的可靠性,当TCP连接中出现分组丢失时,丢失的分组在2RTT后未收到ACK则会重传以保证数据可靠。完成连接任务后,TCP开始进行四路断开,不幸的是,此时原迷路(被路由到其他网络)的分组又被路由回来了。那么TCP应该如何对待这个分组?

如果不允许重用地址,也就是说不设置SO_REUSEADDR标志,那么TCP连接将保持处于TIME_WAIT状态2MSL时间以避免前述分组重复情况。2MSL(通常为30~120秒)内,所有重复分组直接被丢弃。如重复分组在2MSL时长内仍未到达,则将自然消亡,因此连接安全退出。

看起来,这是一种安全机制,保证了重复分组可以得到正确处理,但实践中发生上述情况的概率很小,而如果均采取上述操作会引入新的麻烦。比如服务器所在主机异常重启,服务进程被动重启,此时如不允许地址重用,则需等待2MSL,也就是说30~120秒,这个时长内,无法提供正常服务。这个代价是没有必要的。

因此,不管是BSD socket还是Windows Winsock,通常都会设置SO_REUSEADDR标志,当然必须在bind()前设置才能生效。

@2.1.1.4 SO_REUSEPORT

SO_REUSEPORT支持多个进程或者线程绑定到同一端口,以提高服务器程序的性能,它解决了如下问题:

  • 允许多个套接字bind()/listen() 同一个TCP/UDP端口
  • 每一个线程拥有自己的服务器套接字
  • 在服务器套接字上没有了锁的竞争
  • 内核层面实现负载均衡
  • 安全层面,监听同一个端口的套接字只能位于同一个用户下面

@2.1.1.5 non-blocking

默认情况下,TCP socket是阻塞(blocking)模式的,例如当你调用recv()来读取流数据时,该函数会保持阻塞,直到至少有一个字节数据可读或出现异常错误才会返回。而非阻塞(non-blocking)模式时,操作不必等到有结果才返回,而是以异步的形式在满足条件后返回,这对于处理多个socket时非常有效。

所以至此,如成功从setupStreamSocket()返回,那么我们得到的一定是一个具有CLOEXEC、REUSEADDR、REUSEPORT且non-blocking的Socket。

@2.1.2 调整Socket发送Buffer大小

为了确保Socket发送Buffer足够大,这里设置为50KB。同样内部是通过GroupsockHelper提供的帮助函数完成的。这里先说明下为什么要有缓存的存在。

众所周知,TCP的卖点在可靠性,可靠不意味着不出错,否则IP协议就足够传输了。可靠指的是容错的能力很强,既然要容错,就必须有备份,否则无法支持重传、滑动窗口、拥塞控制等机制运行。既然有备份,那就必须有缓存,也就是说buffer的存在。

事实上,每个Socket都拥有自己的Receive Buffer和Send Buffer,对应Read和Write操作。顺便说明一下,Read/Write返回仅仅是指和Buffer的交互过程完成,比如Write所有数据到Send Buffer后即可返回成功,而从Send Buffer到对端Receive Buffer,则需要由TCP协议内部众多机制参与完成。

@2.1.3 切换Socket模式为LISTEN

socket创建后默认为主动状态,而listen()用于将socket切换为被动监听状态。所谓被动监听,是指当无客户端请求时,socket处于睡眠状态,直到出现请求,才会被唤醒来响应请求。

需要注意的是第二个参数LISTEN_BACKLOG_SIZE,它代表着内核应该为相应socket排队的最大连接个数。通常内核为每个socket维护两个队列:

  • 未完成连接队列
  • 已完成连接队列


此处图片及内容参考或截取自《Unix网络编程卷1》p107,这是本好书,建议阅读。

@2.1.4 校验port值

如未指定port值,则有内核确认port端口,此时应显式获取,更新变量ourPort以便后续使用。自此,第一部分setUpOurSocket操作完成,收获的是端口号为554或8554,具有CLOEXEC、REUSEADDR、REUSEPORT且non-blocking标志,发送缓存为50KB,已切换至Listen状态的Socket。

@2.2 DynamicRTSPServer构造函数

有了静态的Socket后,就有了对外提供服务的渠道或窗口,但此时服务本身并未建设完成,因此第二部分的工作就是铺设基础服务。

DynamicRTSPServer::DynamicRTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,
        UserAuthenticationDatabase* authDatabase, unsigned reclamtionTestSeconds)
    //@2.2.1 RTSPServerSupportingHTTPStreaming构造函数
    : RTSPServerSupportingHTTPStreaming(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds)
{}

第一小节中,已经给出了DynamicRTSPServerRTSPServer等类间继承关系图,结合这段代码,也可以看出除了调用基类RTSPServerSupportingHTTPStreaming构造函数外,并无其他内容。

@2.2.1 RTSPServerSupportingHTTPStreaming构造函数

类似地,除调用基类构造函数外,无其他初始化内容。

RTSPServerSupportingHTTPStreaming::RTSPServerSupportingHTTPStreaming(UsageEnvironment& env,                 int ourSocket, Port rtspPort, UserAuthenticationDatabase* authDatabase, 
        unsigned reclamationTestSeconds)
//@2.2.1.1 RTSPServer构造函数
: RTSPServer(env, ourSocket, rtspPort, authDatabase, reclamationTestSeconds) 
{}

@2.2.1.1 RTSPServer构造函数

RTSPServer::RTSPServer(UsageEnvironment& env, int ourSocket, Port ourPort,  
            UserAuthenticationDatabase* authDatabase,
            unsigned reclamationSeconds)
    //@2.2.1.1.1 GenericMediaServer构造函数
    : GenericMediaServer(env, ourSocket, ourPort, reclamationSeconds)
    , fHTTPServerSocket(-1), fHTTPServerPort(0)
    , fAllowStreamingRTPOverTCP(True)
    , fClientConnectionsForHTTPTunneling(NULL) // will get created if needed
    , fTCPStreamingDatabase(HashTable::create(ONE_WORD_HASH_KEYS))
    , fPendingRegisterOrDeregisterRequests(HashTable::create(ONE_WORD_HASH_KEYS))
    , fRegisterOrDeregisterRequestCounter(0)
    //@2.2.1.1.2 认证机制
    , fAuthDB(authDatabase)
{
}
@2.2.1.1.1 GenericMediaServer构造函数
GenericMediaServer::GenericMediaServer(UsageEnvironment& env, int ourSocket,                                        Port ourPort, unsigned reclamationSeconds)
    //@2.2.1.1.1.1 Medium构造函数
    : Medium(env)
    , fServerSocket(ourSocket), fServerPort(ourPort) 
    , fReclamationSeconds(reclamationSeconds)
    //@2.2.1.1.1.2 HASH Tables
    , fServerMediaSessions(HashTable::create(STRING_HASH_KEYS))
    , fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS))
    , fClientSessions(HashTable::create(STRING_HASH_KEYS))
{
    //@2.2.1.1.1.3 ignore SIGPIPE信号
    ignoreSigPipeOnSocket(fServerSocket); 
    //@2.2.1.1.1.4 铺设服务-incomingConnectionHandler
    env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket,     
                                incomingConnectionHandler, this); 
}
@2.2.1.1.1.1 Medium构造函数
Medium::Medium(UsageEnvironment& env)
    :fEnviron(env), fNextTask(NULL) 
{
    MediaLookupTable::ourMedia(env)->generateNewName(fMediumName, mediumNameMaxLen);
    env.setResultMsg(fMediumName);
    MediaLookupTable::ourMedia(env)->addNew(this, fMediumName);
}

根父类,是Media Server中大多数类的父类。MediaLookupTable是一个类,主要用于方便根据名称进行Medium的查找,内部使用,不进行详细说明。

@2.2.1.1.1.2 HASH Tables

顾名思义,三张表分别保存着服务器媒体会话、客户端连接以及客户端会话。各表内容变动API分别为:

  • ServerMediaSessions

    • addServerMediaSession()
    • removeServerMediaSession()
  • ClientConnections

    • ClientConnection()构造函数
    • ~ClientConnection()析构函数
  • ClientSessions

    • createNewClientSessionWithId()
    • closeAllClientSessionsForServerMediaSession()
@2.2.1.1.1.3 ignore SIGPIPE信号

先说明下SIGPIPE信号如何产生:当socket对端已调用close进行了完全关闭时,本地Write发出的保温会导致对端回复RST报文,假如本地再次调用Write,则会生成SIGPIPE信号,导致进程退出。
为了避免进程退出,既可以捕获SIGPIPE信号对其进行处理,也可以简单忽略。这样第二次调用Write时会返回-1,且错误值设为SIGPIPE,以便程序知道对端已经关闭。

@2.2.1.1.1.4 铺设服务-incomingConnectionHandler

实际调用UsageEnvironment中TaskScheduler类的成员函数。

void turnOnBackgroundReadHandling(int socketNum,
                BackgroundHandlerProc* handlerProc, void* clientData) 
{
    setBackgroundHandling(socketNum, SOCKET_READABLE, handlerProc, clientData);
}

该函数又在BasicTaskScheduler类中实现。

void BasicTaskScheduler::setBackgroundHandling(int socketNum,
    int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData)
{
    if(socketNum < 0) return;
    
    FD_CLR((unsigned)socketNum, &fReadSet);
    FD_CLR((unsigned)socketNum, &fWriteSet);
    FD_CLR((unsigned)socketNum, &fExceptionSet);
    if(conditionSet == 0)
    {
        fHandlers->clearHandler(socketNum);
        if(socketNum+1 == fMaxNumSockets)
            --fMaxNumSockets;
    }
    else
    {
        fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
        if(socketNum+1 > fMaxNumSockets)
            fMaxNumSockets = socketNum + 1;
        if(conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
        if(conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
        if(conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
    }                           
}

如果你还能记得Live555源码解析(1) - Main 寻根问祖,留其筋骨提到main()函数中的Loop SingleStep(),那么ReadSetWriteSetExceptionSet一定还有印象。事实上,这三个Set用于多Sockets Select。

还需要注意的一句是assignHandler(),作用对象为fHandlers,该对象类型为HandlerSet,同样在Live555源码解析(1) - Main 寻根问祖,留其筋骨中也有详细说明,此处不再赘述。

只需要记住,Set和fHandlers两个变量配合的结果是为了当socket满足指定条件时,调用设定的处理函数。对应到此处,是指当Socket可读时,调用incomingConnectionHandler函数。完成判断、调用逻辑的实现,就在BasicTaskScheduler::SingleStep()中Socket I/O循环部分。

@2.2.1.1.2 认证机制

虽然本篇并不讨论认证机制,但仍然需要注意的是,认证机制在这一层完成。

3. 总结

从前面两节的代码分析来看,暂时只铺设了一项服务incomingConnectionHandler,同时准备了一些哈希表以供程序运行中使用,分别为ServerMediaSessions、ClientConnections、ClientSessions,也在@2.2.1.1.1.2 HASH Tables小节中列出了会修改表内容的API。这就是能从DynamicRTSPServer create所发掘的所有动线。

下一篇中,将基于这项服务,寻找服务的使用动机、时机,以及所引起的其他连锁反应。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容