[Erlang开发之路]二十二点三、加入gen_tcp的job_center

意义

加入gen_tcp之后,可以跨机器、网络去操作job_center

过程中遇到的问题:
clien收到rpc来的消息后,向tcp服务端发送消息,然后在接收到tcp服务端的消息之前老是先接收到进程rpc来的消息{From,Bin},也就是在第二个receive的时候收到了非服务端来的消息,导致没有匹配成功4

loopRecv(Socket)->
    receive
            {tcp,Socket,Bin}->
                    Term=binary_to_term(Bin),
                    io:format("Recv From Job_center:~p~n",[Term]);
            {tcp_closed,Socket}->
                    io:format("Drop Connect with Job_center~n");
            {From,Bin}->
                    gen_tcp:send(Socket,term_to_binary(Bin)),
                    receive
                            {tcp,Socket,Bin}->
                                    From ! binary_to_term(Bin);
                            {tcp_closed,Socket}->
                                    From ! {tcp_closed,Socket}
                    after 0->
                        do_nothings
                    end;
            _->
                    unknown
    end,
    io:format("finish a recv!~n"),
    loopRecv(Socket).

后来加入after优先处理之后就好了,因为after在第二个receive中没有匹配到的时候,直接到after去,但是我after那里do_nothings,也就是不做事,然后就回到第一个receive去匹配了,但我还是没想明白为啥会收到一条来自进程的消息。

job_center.app

{application,job_center_app,
[{description,"A Job Deliver Center 1.0"},
 {vsn,"1.0"},
 {modules,[job_center_app,job_center_server,job_center,job_center_supervisor,job_center_alarm]},
 {registered,[job_center]},
 {applications,[kernel,stdlib]},
 {mod,{job_center_app,[]}},
 {start_phases,[]}
]}.


job_center.erl (job_center核心功能

-module(job_center).
-behaviour(gen_server).
-export([loopAccept/1,loopRecv/1,code_change/3,terminate/2,test/0,start_link/0,job_getStatistic/1,stop/0,init/1,add_job/1,work_wanted/0,job_done/2,handle_call/3,handle_cast/2,handle_info/2,job_statistic/0]).
-import(gen_server,[start_link/4,call/2]).
-spec add_job(fun())->integer().

-define(debug_flag,true).
-ifdef(debug_flag).
-define(DEBUG(X),io:format("DEBUG ~p:~p ~p~n",[?MODULE,?LINE,X])).
-else.
-define(DEBUG(X),void).
-endif.


test()->
        start_link(),
        job_center:add_job(fun()->good_job end).


start_link()->
        io:format("start_link_now!~n"),
        start_link({local,?MODULE},?MODULE,[],[]).

stop()->
        call(?MODULE,stop).

init([])->
        io:format("job_center Start!~n"),
        %process_flag(trap_exit,true),
        {ok,Listen}=gen_tcp:listen(6612,[binary,{packet,4}]),
        io:format("Listen:~p~n",[Listen]),
        spawn(fun()->loopAccept(Listen) end),
        State=ets:new(?MODULE,[]),
        ets:insert(State,{job_index,0}),
        {ok,State}.
%                      gen_tcp function
loopAccept(Listen)->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun()->loopAccept(Listen) end),
    io:format("new Client In!~n"),
    loopRecv(Socket).
loopRecv(Socket)->
        receive
                {tcp,Socket,Binary} when is_binary(Binary)->
                        Packet=binary_to_term(Binary),
                        io:format("Recv a message:~p~n",[Packet]),
                        Ret=case Packet of
                                {add_job,Fun}->
                                        add_job(Fun);
                                {work_wanted}->
                                        work_wanted();
                                {job_done,JobNumber,Status}->
                                        job_done(JobNumber,Status);
                                {job_statistic}->
                                        job_statistic();
                                {job_getStatistic,JobNumber}->
                                        job_getStatistic(JobNumber);
                                _->
                                        command_not_found
                        end,
                        gen_tcp:send(Socket,term_to_binary(Ret)),
                        loopRecv(Socket);
                {tcp_closed,Socket}->
                        io:format("Client Disconnent~n");
                UnKnown->
                        gen_tcp:send(Socket,term_to_binary({command_not_found,UnKnown})),
                        loopRecv(Socket)
        end,
        io:format("finish recv a msg!~n").
%                      user api
add_job(Fun)->
        call(?MODULE,{add_job,Fun}).

work_wanted()->
        call(?MODULE,{work_wanted}).

job_done(JobNumber,Status)->
        io:format("Request change Job:~p Status:~p~n",[JobNumber,Status]),
        call(?MODULE,{job_done,JobNumber,Status}).

job_statistic()->
        call(?MODULE,{job_statistic}).

job_getStatistic(Index)->
        call(?MODULE,{job_getStatistic,Index}).

%                    callback Function 
handle_call({job_getStatistic,Index},_From,State)->
        io:format("someone want to get statistic~n"),
        case ets:member(State,Index) of
            true->
                io:format("getStatistic:~p~n",[Index]),
                {reply,ets:lookup_element(State,Index,3),State};
            false->
                {reply,job_not_found,State}
        end;
handle_call({job_statistic},_From,State)->
        %未完成的任务数
        Match_not_done=ets:match(State,{'$1','$2',not_done}),
        io:format("未完成的数量:~p~n",[length(Match_not_done)]),
        Match_doing=ets:match(State,{'$1','$2',doing}),
        io:format("正在进行的数量:~p~n",[length(Match_doing)]),
        Match_done=ets:match(State,{'$1','$2',done}),
        io:format("已完成的数量:~p~n",[length(Match_done)]),
        {reply,show_all_finish,State};
handle_call({add_job,Fun},_From,State)->
        [{job_index,Index}]=ets:lookup(State,job_index),
        ets:insert(State,[{job_index,Index+1},{Index,Fun,not_done}]),
        Reply=ok,
        {reply,Reply,State};

handle_call({work_wanted},{Pid,_Ref},State)->
        Match_Ret=ets:match(State,{'$1','$2',not_done},1),
        case Match_Ret of
            '$end_of_table'->%没job了
                    Fun=void,
                    Index=-1,
                    {reply,{no_job_now,Index,Fun},State};
             {[[Index,Fun]],_}->%判断表中还有未完成的job
                    ets:update_element(State,Index,{3,doing}),%更新状态为doing正在进行
                    %-------------------检测崩溃
                    spawn_link(fun()->
                    Ref=monitor(process,Pid),
                    receive
                            {'DOWN',Ref,process,Pid,Why}->
                                    io:format("worker:~p leave for ~p ~n",[Pid,Why]),
                                    Bool=Why=:=normal,
                                    if 
                                            Bool=:=false-> 
                                                    job_center:job_done(Index,not_done);
                                            Bool=:=true->
                                                    void
                                    end
                    end
                          end),%监控是否崩溃防止崩溃时数据停留
                    %-------------------检测超时
                    spawn(fun()->
                        timer:sleep(4000),
                        case ets:lookup_element(State,Index,3) of
                                    doing->Pid ! {hurry_up};
                                    _->ok
                        end,
                        timer:sleep(2000),
                        exit(Pid,you_are_fired)
                          end),%检测超时,超过6秒就炒鱿鱼
                    {reply,{get_job_succ,Index,Fun,5000},State}
        end;
        

handle_call({job_done,JobNumber,Status},_From,State)->
        case ets:member(State,JobNumber) of%首先判断这个键值是否存在,存在才可以去lookup否则报错
                true->
                        Status_change_before=ets:lookup_element(State,JobNumber,3),
                        %io:format("Match:~p~n",[Status_change_before]),
                        case Status_change_before of
                                done->
                                        Reply=aleady_done;
                                doing->
                                        Reply=good_job,
                                        io:format("comfirm to change the status:~p of Index~p~n",[Status,JobNumber]),
                                        ets:update_element(State,JobNumber,{3,Status});
                                not_done->
                                        Reply=did_not_been_got
                        end,
                        {reply,Reply,State};
                false->
                        {reply,job_not_found,State}
        end;
handle_call(stop,_From,State)->
        io:format("good bye!~n"),
        {stop,normal,stopped,State}.

handle_cast(_Msg,State)->
        {noreply,State}.

handle_info(_Info,State)->
        {noreply,State}.

terminate(_Reason,_State)->ok.

code_change(_OldVsn,State,_Extra)->{ok,State}.

job_center_alarm.erl (警报处理器

-module(job_center_alarm).
-export([init/1,handle_event/2,handle_call/2,handle_info/2,code_change/3,terminate/2]).
-behaviour(gen_event).

init(Args)->{ok,0}.

handle_event({set_alarm,What},State)->
        io:format("Alarm comming:~p!~n",[What]),
        {ok,State};

handle_event({clear_alarm,What},State)->
        io:format("Alarm_clean:~p!~n",[What]),
        {ok,State}.

handle_call(_Req,State)->
        {ok,ignore,State}.

handle_info(_Info,State)->
        {ok,State}.

code_change(_OldVsn,State,_Extra)->{ok,State}.

terminate(_Reason,_State)->ok.

job_center_app.erl (application打包管理

-module(job_center_app).
-behaviour(application).
-export([start/2,stop/1]).
start(_Type,StartArgs)->
        job_center_supervisor:start_link(StartArgs).
stop(_State)->
        ok.

job_center_client.erl (Tcp客户端

-module(job_center_client).
-export([start_client/0,loopRecv/1,rpc/1,job_done/2,job_statistic/0,job_getStatistic/1,work_wanted/0,add_job/1]).

start_client()->
        case gen_tcp:connect("localhost",6612,[binary,{packet,4}]) of
                {ok,Socket}->
                        register(job_center_client,spawn(fun()->loopRecv(Socket) end));
                {error,Desc}->
                        io:format("connect error!~p~n",[Desc])
        end.
loopRecv(Socket)->
    receive
            {tcp,Socket,Bin}->
                    Term=binary_to_term(Bin),
                    io:format("Recv From Job_center:~p~n",[Term]);
            {tcp_closed,Socket}->
                    io:format("Drop Connect with Job_center~n");
            {process,From,Bin}->
                    gen_tcp:send(Socket,term_to_binary(Bin)),
                    receive
                            {tcp,Socket,Bin}->
                                    From ! binary_to_term(Bin);
                            {tcp_closed,Socket}->
                                    From ! {tcp_closed,Socket}
                    after 0->
                        %gen_tcp:send(Socket,term_to_binary({err}))
                        do_nothing
                    end;
            _->
                    unknown
    end,
    io:format("finish a recv!~n"),
    loopRecv(Socket).
job_done(Index,State)->
        rpc({job_done,Index,State}).
job_statistic()->
        rpc({job_statistic}).
job_getStatistic(Index)->
        rpc({job_getStatistic,Index}).
add_job(Fun)->
        rpc({add_job,Fun}).
work_wanted()->
        rpc({work_wanted}).
rpc(Req)->
        io:format("start to send~n"),
        job_center_client ! {process,self(),Req},
        io:format("send finish!~n"),
        receive 
                {tcp_closed,_Socket}->
                        io:format("Drop Connect with Job_center~n");
                {tcp,_Socket,Bin}->
                        io:format("Ret:~p~n",[binary_to_term(Bin)]);
                Any->
                        io:format("unknow:~p~n",[Any])
        end,
        io:format("receive finish").


job_center_server.erl(job_center服务端

-module(job_center_server).
-export([start_server/0,loopAccept/1,loopRecv/1]).

start_server()->
        {ok,Listen}=gen_tcp:listen(6612,[binary,{packet,4},{reuseaddr,true},{active,true}]),
        spawn(fun()-> loopAccept(Listen) end).


loopAccept(Listen)->
    {ok,Socket}=gen_tcp:accept(Listen),
    spawn(fun()->loopAccept(Listen) end),
    loopRecv(Socket).

loopRecv(Socket)->
        receive
                {tcp,Socket,Binary}->
                        Packet=binary_to_term(Binary),
                        Ret=rpc:call(job_center,Packet),
                        gen_tcp:send(Socket,term_to_binary(Ret)),
                        loopRecv(Socket);
                {tcp_close,Socket}->
                        io:format("Server socket closed!~n",[])
        end.    

job_center_supervisor.erl (job_center监控树

-module(job_center_supervisor).
-export([start/0,test/0,init/1,start_link/1]).
-behaviour(supervisor).
-define(MAXRESTARTS,3).
-define(TIME,10).
start()->
        spawn(fun()-> supervisor:start_link({local,?MODULE},?MODULE,[]) end).
start_link(Args)->
        supervisor:start_link({local,?MODULE},?MODULE,Args).
test()->
        {ok,Pid}=supervisor:start_link({local,?MODULE},?MODULE,[]),
        unlink(Pid).
init([])->
        gen_event:swap_handler(alarm_handler,{alarm_handler,swap},{job_center_alarm,[]}),
        {ok,{{one_for_one,?MAXRESTARTS,?TIME},
        [{job_center,
         {job_center,start_link,[]},
         permanent,
         10000,
         worker,
         [job_center]
         }
        ]}}.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容