首先,http_api 监听和rtmp的监听是一样的
创建http_api的监听
srs_error_t SrsServer::listen_http_api()
{
srs_error_t err = srs_success;
close_listeners(SrsListenerHttpApi);
if (_srs_config->get_http_api_enabled()) {
SrsListener* listener = new SrsBufferListener(this, SrsListenerHttpApi);
listeners.push_back(listener);
std::string ep = _srs_config->get_http_api_listen();
std::string ip;
int port;
srs_parse_endpoint(ep, ip, port);
if ((err = listener->listen(ip, port)) != srs_success) {
return srs_error_wrap(err, "http api listen %s:%d", ip.c_str(), port);
}
}
return err;
}
同上一篇rtmp一样,调用
SrsBufferListener::listen -> SrsTcpListener::listen -> SrsTcpListener::cycle
调用srs_accept等待客户端的连接,然后调用SrsBufferListener::on_tcp_client进行处理请求,然后根据SrsListenerType type走不同的分支
if (type == SrsListenerRtmpStream) {
*pr = new SrsRtmpConn(this, stfd, ip, port);
} else if (type == SrsListenerHttpApi) {
*pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpsApi) {
*pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);
} else if (type == SrsListenerHttpStream) {
*pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);
} else if (type == SrsListenerHttpsStream) {
*pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);
srs_close_stfd(stfd);
return err;
}
http_api就是SrsListenerHttpApi,创建了 SrsHttpApi 并调用 SrsHttpApi::start()
之后就创建协程调用SrsHttpConn::do_cycle
srs_error_t SrsHttpConn::do_cycle()
{
srs_error_t err = srs_success;
// set the recv timeout, for some clients never disconnect the connection.
// @see https://github.com/ossrs/srs/issues/398
skt->set_recv_timeout(SRS_HTTP_RECV_TIMEOUT);
// initialize parser
if ((err = parser->initialize(HTTP_REQUEST)) != srs_success) {
return srs_error_wrap(err, "init parser for %s", ip.c_str());
}
// Notify the handler that we are starting to process the connection.
if ((err = handler_->on_start()) != srs_success) {
return srs_error_wrap(err, "start");
}
SrsRequest* last_req = NULL;
SrsAutoFree(SrsRequest, last_req);
// process all http messages.
err = process_requests(&last_req);
srs_error_t r0 = srs_success;
if ((r0 = on_disconnect(last_req)) != srs_success) {
err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
srs_freep(r0);
}
return err;
}
进入 SrsHttpConn::process_requests - > SrsHttpConn::process_request -> SrsHttpCorsMux::serve_http
srs_error_t SrsHttpConn::process_requests(SrsRequest** preq)
{
srs_error_t err = srs_success;
for (int req_id = 0; ; req_id++) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "pull");
}
// get a http message
ISrsHttpMessage* req = NULL;
if ((err = parser->parse_message(skt, &req)) != srs_success) {
return srs_error_wrap(err, "parse message");
}
// if SUCCESS, always NOT-NULL.
// always free it in this scope.
srs_assert(req);
SrsAutoFree(ISrsHttpMessage, req);
// Attach owner connection to message.
SrsHttpMessage* hreq = (SrsHttpMessage*)req;
hreq->set_connection(this);
// copy request to last request object.
srs_freep(*preq);
*preq = hreq->to_request(hreq->host());
// may should discard the body.
SrsHttpResponseWriter writer(skt);
if ((err = handler_->on_http_message(req, &writer)) != srs_success) {
return srs_error_wrap(err, "on http message");
}
// ok, handle http request.
if ((err = process_request(&writer, req, req_id)) != srs_success) {
return srs_error_wrap(err, "process request=%d", req_id);
}
// After the request is processed.
if ((err = handler_->on_message_done(req, &writer)) != srs_success) {
return srs_error_wrap(err, "on message done");
}
// donot keep alive, disconnect it.
// @see https://github.com/ossrs/srs/issues/399
if (!req->is_keep_alive()) {
break;
}
}
return err;
}
srs_error_t SrsHttpConn::process_request(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, int rid)
{
srs_error_t err = srs_success;
srs_trace("HTTP #%d %s:%d %s %s, content-length=%" PRId64 "", rid, ip.c_str(), port,
r->method_str().c_str(), r->url().c_str(), r->content_length());
// use cors server mux to serve http request, which will proxy to http_remux.
if ((err = cors->serve_http(w, r)) != srs_success) {
return srs_error_wrap(err, "mux serve");
}
return err;
}
srs_error_t SrsHttpCorsMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
// If CORS enabled, and there is a "Origin" header, it's CORS.
if (enabled) {
SrsHttpHeader* h = r->header();
required = !h->get("Origin").empty();
}
// When CORS required, set the CORS headers.
if (required) {
SrsHttpHeader* h = w->header();
h->set("Access-Control-Allow-Origin", "*");
h->set("Access-Control-Allow-Methods", "GET, POST, HEAD, PUT, DELETE, OPTIONS");
h->set("Access-Control-Expose-Headers", "Server,range,Content-Length,Content-Range");
h->set("Access-Control-Allow-Headers", "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type");
}
// handle the http options.
if (r->is_http_options()) {
w->header()->set_content_length(0);
if (enabled) {
w->write_header(SRS_CONSTS_HTTP_OK);
} else {
w->write_header(SRS_CONSTS_HTTP_MethodNotAllowed);
}
return w->final_request();
}
srs_assert(next);
return next->serve_http(w, r);
}
调用到 SrsHttpCorsMux::serve_http ,此时就需要知道 next是什么了
SrsHttpApi::SrsHttpApi(bool https, ISrsResourceManager* cm, srs_netfd_t fd, SrsHttpServeMux* m, string cip, int port)
{
// Create a identify for this client.
_srs_context->set_id(_srs_context->generate_id());
manager = cm;
skt = new SrsTcpConnection(fd);
if (https) {
ssl = new SrsSslConnection(skt);
conn = new SrsHttpConn(this, ssl, m, cip, port);
} else {
ssl = NULL;
conn = new SrsHttpConn(this, skt, m, cip, port);
}
_srs_config->subscribe(this);
}
SrsHttpConn::SrsHttpConn(ISrsHttpConnOwner* handler, ISrsProtocolReadWriter* fd, ISrsHttpServeMux* m, string cip, int cport)
{
parser = new SrsHttpParser();
cors = new SrsHttpCorsMux();
http_mux = m;
handler_ = handler;
skt = fd;
ip = cip;
port = cport;
create_time = srsu2ms(srs_get_system_time());
clk = new SrsWallClock();
kbps = new SrsKbps(clk);
kbps->set_io(skt, skt);
trd = new SrsSTCoroutine("http", this, _srs_context->get_id());
traffic_flag_ = false;
}
srs_error_t SrsHttpConn::set_crossdomain_enabled(bool v)
{
srs_error_t err = srs_success;
// initialize the cors, which will proxy to mux.
if ((err = cors->initialize(http_mux, v)) != srs_success) {
return srs_error_wrap(err, "init cors");
}
return err;
}
从上面的代码,创建 SrsHttpApi 的时候,会把SrsHttpServeMux指针赋值过去,SrsHttpCorsMux调用initialize,会把SrsHttpServeMux指针赋值过去,所以next就是 SrsHttpServeMux*next了, 所以就调到了 SrsHttpServeMux::serve_http
srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
ISrsHttpHandler* h = NULL;
if ((err = find_handler(r, &h)) != srs_success) {
return srs_error_wrap(err, "find handler");
}
srs_assert(h);
if ((err = h->serve_http(w, r)) != srs_success) {
return srs_error_wrap(err, "serve http");
}
return err;
}
find_handler 就是查找http_api开始注册的信息了
srs_error_t SrsRtcServer::listen_api()
{
srs_error_t err = srs_success;
// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
SrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();
// kuaishou api
if ((err = http_api_mux->handle("/webrtc/v1/pullstream", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play");
}
if ((err = http_api_mux->handle("/webrtc/v1/stopstream", new SrsGoApiRtcStop(this))) != srs_success) {
return srs_error_wrap(err, "handle stop");
}
// srs native api
if ((err = http_api_mux->handle("/rtc/v1/play/", new SrsGoApiRtcPlay(this))) != srs_success) {
return srs_error_wrap(err, "handle play");
}
if ((err = http_api_mux->handle("/rtc/v1/stop/", new SrsGoApiRtcStop(this))) != srs_success) {
return srs_error_wrap(err, "handle stop");
}
if ((err = http_api_mux->handle("/rtc/v1/publish/", new SrsGoApiRtcPublish(this))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}
#ifdef SRS_SIMULATOR
if ((err = http_api_mux->handle("/rtc/v1/nack/", new SrsGoApiRtcNACK(this))) != srs_success) {
return srs_error_wrap(err, "handle nack");
}
#endif
return err;
}
如果此时客户端请求 http://172.18.153.234:1985/webrtc/v1/pullstream,则就调用了 SrsGoApiRtcPlay,这样就调到了 SrsGoApiRtcPlay::serve_http了
srs_error_t SrsGoApiRtcPlay::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsHttpConn* hc = dynamic_cast<SrsHttpConn*>(hr->connection());
hc->set_traffic_flag(true);
SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);
if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("RTC error %s", srs_error_desc(err).c_str());
#if 0
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
#else
// for different client, we can generate different err response.
generate_ks_err_response(res, err);
srs_freep(err);
#endif
}
return srs_api_response(w, r, res->dumps());
}
SrsGoApiRtcPlay::do_serve_http
我们来看下 do_serve_http里面做了什么
ruc.remote_sdp.parse 解析远端的 sdp
check_remote_sdp 检查sdp , 而后创建session,SrsRtcServer::create_session,create_session里面调用do_create_session
srs_error_t SrsRtcServer::do_create_session(SrsRtcUserConfig* conf, SrsSdp& local_sdp, SrsRtcConnection* session)
{
srs_error_t err = srs_success;
SrsRequest* req = conf->req;
// first add publisher/player for negotiate sdp media info
if (conf->publish) {
if ((err = session->add_publisher(conf, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add publisher");
}
} else {
if ((err = session->add_player(conf, local_sdp)) != srs_success) {
return srs_error_wrap(err, "add player");
}
}
// All tracks default as inactive, so we must enable them.
session->set_all_tracks_status(req->get_stream_url(), conf->publish, true);
//local_pwd 是个随机数,作为answer sdp中的ice_ufrag
std::string local_pwd = srs_random_str(32);
std::string local_ufrag = "";
// TODO: FIXME: Rename for a better name, it's not an username.
std::string username = "";
while (true) {
local_ufrag = srs_random_str(8);
username = local_ufrag + ":" + conf->remote_sdp.get_ice_ufrag();
if (!_srs_rtc_manager->find_by_name(username)) {
break;
}
}
local_sdp.set_ice_ufrag(local_ufrag);
local_sdp.set_ice_pwd(local_pwd);
local_sdp.set_fingerprint_algo("sha-256");
local_sdp.set_fingerprint(_srs_rtc_dtls_certificate->get_fingerprint());
// We allows to mock the eip of server.
if (!conf->eip.empty()) {
string host;
int port = _srs_config->get_rtc_server_listen();
srs_parse_hostport(conf->eip, host, port);
local_sdp.add_candidate(host, port, "host");
srs_trace("RTC: Use candidate mock_eip %s as %s:%d", conf->eip.c_str(), host.c_str(), port);
} else {
std::vector<string> candidate_ips = get_candidate_ips();
for (int i = 0; i < (int)candidate_ips.size(); ++i) {
local_sdp.add_candidate(candidate_ips[i], _srs_config->get_rtc_server_listen(), "host");
}
srs_trace("RTC: Use candidates %s", srs_join_vector_string(candidate_ips, ", ").c_str());
}
// Setup the negotiate DTLS by config.
local_sdp.session_negotiate_ = local_sdp.session_config_;
// Setup the negotiate DTLS role.
if (conf->remote_sdp.get_dtls_role() == "active") {
local_sdp.session_negotiate_.dtls_role = "passive";
} else if (conf->remote_sdp.get_dtls_role() == "passive") {
local_sdp.session_negotiate_.dtls_role = "active";
} else if (conf->remote_sdp.get_dtls_role() == "actpass") {
local_sdp.session_negotiate_.dtls_role = local_sdp.session_config_.dtls_role;
} else {
// @see: https://tools.ietf.org/html/rfc4145#section-4.1
// The default value of the setup attribute in an offer/answer exchange
// is 'active' in the offer and 'passive' in the answer.
local_sdp.session_negotiate_.dtls_role = "passive";
}
local_sdp.set_dtls_role(local_sdp.session_negotiate_.dtls_role);
session->set_remote_sdp(conf->remote_sdp);
// We must setup the local SDP, then initialize the session object.
session->set_local_sdp(local_sdp);
session->set_state(WAITING_STUN);
// Before session initialize, we must setup the local SDP.
if ((err = session->initialize(req, conf->dtls, conf->srtp, username)) != srs_success) {
return srs_error_wrap(err, "init");
}
// We allows username is optional, but it never empty here.
_srs_rtc_manager->add_with_name(username, session);
return err;
}
函数里面会进行sdp 协商并创建完整的 local_sdp,local_sdp就是要返回给web端的answer sdp。
_srs_rtc_manager->add_with_name(username, session);
username 作为key保存session,username是由loacl sdp 中的ice-ufrag和remote sdp中的ice-ufrag拼接成的字符串,为什么要用它俩作为key呢,是因为stun包中的username也是ice-ufrag。当收到stun包时,就可以基于stun包中的username确定发送方是哪个用户,从而确定该用户的udp协议的ip和port了。
最后 srs_api_response发送 sdp 给客户端。
此时客户端收到服务器的sdp信息,解析检查成功后,就可以进行stun的连通性检查了,自此http_api的调用就结束了