Cowboy源码解读

1、cowboy主要依赖ranch和cowlib,
ranch在前面已经讲解了,cowlib主要是处理http协议里的一些数据
代码先行

-spec start_clear(ranch:ref(), ranch:opts(), opts())
    -> {ok, pid()} | {error, any()}.
start_clear(Ref, TransOpts0, ProtoOpts0) ->
    TransOpts1 = ranch:normalize_opts(TransOpts0),
    {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
    ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
    ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).

回调模块cowboy_clear,收到的数据都会从这里开始

-module(cowboy_clear).
-behavior(ranch_protocol).

-export([start_link/4]).
-export([connection_process/4]).

-spec start_link(ranch:ref(), inet:socket(), module(), cowboy:opts()) -> {ok, pid()}.
start_link(Ref, _Socket, Transport, Opts) ->
    Pid = proc_lib:spawn_link(?MODULE, connection_process,
        [self(), Ref, Transport, Opts]),
    {ok, Pid}.

-spec connection_process(pid(), ranch:ref(), module(), cowboy:opts()) -> ok.
connection_process(Parent, Ref, Transport, Opts) ->
    ProxyInfo = case maps:get(proxy_header, Opts, false) of
        true ->
            {ok, ProxyInfo0} = ranch:recv_proxy_header(Ref, 1000),
            ProxyInfo0;
        false ->
            undefined
    end,
    {ok, Socket} = ranch:handshake(Ref),
    init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http).

init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) ->
    _ = case maps:get(connection_type, Opts, supervisor) of
        worker -> ok;
        supervisor -> process_flag(trap_exit, true)
    end,
    Protocol:init(Parent, Ref, Socket, Transport, ProxyInfo, Opts).

调用 cowboy_http:init -> before_loop -> loop

loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
        timer=TimerRef, children=Children, in_streamid=InStreamID,
        last_streamid=LastStreamID, streams=Streams}, Buffer) ->
    {OK, Closed, Error} = Transport:messages(),
    InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
    receive
        %% Discard data coming in after the last request
        %% we want to process was received fully.
        {OK, Socket, _} when InStreamID > LastStreamID ->
            before_loop(State, Buffer);
        %% Socket messages.
        {OK, Socket, Data} ->
            %% Only reset the timeout if it is idle_timeout (active streams).
            State1 = case Streams of
                [] -> State;
                _ -> set_timeout(State)
            end,
            parse(<< Buffer/binary, Data/binary >>, State1);
        {Closed, Socket} ->
            terminate(State, {socket_error, closed, 'The socket has been closed.'});
        {Error, Socket, Reason} ->
            terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
        %% Timeouts.
        {timeout, Ref, {shutdown, Pid}} ->
            cowboy_children:shutdown_timeout(Children, Ref, Pid),
            loop(State, Buffer);
        {timeout, TimerRef, Reason} ->
            timeout(State, Reason);
        {timeout, _, _} ->
            loop(State, Buffer);
        %% System messages.
        {'EXIT', Parent, Reason} ->
            terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
        %% Messages pertaining to a stream.
        {{Pid, StreamID}, Msg} when Pid =:= self() ->
            loop(info(State, StreamID, Msg), Buffer);
        %% Exit signal from children.
        Msg = {'EXIT', Pid, _} ->
            loop(down(State, Pid, Msg), Buffer);
        %% Calls from supervisor module.
        {'$gen_call', From, Call} ->
            cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
            loop(State, Buffer);
        %% Unknown messages.
        Msg ->
            cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
            loop(State, Buffer)
    after InactivityTimeout ->
        terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
    end.

在这等着收socket数据
当匹配到{OK, Socket, Data} ,然后处理数据

parse(<<>>, State) ->
    before_loop(State, <<>>);
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
        last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
    before_loop(State, <<>>);
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
    after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
    after_parse(parse_header(Buffer,
        State#state{in_state=PS#ps_header{headers=undefined}},
        Headers));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
    after_parse(parse_hd_before_value(Buffer,
        State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
        Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
    %% @todo We do not want to get the body automatically if the request doesn't ask for it.
    %% We may want to get bodies that are below a threshold without waiting, and buffer them
    %% until the request asks, though.
    after_parse(parse_body(Buffer, State)).

我们直接看after_parse

after_parse({request, Req=#{streamid := StreamID, method := Method,
        headers := Headers, version := Version},
        State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
            io:format("after_parse request~n"),
    try cowboy_stream:init(StreamID, Req, Opts) of
        {Commands, StreamState} ->
            TE = maps:get(<<"te">>, Headers, undefined),
            Streams = [#stream{id=StreamID, state=StreamState,
                method=Method, version=Version, te=TE}|Streams0],
            State1 = case maybe_req_close(State0, Headers, Version) of
                close -> State0#state{streams=Streams, last_streamid=StreamID};
                keepalive -> State0#state{streams=Streams}
            end,
            State = set_timeout(State1),
            parse(Buffer, commands(State, StreamID, Commands))
    catch Class:Exception ->
        cowboy:log(cowboy_stream:make_error_log(init,
            [StreamID, Req, Opts],
            Class, Exception, erlang:get_stacktrace()), Opts),
        early_error(500, State0, {internal_error, {Class, Exception},
            'Unhandled exception in cowboy_stream:init/3.'}, Req),
        parse(Buffer, State0)
    end;

cowboy_stream:init,需要注意下处理完了,还会去解析parse(Buffer, commands(State, StreamID, Commands)),如:body里面的数据

-spec init(streamid(), cowboy_req:req(), cowboy:opts())
    -> {commands(), {module(), state()} | undefined}.
init(StreamID, Req, Opts) ->
    case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of
        [] ->
            {[], undefined};
        [Handler|Tail] ->
            %% We call the next handler and remove it from the list of
            %% stream handlers. This means that handlers that run after
            %% it have no knowledge it exists. Should user require this
            %% knowledge they can just define a separate option that will
            %% be left untouched.
            {Commands, State} = Handler:init(StreamID, Req, Opts#{stream_handlers => Tail}),
            {Commands, {Handler, State}}
    end.

默认会调用 cowboy_stream_h:init

-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
    -> {[{spawn, pid(), timeout()}], #state{}}.
init(StreamID, Req=#{ref := Ref}, Opts) ->
    Env = maps:get(env, Opts, #{}),
    Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
    Shutdown = maps:get(shutdown_timeout, Opts, 5000),
    Pid = proc_lib:spawn_link(?MODULE, request_process, [Req, Env, Middlewares]),
    Expect = expect(Req),
    {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts),
    {[{spawn, Pid, Shutdown}|Commands],
        #state{next=Next, ref=Ref, pid=Pid, expect=Expect}}.

注意下
Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
默认的中间件模块
然后启动一个进程,执行 request_process 这个函数

%% Request process.

%% We catch all exceptions in order to add the stacktrace to
%% the exit reason as it is not propagated by proc_lib otherwise
%% and therefore not present in the 'EXIT' message. We want
%% the stacktrace in order to simplify debugging of errors.
%%
%% This + the behavior in proc_lib means that we will get a
%% {Reason, Stacktrace} tuple for every exceptions, instead of
%% just for errors and throws.
%%
%% @todo Better spec.
-spec request_process(_, _, _) -> _.
request_process(Req, Env, Middlewares) ->
    OTP = erlang:system_info(otp_release),
    try
        execute(Req, Env, Middlewares)
    catch
        exit:Reason ->
            Stacktrace = erlang:get_stacktrace(),
            erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
        %% OTP 19 does not propagate any exception stacktraces,
        %% we therefore add it for every class of exception.
        _:Reason when OTP =:= "19" ->
            Stacktrace = erlang:get_stacktrace(),
            erlang:raise(exit, {Reason, Stacktrace}, Stacktrace);
        %% @todo I don't think this clause is necessary.
        Class:Reason ->
            erlang:raise(Class, Reason, erlang:get_stacktrace())
    end.

%% @todo
%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
%   -> ok.
-spec execute(_, _, _) -> _.
execute(_, _, []) ->
    ok; %% @todo Maybe error reason should differ here and there.
execute(Req, Env, [Middleware|Tail]) ->
    case Middleware:execute(Req, Env) of
        {ok, Req2, Env2} ->
            execute(Req2, Env2, Tail);
        {suspend, Module, Function, Args} ->
            proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
        {stop, _Req2} ->
            ok %% @todo Maybe error reason should differ here and there.
    end.

然后执行cowboy_router:execute 这个主要是解析匹配路由数据

-spec execute(Req, Env)
    -> {ok, Req, Env} | {stop, Req}
    when Req::cowboy_req:req(), Env::cowboy_middleware:env().
execute(Req=#{host := Host, path := Path}, Env=#{dispatch := Dispatch}) ->
    case match(Dispatch, Host, Path) of
        {ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
            {ok, Req#{
                host_info => HostInfo,
                path_info => PathInfo,
                bindings => Bindings
            }, Env#{
                handler => Handler,
                handler_opts => HandlerOpts
            }};
        {error, notfound, host} ->
            {stop, cowboy_req:reply(400, Req)};
        {error, badrequest, path} ->
            {stop, cowboy_req:reply(400, Req)};
        {error, notfound, path} ->
            {stop, cowboy_req:reply(404, Req)}
    end.

匹配到处理的Handler 放入Env
再执行cowboy_handler:execute

-spec execute(Req, Env) -> {ok, Req, Env}
    when Req::cowboy_req:req(), Env::cowboy_middleware:env().
execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
    try Handler:init(Req, HandlerOpts) of
        {ok, Req2, State} ->
            Result = terminate(normal, Req2, State, Handler),
            {ok, Req2, Env#{result => Result}};
        {Mod, Req2, State} ->
            Mod:upgrade(Req2, Env, Handler, State);
        {Mod, Req2, State, Opts} ->
            Mod:upgrade(Req2, Env, Handler, State, Opts)
    catch Class:Reason ->
        terminate({crash, Class, Reason}, Req, HandlerOpts, Handler),
        erlang:raise(Class, Reason, erlang:get_stacktrace())
    end.

然后再调用你路由表中的Handler模块的 init 函数

如果是websocket会有个upgrade,注意下

提升方式:使用cowboy下的examples,然后自己跟踪下代码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352