srs 之http_api 调用流程

首先,http_api 监听和rtmp的监听是一样的
创建http_api的监听

srs_error_t SrsServer::listen_http_api()
{
    srs_error_t err = srs_success;
    
    close_listeners(SrsListenerHttpApi);
    if (_srs_config->get_http_api_enabled()) {
        SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi);
        listeners.push_back(listener);
        
        std::string ep = _srs_config->get_http_api_listen();
        
        std::string ip;
        int port;
        srs_parse_endpoint(ep, ip, port);
        
        if ((err = listener->listen(ip, port)) != srs_success) {
            return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
        }
    }
    
    return err;
}

同上一篇rtmp一样,调用
SrsBufferListener::listen -> SrsTcpListener::listen -> SrsTcpListener::cycle
调用srs_accept等待客户端的连接,然后调用SrsBufferListener::on_tcp_client进行处理请求,然后根据SrsListenerType type走不同的分支

    if (type == SrsListenerRtmpStream) {
        *pr = new SrsRtmpConn(this, stfd, ip, port);
    } else if (type == SrsListenerHttpApi) {
        *pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);
    } else if (type == SrsListenerHttpsApi) {
        *pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);
    } else if (type == SrsListenerHttpStream) {
        *pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);
    } else if (type == SrsListenerHttpsStream) {
        *pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);
    } else {
        srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
        srs_close_stfd(stfd);
        return err;
    }

http_api就是SrsListenerHttpApi,创建了 SrsHttpApi 并调用 SrsHttpApi::start()
之后就创建协程调用SrsHttpConn::do_cycle

srs_error_t SrsHttpConn::do_cycle()
{
    srs_error_t err = srs_success;
    
    // set the recv timeout, for some clients never disconnect the connection.
    // @see https://github.com/ossrs/srs/issues/398
    skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT);

    // initialize parser
    if ((err = parser->initialize(HTTP_REQUEST)) != srs_success) {
        return srs_error_wrap(err, "init parser for %s", ip.c_str());
    }

    // Notify the handler that we are starting to process the connection.
    if ((err = handler_->on_start()) != srs_success) {
        return srs_error_wrap(err, "start");
    }

    SrsRequest* last_req = NULL;
    SrsAutoFree(SrsRequest, last_req);

    // process all http messages.
    err = process_requests(&last_req);
    
    srs_error_t r0 = srs_success;
    if ((r0 = on_disconnect(last_req)) != srs_success) {
        err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
        srs_freep(r0);
    }
    
    return err;
}

进入 SrsHttpConn::process_requests - > SrsHttpConn::process_request -> SrsHttpCorsMux::serve_http

srs_error_t SrsHttpConn::process_requests(SrsRequest** preq)
{
    srs_error_t err = srs_success;

    for (int req_id = 0; ; req_id++) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "pull");
        }

        // get a http message
        ISrsHttpMessage* req = NULL;
        if ((err = parser->parse_message(skt, &req)) != srs_success) {
            return srs_error_wrap(err, "parse message");
        }

        // if SUCCESS, always NOT-NULL.
        // always free it in this scope.
        srs_assert(req);
        SrsAutoFree(ISrsHttpMessage, req);

        // Attach owner connection to message.
        SrsHttpMessage* hreq = (SrsHttpMessage*)req;
        hreq->set_connection(this);

        // copy request to last request object.
        srs_freep(*preq);
        *preq = hreq->to_request(hreq->host());

        // may should discard the body.
        SrsHttpResponseWriter writer(skt);
        if ((err = handler_->on_http_message(req, &writer)) != srs_success) {
            return srs_error_wrap(err, "on http message");
        }

        // ok, handle http request.
        if ((err = process_request(&writer, req, req_id)) != srs_success) {
            return srs_error_wrap(err, "process request=%d", req_id);
        }

        // After the request is processed.
        if ((err = handler_->on_message_done(req, &writer)) != srs_success) {
            return srs_error_wrap(err, "on message done");
        }

        // donot keep alive, disconnect it.
        // @see https://github.com/ossrs/srs/issues/399
        if (!req->is_keep_alive()) {
            break;
        }
    }

    return err;
}
srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, int rid)
{
    srs_error_t err = srs_success;
    
    srs_trace("HTTP #%d %s:%d %s %s, content-length=%" PRId64 "", rid, ip.c_str(), port,
        r->method_str().c_str(), r->url().c_str(), r->content_length());
    
    // use cors server mux to serve http request, which will proxy to http_remux.
    if ((err = cors->serve_http(w, r)) != srs_success) {
        return srs_error_wrap(err, "mux serve");
    }
    
    return err;
}

srs_error_t SrsHttpCorsMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    // If CORS enabled, and there is a "Origin" header, it's CORS.
    if (enabled) {
        SrsHttpHeader* h = r->header();
        required = !h->get("Origin").empty();
    }
    
    // When CORS required, set the CORS headers.
    if (required) {
        SrsHttpHeader* h = w->header();
        h->set("Access-Control-Allow-Origin", "*");
        h->set("Access-Control-Allow-Methods", "GET, POST, HEAD, PUT, DELETE, OPTIONS");
        h->set("Access-Control-Expose-Headers", "Server,range,Content-Length,Content-Range");
        h->set("Access-Control-Allow-Headers", "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type");
    }
    
    // handle the http options.
    if (r->is_http_options()) {
        w->header()->set_content_length(0);
        if (enabled) {
            w->write_header(SRS_CONSTS_HTTP_OK);
        } else {
            w->write_header(SRS_CONSTS_HTTP_MethodNotAllowed);
        }
        return w->final_request();
    }
    
    srs_assert(next);
    return next->serve_http(w, r);
}

调用到 SrsHttpCorsMux::serve_http ,此时就需要知道 next是什么了

SrsHttpApi::SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
{
    // Create a identify for this client.
    _srs_context->set_id(_srs_context->generate_id());

    manager = cm;
    skt = new SrsTcpConnection(fd);

    if (https) {
        ssl = new SrsSslConnection(skt);
        conn = new SrsHttpConn(this, ssl, m, cip, port);
    } else {
        ssl = NULL;
        conn = new SrsHttpConn(this, skt, m, cip, port);
    }

    _srs_config->subscribe(this);
}

SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, ISrsProtocolReadWriter* fd, ISrsHttpServeMux* m, string cip, int cport)
{
    parser = new SrsHttpParser();
    cors = new SrsHttpCorsMux();
    http_mux = m;
    handler_ = handler;

    skt = fd;
    ip = cip;
    port = cport;
    create_time = srsu2ms(srs_get_system_time());
    clk = new SrsWallClock();
    kbps = new SrsKbps(clk);
    kbps->set_io(skt, skt);
    trd = new SrsSTCoroutine("http", this, _srs_context->get_id());
    traffic_flag_ = false;
}

srs_error_t SrsHttpConn::set_crossdomain_enabled(bool v)
{
    srs_error_t err = srs_success;

    // initialize the cors, which will proxy to mux.
    if ((err = cors->initialize(http_mux, v)) != srs_success) {
        return srs_error_wrap(err, "init cors");
    }

    return err;
}

从上面的代码,创建 SrsHttpApi 的时候,会把SrsHttpServeMux指针赋值过去,SrsHttpCorsMux调用initialize,会把SrsHttpServeMux指针赋值过去,所以next就是 SrsHttpServeMux*next了, 所以就调到了 SrsHttpServeMux::serve_http

srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    srs_error_t err = srs_success;
    
    ISrsHttpHandler* h = NULL;
    if ((err = find_handler(r, &h)) != srs_success) {
        return srs_error_wrap(err, "find handler");
    }
    
    srs_assert(h);
    if ((err = h->serve_http(w, r)) != srs_success) {
        return srs_error_wrap(err, "serve http");
    }
    
    return err;
}

find_handler 就是查找http_api开始注册的信息了

srs_error_t SrsRtcServer::listen_api()
{
    srs_error_t err = srs_success;

    // TODO: FIXME: Fetch api from hybrid manager, not from SRS.
    SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();

    // kuaishou api
    if ((err = http_api_mux->handle("/webrtc/v1/pullstream", new SrsGoApiRtcPlay(this))) != srs_success) {
        return srs_error_wrap(err, "handle play");
    }

    if ((err = http_api_mux->handle("/webrtc/v1/stopstream", new SrsGoApiRtcStop(this))) != srs_success) {
        return srs_error_wrap(err, "handle stop");
    }

    // srs native api
    if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
        return srs_error_wrap(err, "handle play");
    }

    if ((err = http_api_mux->handle("/rtc/v1/stop/", new SrsGoApiRtcStop(this))) != srs_success) {
        return srs_error_wrap(err, "handle stop");
    }

    if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
        return srs_error_wrap(err, "handle publish");
    }

#ifdef SRS_SIMULATOR
    if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {
        return srs_error_wrap(err, "handle nack");
    }
#endif

    return err;
}

如果此时客户端请求 http://172.18.153.234:1985/webrtc/v1/pullstream,则就调用了 SrsGoApiRtcPlay,这样就调到了 SrsGoApiRtcPlay::serve_http了

srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    srs_error_t err = srs_success;

    SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
    SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
    hc->set_traffic_flag(true);

    SrsJsonObject* res = SrsJsonAny::object();
    SrsAutoFree(SrsJsonObject, res);

    if ((err = do_serve_http(w, r, res)) != srs_success) {
        srs_warn("RTC error %s", srs_error_desc(err).c_str());
#if 0
        return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
#else
        // for different client, we can generate different err response.
        generate_ks_err_response(res, err);
        srs_freep(err);
#endif
    }

    return srs_api_response(w, r, res->dumps());
}

SrsGoApiRtcPlay::do_serve_http
我们来看下 do_serve_http里面做了什么
ruc.remote_sdp.parse 解析远端的 sdp
check_remote_sdp 检查sdp , 而后创建session,SrsRtcServer::create_session,create_session里面调用do_create_session

srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* conf, SrsSdp& local_sdp, SrsRtcConnection* session)
{
    srs_error_t err = srs_success;

    SrsRequest* req = conf->req;

    // first add publisher/player for negotiate sdp media info
    if (conf->publish) {
        if ((err = session->add_publisher(conf, local_sdp)) != srs_success) {
            return srs_error_wrap(err, "add publisher");
        }
    } else {
        if ((err = session->add_player(conf, local_sdp)) != srs_success) {
            return srs_error_wrap(err, "add player");
        }
    }

    // All tracks default as inactive, so we must enable them.
    session->set_all_tracks_status(req->get_stream_url(), conf->publish, true);

    //local_pwd 是个随机数,作为answer sdp中的ice_ufrag
    std::string local_pwd = srs_random_str(32);
    std::string local_ufrag = "";
    // TODO: FIXME: Rename for a better name, it's not an username.
    std::string username = "";
    while (true) {
        local_ufrag = srs_random_str(8);

        username = local_ufrag + ":" + conf->remote_sdp.get_ice_ufrag();
        if (!_srs_rtc_manager->find_by_name(username)) {
            break;
        }
    }

    local_sdp.set_ice_ufrag(local_ufrag);
    local_sdp.set_ice_pwd(local_pwd);
    local_sdp.set_fingerprint_algo("sha-256");
    local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());

    // We allows to mock the eip of server.
    if (!conf->eip.empty()) {
        string host;
        int port = _srs_config->get_rtc_server_listen();
        srs_parse_hostport(conf->eip, host, port);

        local_sdp.add_candidate(host, port, "host");
        srs_trace("RTC: Use candidate mock_eip %s as %s:%d", conf->eip.c_str(), host.c_str(), port);
    } else {
        std::vector<string> candidate_ips = get_candidate_ips();
        for (int i = 0; i < (int)candidate_ips.size(); ++i) {
            local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(), "host");
        }
        srs_trace("RTC: Use candidates %s", srs_join_vector_string(candidate_ips, ", ").c_str());
    }

    // Setup the negotiate DTLS by config.
    local_sdp.session_negotiate_ = local_sdp.session_config_;

    // Setup the negotiate DTLS role.
    if (conf->remote_sdp.get_dtls_role() == "active") {
        local_sdp.session_negotiate_.dtls_role = "passive";
    } else if (conf->remote_sdp.get_dtls_role() == "passive") {
        local_sdp.session_negotiate_.dtls_role = "active";
    } else if (conf->remote_sdp.get_dtls_role() == "actpass") {
        local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role;
    } else {
        // @see: https://tools.ietf.org/html/rfc4145#section-4.1
        // The default value of the setup attribute in an offer/answer exchange
        // is 'active' in the offer and 'passive' in the answer.
        local_sdp.session_negotiate_.dtls_role = "passive";
    }
    local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role);

    session->set_remote_sdp(conf->remote_sdp);
    // We must setup the local SDP, then initialize the session object.
    session->set_local_sdp(local_sdp);
    session->set_state(WAITING_STUN);

    // Before session initialize, we must setup the local SDP.
    if ((err = session->initialize(req, conf->dtls, conf->srtp, username)) != srs_success) {
        return srs_error_wrap(err, "init");
    }

    // We allows username is optional, but it never empty here.
    _srs_rtc_manager->add_with_name(username, session);

    return err;
}

函数里面会进行sdp 协商并创建完整的 local_sdp,local_sdp就是要返回给web端的answer sdp。
_srs_rtc_manager->add_with_name(username, session);
username 作为key保存session,username是由loacl sdp 中的ice-ufrag和remote sdp中的ice-ufrag拼接成的字符串,为什么要用它俩作为key呢,是因为stun包中的username也是ice-ufrag。当收到stun包时,就可以基于stun包中的username确定发送方是哪个用户,从而确定该用户的udp协议的ip和port了。
最后 srs_api_response发送 sdp 给客户端。
此时客户端收到服务器的sdp信息,解析检查成功后,就可以进行stun的连通性检查了,自此http_api的调用就结束了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,192评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,858评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,517评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,148评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,162评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,905评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,537评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,439评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,956评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,083评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,218评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,899评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,565评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,093评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,201评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,539评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,215评论 2 358

推荐阅读更多精彩内容