EasyDarwin RTSP会话流程解析

在main.cpp的main函数里:

...
//This function starts, runs, and shuts down the server
    if (::StartServer(&theXMLParser, &theMessagesSource, thePort, statsUpdateInterval, theInitialState, dontFork, debugLevel, debugOptions) != qtssFatalErrorState)
    {    
         ::RunServer();
         CleanPid(false);
         exit (EXIT_SUCCESS);
    }
    else
        exit(-1); //Cant start server don't try again

QTSS_ServerState StartServer(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, int statsUpdateInterval, QTSS_ServerState inInitialState, Bool16 inDontFork, UInt32 debugLevel, UInt32 debugOptions)
{
    QTSS_ServerState theServerState = qtssStartingUpState;
    
    sStatusUpdateInterval = statsUpdateInterval;
    ...
    //start the server
    QTSSDictionaryMap::Initialize();
    QTSServerInterface::Initialize();// this must be called before constructing the server object
    sServer = NEW QTSServer();
    sServer->SetDebugLevel(debugLevel);
    sServer->SetDebugOptions(debugOptions);
    
    // re-parse config file
    inPrefsSource->Parse();
    ...
    sServer->Initialize(inPrefsSource, inMessagesSource, inPortOverride,createListeners);
    ...
    if (sServer->GetServerState() != qtssFatalErrorState)
    {
        IdleTask::Initialize();
        Socket::StartThread();
        OSThread::Sleep(1000);
        sServer->InitModules(inInitialState);
        //开始监听
        sServer->StartTasks();
        sServer->SetupUDPSockets(); // udp sockets are set up after the rtcp task is instantiated
        theServerState = sServer->GetServerState();
    }
    return theServerState;
}

在StartServer里首先创建QTSServer,然后调用了sServer->Initialize函数,在Initialize调用了CreateListeners函数依据xml的配置文件创建了监听的套接字来监听RTSP的端口,这两个函数的实现如下:

Bool16 QTSServer::Initialize(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, Bool16 createListeners)
{
    ...
    // CREATE GLOBAL OBJECTS
    fSocketPool = new RTPSocketPool();
    fRTPMap = new OSRefTable(kRTPSessionMapSize);
    fHLSMap = new OSRefTable(kHLSSessionMapSize);
    fReflectorSessionMap = new OSRefTable(kReflectorSessionMapSize);
    ...
    // BEGIN LISTENING
    if (createListeners)
    {
        if ( !this->CreateListeners(false, fSrvrPrefs, inPortOverride) )
            QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssMsgSomePortsFailed, 0);
    }
    ...
    fServerState = qtssStartingUpState;
    return true;
}
Bool16 QTSServer::CreateListeners(Bool16 startListeningNow, QTSServerPrefs* inPrefs, UInt16 inPortOverride)
{
    // Create any new listeners we need
    for (UInt32 count3 = 0; count3 < theTotalRTSPPortTrackers; count3++)
    {
        if (theRTSPPortTrackers[count3].fNeedsCreating)
        {
            newListenerArray[curPortIndex] = NEW RTSPListenerSocket();
            //在Initialize创建了TCPSocket
            QTSS_Error err = newListenerArray[curPortIndex]->Initialize(theRTSPPortTrackers[count3].fIPAddr, theRTSPPortTrackers[count3].fPort);
            ...
            else
            {
                //
                // This listener was successfully created.
                if (startListeningNow)
                    newListenerArray[curPortIndex]->RequestEvent(EV_RE);
                curPortIndex++;
            }
        }
    }
}

然后StartServer里调用sServer->StartTasks()开始监听。StartTasks是如何开始监听的呢?

void QTSServer::StartTasks()
{
    fRTCPTask = new RTCPTask();
    fStatsTask = new RTPStatsUpdaterTask();
    //
    // Start listening
    for (UInt32 x = 0; x < fNumListeners; x++)
        fListeners[x]->RequestEvent(EV_RE);
}

这里的fListeners就是RTSPListenerSocket,其继承关系如下图,RequestEvent就是把创建的fFileDesc和EV_RE(读事件)添加到epoll中.


123.jpg

在RequestEvent里有这么两行代码

fRef.Set(fUniqueIDStr, this); 
fEventThread->fRefTable.Register(&fRef);

这个fEventThread是在StartServer里被创建且开始运行的,在该线程里epoll会监听网络事件,网络事件来的时候,会调用fRefTable存储的EventContext指针对象的ProcessEvent处理事件;fListeners向fEventThread的fRefTable注册了fRef,也就是等网络事件来的时候调用到fListeners的ProcessEvent处理。
该线程函数的代码如下:

void EventThread::Entry()
{
    struct eventreq theCurrentEvent;
    ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
    
    while (true)
    {
        int theErrno = EINTR;
        while (theErrno == EINTR)
        {
#if MACOSXEVENTQUEUE
            int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
            
            #if defined(__linux__)
            int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
            #else
            int theReturnValue = select_waitevent(&theCurrentEvent, NULL);            
            #endif
#endif  
            ...
        }
        
        AssertV(theErrno == 0, theErrno);
        
        //ok, there's data waiting on this socket. Send a wakeup.
        if (theCurrentEvent.er_data != NULL)
        {
            //The cookie in this event is an ObjectID. Resolve that objectID into
            //a pointer.
            StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
            OSRef* ref = fRefTable.Resolve(&idStr);
            if (ref != NULL)
            {
                EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
                theContext->fModwatched = false;
#endif
                theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                fRefTable.Release(ref);
            }
        }
    ...
}

那在ProcessEvent里干了些什么事情呢?

void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{
     ...
    //fSocket data member of TCPSocket.
    int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
    ...
    
    theTask = this->GetSessionTask(&theSocket);
    if (theTask == NULL)
    {    //this should be a disconnect. do an ioctl call?
        close(osSocket);
        if (theSocket)
            theSocket->fState &= ~kConnected; // turn off connected state
    }
    else
    {   
        Assert(osSocket != EventContext::kInvalidFileDesc);
        
        //set options on the socket
        //we are a server, always disable nagle algorithm
        int one = 1;
        int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());
        
        err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());
    
        int sndBufSize = 96L * 1024L;
        err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());
    
        //setup the socket. When there is data on the socket,
        //theTask will get an kReadEvent event
        theSocket->Set(osSocket, &addr);
        theSocket->InitNonBlocking(osSocket);
        theSocket->SetTask(theTask);
        theSocket->RequestEvent(EV_RE);
        
        theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
    }
    ...
}

从TCPListenerSocket::ProcessEvent可知,在该函数里accept RTSP的连接请求,然后调用GetSessionTask创建RTSPSession了,theSocket attach上了osSocket,然后设置osSocket为非阻塞,设置处理osSocket的网络事件为RTSPSession,把osSocket的读事件加入到epoll中。至此我们可以知道该连接的所有网络事件由RTSPSession处理。theSocket是一个TCPSocket指针对象,由RTSPSession创建的,而不再是一个RTSPListenerSocket了。
RTSPSession是如何获取到网络事件的?从TCPListenerSocket::ProcessEvent中知道,theSocket对象调用了RequestEvent,请求了读事件,也就是在epoll中等待网络读事件,当读到RTSP会话的对端发送了RTSP的信息时(在EventThread线程函数Entry里,如下所示),会调用到EventContext的ProcessEvent函数。

void EventThread::Entry()
{
    struct eventreq theCurrentEvent;
    ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
    
    while (true)
    {
        int theErrno = EINTR;
        while (theErrno == EINTR)
        {
#if MACOSXEVENTQUEUE
            int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
            
            #if defined(__linux__)
            int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
            #else
            int theReturnValue = select_waitevent(&theCurrentEvent, NULL);            
            #endif
#endif  
            //Sort of a hack. In the POSIX version of the server, waitevent can return
            //an actual POSIX errorcode.
            if (theReturnValue >= 0)
                theErrno = theReturnValue;
            else
                theErrno = OSThread::GetErrno();
        }
        
        AssertV(theErrno == 0, theErrno);
        
        //ok, there's data waiting on this socket. Send a wakeup.
        if (theCurrentEvent.er_data != NULL)
        {
            //The cookie in this event is an ObjectID. Resolve that objectID into
            //a pointer.
            StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
            OSRef* ref = fRefTable.Resolve(&idStr);
            if (ref != NULL)
            {
                EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
                theContext->fModwatched = false;
#endif
                theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                fRefTable.Release(ref);
                
                
            }
        }

#if EVENT_CONTEXT_DEBUG
        SInt64  yieldStart = OS::Milliseconds();
#endif

    #if 0//defined(__linux__)

    #else
        this->ThreadYield();
    #endif
    
#if EVENT_CONTEXT_DEBUG
        SInt64  yieldDur = OS::Milliseconds() - yieldStart;
        static SInt64   numZeroYields;
        
        if ( yieldDur > 1 )
        {
            qtss_printf( "EventThread time in OSTHread::Yield %i, numZeroYields %i\n", (SInt32)yieldDur, (SInt32)numZeroYields );
            numZeroYields = 0;
        }
        else
            numZeroYields++;
#endif
    }
}

ProcessEvent会给RTSPSession这个Task发送一个kReadEvent的信号,此处的fTask就是RTSPSession。于是RTSPSession这个task会被加入到线程队列中,等待RTSPSession的run方法被调用去处理网络的读事件,也就是处理RTSP会话。
RTSPSession::run函数采用状态机处理RTSP会话,其状态机设计如下:


1234.jpg

• kReadingFirstRequest,RTSPSession刚被创建时,都是首先进入这个状态,然后读取客户端发送过来的RTSP request。
• kHTTPFilteringRequest,然后检查该请求是不是以RTSP-OVER-HTTP的方式进行RTSP交互。
• kHaveNonTunnelMessage,开始加锁,防止在该请求还没有处理完的时候,禁止其他的任何RTSP请求被处理。
• kFilteringRequest,会给每个订阅了QTSS_RTSPFilter_Role角色的模块调用QTSS_RTSPFilter_Role的回调函数。如果不是OPTION命令或者SET_PARAMETER命令,则会创建RTPSession。如果是RTP的数据包,则会调用QTSS_RTSPIncomingData_Role角色的回调,每个订阅了该角色的模块都可以获取到RTP包的内容。
• kPostProcessingRequest,如果创建了RTPSession,则会给所有订阅了QTSS_RTSPPostProcessor_Role角色的模块调用QTSS_RTSPPostProcessor_Role回调函数。
• kCleaningUp,在该状态机里释放fSessionMutex和fReadMutex锁。
• kReadingRequest,除了RTSP刚被创建时,读取request消息是在kReadingFirstRequest状态机里,其余所有的request消息都从kReadingRequest消息开始处理。
• kRoutingRequest,调用QTSS_RTSPRoute_Role角色的回调。
• kPreprocessingRequest,调用QTSS_RTSPPreProcessor_Role角色的回调,如果是TEARDOWN的request,则被调用的模块可能会调用RTPSession的Teardown方法,这样就会给RTPSession这个TASK发送一个kKillEvent的事件,RTPSession收到该事件后,会调用QTSS_ClientSessionClosing_Role角色的回调,关闭该会话。

技术交流可以入QQ群【554271674】

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容