EMQX源码阅读笔记一

一、EMQX项目简介

EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。

二、协议简介

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。
MQTT官网: http://mqtt.org
MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
这里我们一切从简,直接上图:

协议流程图(图一)

MQTT的控制包类型(MQTT Control Packet type)包含:CONNECT、CONNACK、PUBLISH、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK、PINGREQ、PINGRESP、DISCONNECT,基本上都在图中体现了,这些协议包在图中的流转基本上实现了MQTT定义的连接、断开、订阅、取消订阅、发布的功能。

三、代码阅读

本篇笔记仅仅初探EMQX(3.0版本),主要包含连接层与会话层代码分析、订阅/取消订阅过程分析、发布路由跳转,因为篇幅有限,通篇点到为止。

一、连接层与会话层

这里相关代码也比较多,这里主要关注emqx_connection.erl、emqx_protocol.erl、emqx_session.erl、emqx_broker.erl。
每一个设备连接进来都会有两个进程,一个是连接进程(conn),一个是会话进程(session),连接进程的ID我们成为ConnPid,会话进程的ID我们成为SPid。

  • 连接进程:主要负责报文的收发、解析。
  • 会话进程:处理 MQTT 协议发布订阅(Publish/Subscribe)业务交互流程,处理 Qos0/1/2 消息接收与下发,消息超时重传与离线消息保存,通过飞行窗口(Inflight Window),实现下发消息吞吐控制与顺序保证。

一、流程图

下面是我根据代码画出的流程图:


连接、订阅、取消订阅、断连(图二)
发布(图三)

图例说明:

  1. Device 表示连接服务的Client端,Conn表示服务端连接进程,Session表示会话进程。
  2. 粗实线表Client发送服务端的协议包;还表示服务中的同步处理。
  3. 细虚线表示服务端发给Client端的协议包。
  4. 粗虚线表示服务端的异步调用。

1、连接

设备与服务端建立长链接后,会派生出一个Conn进程来维护设备与服务端的通信。
客户端收到CONNECT协议包,校验完权限后会建立session,session进程建立成功后会与conn进程建立双向绑定,随后conn进程给设备返回CONNACK协议包。
我们看一下两个进程的state
conn进程

{status,<0.2190.0>,
        {module,gen_server},
        [[{incoming_bytes,166},
          {{subscribe,<<"mqttbroker/xxx">>},{allow,1570872922716}},
          {'$ancestors',[<0.1876.0>,<0.1875.0>,esockd_sup,<0.1555.0>]},
          {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
          {acl_keys_q,{[{subscribe,<<"mqttbroker/xxx">>}],[]}},
          {rand_seed,{#{bits => 58,jump => #Fun<rand.8.10897371>,
                        next => #Fun<rand.5.10897371>,type => exrop,
                        uniform => #Fun<rand.6.10897371>,
                        uniform_n => #Fun<rand.7.10897371>,weak_low_bits => 1},
                      [123527204234062397|227463409239782656]}},
          {'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>,
                                 peername => "127.0.0.1:55952",pid => <0.2190.0>}},
          {guid,{1570872922688773,268564305021070,0}},
          {'$initial_call',{emqx_connection,init,1}},
          {acl_cache_size,1}],
         running,<0.1876.0>,[],
         [{header,"Status for generic server <0.2190.0>"},
          {data,[{"Status",running},
                 {"Parent",<0.1876.0>},
                 {"Logged events",[]}]},
          {data,[{"State",
                  #state{transport = esockd_transport,socket = #Port<0.29>,
                         peername = {{127,0,0,1},55952},
                         sockname = undefined,conn_state = running,active_n = 100,
                         proto_state = #pstate{zone = external,
                                               sendfun = #Fun<emqx_connection.0.65258536>,
                                               peername = {{127,0,0,1},55952},
                                               peercert = nossl,proto_ver = 4,proto_name = <<"MQTT">>,
                                               client_id = <<"mqttbroker/slw">>,is_assigned = false,
                                               conn_pid = <0.2190.0>,conn_props = #{},
                                               ack_props = undefined,username = <<"mqttbroker/slw">>,
                                               session = <0.2192.0>,clean_start = true,topic_aliases = #{},
                                               packet_size = 1048576,keepalive = 60,mountpoint = undefined,
                                               is_super = false,is_bridge = false,
                                               prod_key = <<"mqttbroker">>,dev_name = <<"slw">>,
                                               enable_ban = true,enable_acl = true,
                                               acl_deny_action = ignore,
                                               recv_stats = #{msg => 0,pkt => 3},
                                               send_stats = #{msg => 0,pkt => 3},
                                               connected = true,
                                               connected_at = {1570,872922,684761},
                                               ignore_loop = false,
                                               topic_alias_maximum = #{from_client => 0,to_client => 0}},
                         parser_state = {none,#{max_packet_size => 1048576,version => 4}},
                         gc_state = {emqx_gc,#{cnt => {1000,1000},
                                               oct => {1048576,1048576}}},
                         keepalive = {keepalive,#Fun<emqx_connection.1.65258536>,164,
                                                45,
                                                {keepalive,check},
                                                #Ref<0.3658081892.699924485.182073>,1},
                         enable_stats = true,stats_timer = undefined,
                         rate_limit = undefined,pub_limit = undefined,
                         limit_timer = undefined,idle_timeout = 15000}}]}]]}

{status,<0.2192.0>,
        {module,gen_server},
        [[{'$ancestors',[emqx_session_sup,emqx_sm_sup,emqx_sup,
                         <0.1561.0>]},
          {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
          {'$logger_metadata$',#{client_id => <<"mqttbroker/slw">>}},
          {guid,{1570872922717372,268564305021072,0}},
          {'$initial_call',{emqx_session,init,1}}],
         running,<0.1713.0>,[],
         [{header,"Status for generic server <0.2192.0>"},
          {data,[{"Status",running},
                 {"Parent",<0.1713.0>},
                 {"Logged events",[]}]},
          {data,[{"State",
                  #state{idle_timeout = 15000,clean_start = true,
                         binding = local,client_id = <<"mqttbroker/slw">>,
                         username = <<"mqttbroker/slw">>,conn_pid = <0.2190.0>,
                         old_conn_pid = undefined,next_pkt_id = 1,
                         max_subscriptions = 0,
                         subscriptions = #{<<"mqttbroker/xxx">> =>
                                               #{nl => 0,pktid => 1,qos => 2,rap => 0,rc => 0,rh => 0,
                                                 <<"r">> => <<"1561530245902">>,
                                                 <<"s">> => <<"EAB78B0E25D6C5A5EDCEEB78ABCB7B58">>}},
                         upgrade_qos = false,
                         inflight = {emqx_inflight,32,{0,nil}},
                         retry_interval = 20000,retry_timer = undefined,
                         mqueue = {mqueue,true,1000,0,0,none,infinity,
                                          {queue,[],[],0}},
                         awaiting_rel = #{},max_awaiting_rel = 100,
                         await_rel_timeout = 300000,await_rel_timer = undefined,
                         expiry_interval = 0,expiry_timer = undefined,
                         enable_stats = true,stats_timer = undefined,
                         gc_state = {emqx_gc,#{cnt => {1000,1000},
                                               oct => {1048576,1048576}}},
                         created_at = {1570,872922,687756},
                         will_msg = undefined,will_delay_timer = undefined}}]}]]}

conn进程中有session字段,session进程也有conn字段,可以保证他们相互找到对方,进行同步/异步调用。

2、订阅、取消订阅

具体的订阅步骤我放在后面的章节来讲,这里只看conn和session进程的逻辑,这样来看订阅与取消订阅差不多,我们以订阅为例。
conn进程收到SUBSCRIBE协议包后,校验权限后,调用 emqx_session:subscribe/4,实际是想session进程cast了消息 {subscribe, self(), SubReq},session进程处理完订阅逻辑后,给conn进程发送suback消息 From ! {deliver, {suback, PacketId, ReasonCodes}}
conn进程收到session进程的deliver消息后给设备端发送SUBACK消息。

3、心跳

心跳逻辑仅用的了conn进程,即conn收到PINGREQ立刻返回PINGRESP。

4、断开连接

图二中仅仅画出了主动断开逻辑,设备端发送DISCONNECT, conn进程执行正常stop,session进程监听到EXIT消息后自动退出。

5、发布

图三中为方便理解,省掉了消息路由与分发的具体逻辑(后面章节来讲),并刻意画出了一对一发消息的流程,方便分析conn、session进程在其中扮演的角色。

  • Qos0消息发布
  1. Qos0消息比较简单,从发送方来看,ConnA收到PUBLISH协议包,调用emqx_session:publish/3,执行结果调用emqx_protocol的puback/2,该函数Qos0不做处理。
  2. 从接收方来看,分发到订阅该topic的B的步骤是向SessionB发送dispatch消息 SubPid ! {dispatch, Topic, Msg}。SessionB进程handle dispatch ,执行函数是 emqx_session:dispatch/2 ,Qos0消息直接do_deliver ,给ConnB进程发送deliver消息 ConnPid ! {deliver, {publish, PacketId, Msg}},ConnB进程收到deliver消息,调用 emqx_ptotocol:deliver/2 ,将PUBLISH协议包发送给DeviceB。
  • Qos1消息发布
  1. Qos1消息相比Qos0消息多了一个PUBACK。从发送方看,ConnA收到PUBLISH包,调用emqx_session:publish/3,调用emqx_protocol的puback/2,Qos1消息deliver PUBACK 消息,通过 emqx_ptotocol:deliver/2 将PUBACK协议包发送给DeviceA。
  2. 从接收方来看,Qos1比Qos0多了一个inflight的操作。当SessionB收到 SubPid ! {dispatch, Topic, Msg}消息,在给ConnB deliver ConnPid ! {deliver, {publish, PacketId, Msg}} 消息的同时,执行emqx_inflight:insert/3操作。ConnB将PUBLISH发送给DeviceB,DeviceB会回应PUBACK消息,ConnB收到PUBACK消息的时候会执行emqx_session:puback/2,实际上就是向SessionB执行cast调用, gen_server:cast(SPid, {puback, PacketId, ReasonCode}),SessionB收到cast调用时,执行emqx_inflight:delete/2操作。
  3. inflight飞行窗口操作是下行消息确保可达和保证消息顺序的逻辑,Qos2也有此逻辑,但是稍微不同。
  • Qos2消息发布
  1. Qos2消息相比Qos0多了三次交互。从发送方看,ConnA收到PUBLISH包,emqx_session:publish/3函数执行时,先向SessionA进行call调用,gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity),等待SessionA在state中将要发布的消息插入到awaiting_rel中,再执行消息发布,用执行结果调用emqx_protocol的puback/2,ConnA会根据Qos2消息给DeviceA发送PUBREC协议包。DevcieA收到PUBREC会回应PUBREL协议包。ConnA收到协议包会执行emqx_session:pubrel/3,它会同步调用SessionA gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity) ,SessionA会将State里面的awaiting_rel 之前记录的消息删除, ConnA得到执行结果后给DeviceA发送PUBCOMP包。
    2、从接收方来看,Qos2的inflight操作略有不同。emqx_inflight:insert/3的操作时机相同,当SessionB分别收到ConnB的同步callgen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity)与异步cast gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}) ,sessionB会分别操作emqx_inflight:update/3emqx_inflight:delete/2
    3、Qos2的inflight飞行窗口操作同样是下行消息确保可达和保证消息顺序的逻辑。
    4、Qos2在上行消息中比Qos1多了awaiting_rel的操作,是从发送方确保消息可达。

二、Conn和Session进程的解读

本节仍然只关注进程间的消息流转。

1、Conn进程

先看连接层代码,主要看入口与出口
emqx_connection.erl

process_incoming(Data, State) ->
    Oct = iolist_size(Data),
    ?LOG(debug, "RECV ~p", [Data]),
    emqx_pd:update_counter(incoming_bytes, Oct),
    emqx_metrics:trans(inc, 'bytes/received', Oct),
    case handle_packet(Data, State) of
        {noreply, State1} ->
            State2 = maybe_gc({1, Oct}, State1),
            {noreply, ensure_stats_timer(State2)};
        Shutdown -> Shutdown
    end.
......
%% Parse and handle packets
......
handle_packet(Data, State = #state{proto_state  = ProtoState,
                                   parser_state = ParserState,
                                   idle_timeout = IdleTimeout}) ->
    try emqx_frame:parse(Data, ParserState) of
        {more, ParserState1} ->
            {noreply, State#state{parser_state = ParserState1}, IdleTimeout};
        {ok, Packet = ?PACKET(Type), Rest} ->
            emqx_metrics:received(Packet),
            (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
            case emqx_protocol:received(Packet, ProtoState) of
                {ok, ProtoState1} ->
                    handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
                {error, Reason} ->
                    ?LOG(warning, "Process packet error - ~p", [Reason]),
                    shutdown(Reason, State);
                {error, Reason, ProtoState1} ->
                    shutdown(Reason, State#state{proto_state = ProtoState1});
                {stop, Error, ProtoState1} ->
                    stop(Error, State#state{proto_state = ProtoState1})
            end;
        {error, Reason} ->
            ?LOG(warning, "Parse frame error - ~p", [Reason]),
            shutdown(Reason, State)
    catch
        _:Error ->
            ?LOG(warning, "Parse failed for ~p~nError data:~p", [Error, Data]),
            shutdown(parse_error, State)
    end.

emqx_protocol.erl (process_packet)

received(Packet = ?PACKET(Type), PState) ->
    PState1 = set_protover(Packet, PState),
    trace(recv, Packet),
    try emqx_packet:validate(Packet) of
        true ->
            case preprocess_properties(Packet, PState1) of
                {error, ReasonCode} ->
                    {error, ReasonCode, PState1};
                {Packet1, PState2} ->
                    process_packet(Packet1, inc_stats(recv, Type, PState2))
            end
    catch
        ......
        error : Reason ->
            deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
            {error, Reason, PState1}
    end.

conn进程收到上行的数据后,执行handle_packet函数,emqx_frame:parse/2函数进行协议解析,将二进制数据转成term格式的协议包(CONNECT,CONNACK,SUBSCRIBE,SUBACK等等),emqx_protocol:received/2 校验协议包,并做预处理,emqx_protocol:proccess_packet/2函数根据接收到的协议包执行协议动作。我们记住这个函数,emqx_protocol:process_packet/2,是处理设备端发来的协议包,并执行动作的重要函数。

emqx_connection.erl

handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
    case emqx_protocol:deliver(PubOrAck, ProtoState) of
        {ok, ProtoState1} ->
            State1 = State#state{proto_state = ProtoState1},
            {noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))};
        {error, Reason} ->
            shutdown(Reason, State)
    end;

emqx_prptocol.erl

deliver({connack, ReasonCode}, PState) ->
    send(?CONNACK_PACKET(ReasonCode), PState);
......
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
    trace(send, Packet),
    case SendFun(Packet, #{version => Ver}) of
        ok ->
            emqx_metrics:sent(Packet),
            {ok, inc_stats(send, Type, PState)};
        {error, Reason} ->
            {error, Reason}
    end.

从前面看下行消息有Session打过来的(比如ConnPid ! {deliver, {publish, PacketId, Msg}}),有Conn进程自己回应的(比如通过 emqx_ptotocol:deliver/2 将PUBACK协议包发送给DeviceA)。这两个地方都最终会调用emqx_procotol:send/2函数。

2、Session

这里也只关注流程图上的逻辑,主要关注handle_info/2handle_call/3handle_cast/2

  • 来自Conn的订阅与取消订阅:gen_server:cast(SPid, {subscribe, self(), SubReq})gen_server:cast(SPid, {unsubscribe, self(), UnsubReq})

  • 来自路由投递的下行消息: SubPid ! {dispatch, Topic, Msg}

  • 各种来自Conn的各种回应包处理:

    • 来自左端的ConnA: gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity)gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity)
    • 来自右端的ConnB: gen_server:cast(SPid, {puback, PacketId, ReasonCode})gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity)gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode})

    来自左端的消息对应awaiting_rel的处理,来自右端的消息对inflight的处理,在没有画出流程图前,这是我曾经很迷惑的地方,现在则一目了然。

至此设备到Conn进程与Session进程之间的消息流转已经讲清楚了。

二、路由层与分布层

前面章节在将消息订阅与投递过程中偏重Conn进程与Session进程间的关系,这一节则偏重于系统如何维护路由层和分布层数据以及如何通过他们组织分布式服务并进行消息投递的。


订阅与发布(图四)

订阅发布是一个解耦的过程,图三中一对一消息发布只是为了方便理解,真正的订阅发布应该是图四这种,同一个消息可以有多个设备订阅接受,并且还可以进行通配订阅的。

一、整体介绍

1、路由层

路由层维护订阅者(subscriber)与订阅关系表(subscription),并在本节点发布订阅模式派发(Dispatch)消息:


路由层设计(图五)

消息派发到会话(Session)后,由会话负责按不同 QoS 送达消息。

路由层分为两种,普通订阅和共享订阅。涉及的表如下:

  • 普通订阅:
    emqx_suboption、emqx_subscription、emqx_subscriber

  • 共享订阅:
    emqx_suboption、emqx_subscription、emqx_shared_subscription(全局)、emqx_shared_subscriber、emqx_alive_shared_subscribers

  • emqx_suboption
字段 说明 示例
key 主键 {<0.511.1>,<<"test/set/#">>} SPid和Topic组成的元组
value SubMap #{nl => 0,pktid => 1,qos => 1,rap => 0,rc => 0,rh => 0,share => <<"emqx@192.168.0.1">>,subid => <<"broker/tcp">>} share关键字在共享订阅Topic时才出现

set表。这个表是和Session进程关联最密切的表,每个Session进程会在State里面存着自己进程订阅的Topic的信息,如下:

subscriptions = #{<<"test/set/#">> => #{nl => 0,pktid => 1,qos => 1,rap => 0,rc => 0,rh => 0,share => <<"emqx@192.168.0.1">>}},
  • emqx_subscription
字段 说明 示例
key Session进程ID 主键 <0.511.1> SPid
value Topic <<"test/set/#">> Topic

订阅关系表,SPid为主键,duplicate_bag表。普通/共享订阅时使用。
目前主要的使用的地方有两处,一处是给http Api 提供查询使用,一处是处理Session进程挂掉时执行unsubscribe操作。

  • emqx_subscriber
字段 说明 示例
key Topic 主键 <<"test/set/#">> Topic / {shard,<<"test/set/#">>,1} 分片时使用的key
value Session进程ID 或者分片 <0.511.1> SPid / {shared, 1} 分片时使用的value

订阅者表,Topic为主键,bag表。普通订阅使用。
是下发消息使用的重要的表。下发时以Topic查询所有订阅者的SPid。
因为是bag表,执行取消订阅时,在删除后,需要判断表中是否还存有key决定是否do_delete_route。

关于分片:当订阅订阅同一个Topic的设备太多时,publish消息时会出现大key查询,影响效率,所有当订阅者超过1024个时会出现分片存在。分片存储是将一条数据拆成两条数据。
举个例子 {<<"test/set/#">>, <0.511.1>} ,插入时发现这个Topic的订阅者已经超过1024个,就会进行分片,分两次插入{{shard,<<"test/set/#">>,1},<0.511.1>} 、 {<<"test/set/#">>, {shard,1}}。分发消息时,会先命中后一项,在根据{shard,1} 去命中前一项。 例子中的1是获取的分片id,可以理解为分表id。

  • emqx_shared_subscription
字段 说明 示例
group <<"emqx@192.168.0.1">>
topic Topic <<"test/set/#">>
subpid SessionPid <0.511.1>

共享订阅关系表 ,以Group、Topic为维度,bag表。共享订阅时使用。
是共享订阅下发消息使用的重要的表。 下发时以Group、Topic查询所有订阅者的SPid。

  • emqx_shared_subscriber
字段 说明 示例
key 主键 {<<"emqx@127.0.0.1">>,<<"test/set/#">>} Group和Topic组成的元组
value Session进程ID <0.511.1> SPid

共享订阅者表,bag表,以{Group,Topic}为主键,共享订阅时使用。
因为是bag表,可以用来在订阅/取消订阅时判断是否需要do_add_route/do_delete_route。
订阅操作时,插入emqx_shared_subscriber前,表中没有此key需要do_add_route;取消订阅时,删除emqx_shared_subscriber后,表中仍存在此key,就不能执行do_delete_route。

  • emqx_alive_shared_subscribers
字段 说明 示例
key 主键 <0.511.1> SPid

用来跟踪订阅Session进程的状态的,表中只存活着非本节点的订阅者SessionPid。
SessionPid进程挂掉时,删除该项。并取消该进程的所有共享订阅信息。

2、分布层

分布层维护全局主题树(Topic Trie)与路由表(Route Table)。主题树由通配主题构成,路由表映射主题到节点:


分布层设计一(图六)

分布层通过匹配主题树(Topic Trie)和查找路由表(Route Table),在集群的节点间转发路由 MQTT 消息:


分布层设计二(图七)

分布层主要涉及的表如下:
emqx_route (全局)、emqx_trie(全局)、emqx_trie_node(全局)
当topic中出现通配符时,订阅动作才会写入emqx_trie_node、emqx_trie两张表。
下发消息时,会首先查找后两张表,看有没有满足通配匹配的TopicFilter。

  • emqx_route
字段 说明 示例
topic 主键 <<"test/set/#">> Topic
dest 目标节点 'emqx@127.0.0.1' / {<<"emqx@127.0.0.1">>, 'emqx@127.0.0.1'}

查找路由表,以topic为主键,bag表。消息下发时确定消息路由节点。

  • emqx_trie_node
字段 说明 示例
node_id 主键 以<<"test/set/#">>为例, node_id 可以为以下值 <<"test">> <<"test/set">> <<"test/set/#">>
edge_count 子节点数量
topic Topic <<"test/set/#">> / undefind ,只有叶子节点才有值
flags undefined
  • emqx_trie
字段 说明 示例
egde 主键 #trie_edge{node_id = <<"test/set">>,word = '#'}
node_id 子节点node_id <<"test/set/#">>

这两张表共同构成了匹配主题树表。

二、举例说明

前面匹配主题树表结构有点复杂,我们举个例子讲解一下。

当A订阅 <<"test/set/nick">> 时, 因为这个TopicFilter中没有通配符,所有直接插入emqx_route表中 {<<"test/set/nick">>,PidA}。
当B订阅 <<"test/set/#">>时,因为这个TopicFilter有通配符,需要先插入匹配主题树,最后再在emqx_route 中插入{<<"test/set/nick">>,PidA}。

我们细看插入匹配主题树的过程,先将<<"test/set/#">> 进行分词拆分 [<<"test">>,<<"set">>,'#'],插入emqx_trie中,我们有了以下记录

[
 #trie{edge = #trie_edge{node_id = root,word = <<"test">>},
       node_id = <<"test">>},
 #trie{edge = #trie_edge{node_id = <<"test">>,
                         word = <<"set">>},
       node_id = <<"test/set">>},
  #trie{edge = #trie_edge{node_id = <<"test/set">>,
                         word = '#'},
       node_id = <<"test/set/#">>}]

emqx_trie的结构就是 {{node_id, word}, child}
再看一下

{root, <<"test">>}, <<"test">>
{<<"test">>, "set"} , <<"test/set">>
{<<"test/set">>, '#'}, <<"test/set/#">>

除了根节点, node_id 拼接 word 就是 child 的node_id ,通过这样的方式够成了树。

我们查看emqx_trie_node

[
#trie_node{node_id = root,edge_count = 1,topic = undefined,
            flags = undefined},
#trie_node{node_id = <<"test">>,edge_count = 1,
            topic = undefined,flags = undefined},
#trie_node{node_id = <<"test/set">>,edge_count = 1,
            topic = undefined,flags = undefined},
 #trie_node{node_id = <<"test/set/#">>,edge_count = 0,
            topic = <<"test/set/#">>,flags = undefined}]

这张表记录了所有的节点,和子节点数量,只有叶子节topic有值。

下面我们看一下发布流程:
当C向<<"test/set/nick">> 发布一条消息时。

  1. 会先查emqx_trie和emqx_trie_node两种表,同样是先把Topic进行拆分, [<<"test">>,<<"set">>,<<"nick">>],加上根节点root,即 [root, <<"test">>,<<"set">>,<<"nick">>],
    先按{root, '#'} 查询emqx_trie表,此时记录为空,如果有会存入Acc里面,再查 {<<"test">>, <<"set">>} 和 {<<"test", '+'>>} 查到记录就递归查询,前者有记录 {{<<"test">>,<<"set">>},<<"test/set">>} ,所以递归查询{<<"test/set">>, <<"nick">>} 和 {<<"test/set">>, '+'} ,查到结果为空,此时分词表到头了,接着查询{<<"test/set">>, '#'} 查到结果是{{<<"test/set">>, '#'}, <<"test/set/#">>} ,根据这条记录查询emqx_trie_node表,得到 #trie_node{node_id = <<"test/set/#">>,edge_count = 0,topic = <<"test/set/#">>,flags = undefined},至此我们通过匹配主题树查到了一个topicFilter <<"test/set/#">>。
  2. 接着我们通过前面查到的<<"test/set/#">> 和最初的 <<"test/set/nick">> 一起查询emqx_route表,根据路由表将消息路由的目标节点去执行下一步的分发。
  3. 这里我们关注一下匹配主题树,除了首尾尾查询{root, '#'}、{<<"test/set">>, '#', 其他环节都是在查询一个word和'+'二叉树
      root
      /   \
    #   test
         /   \
      +       set
              /   \
            +      #

查询过程就是这颗树。

具体代码如下:
emqx_trie.erl

match(Topic) when is_binary(Topic) ->
    TrieNodes = match_node(root, emqx_topic:words(Topic)),
    [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= undefined].

match_node(root, [NodeId = <<$$, _/binary>>|Words]) ->
    match_node(NodeId, Words, []);

match_node(NodeId, Words) ->
    match_node(NodeId, Words, []).

match_node(NodeId, [], ResAcc) ->
    mnesia:read(?TRIE_NODE, NodeId) ++ 'match_#'(NodeId, ResAcc);

match_node(NodeId, [W|Words], ResAcc) ->
    lists:foldl(fun(WArg, Acc) ->
        case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of
            [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc);
            [] -> Acc
        end
    end, 'match_#'(NodeId, ResAcc), [W, '+']).

%% @private
%% @doc Match node with '#'.
'match_#'(NodeId, ResAcc) ->
    case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = '#'}) of
        [#trie{node_id = ChildId}] ->
            mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc;
        [] -> ResAcc
    end.
  1. 接着我们串一下路由层如何继续分发吧。前面分布层我们查询emqx_route 得到了 [{<<"test/set/nick">>, Node1}, {<<"test/set/#">>, Node2}] ,Node1和Node2如果是普通订阅,则forward到目标节点(目标节点是本地则省略该步骤),查询emqx_subercriber表中SessionPid,直接send SubPid ! {dispatch, Topic, Msg},后续流程就接上前面章节的会话层逻辑了。 (如果出现分片情况,参见前面emqx_subscriber的逻辑)。

  2. 如果是共享订阅呢,也省掉了forward逻辑(因为emqx_shared_subscription是全局表),进入共享订阅的分发逻辑,共享订阅的emqx_route 记录格式类似这样{<<"test/set/nick">>, {Group,Node}} ,实际上Group就是binary格式的Node。共享订阅的逻辑就是从emqx_shared_subscription表中pick一个会话,发布消息,失败了则进行下一个,直到有一个成功。共享订阅的概念就是从订阅这个Topic的Subscriber中选择一个进行投递,保证只有一个人收到。

三、结语

本文仅仅从整体上梳理了一下emqx3.0的功能流程,各个环节点到为止,后续有机会再出一些具体功能的详细介绍吧。

四、附

https://github.com/emqx/emqx
https://docs.emqx.io/broker/v3/cn/design.html#distributed-layer

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 安全性 设置客户端连接后进行任何其他指令前需要使用的密码。 警告:因为redis 速度相当快,所以在一台比较好的服...
    OzanShareing阅读 5,820评论 1 7
  • 大致可以通过上述情况进行排除 1.kafka服务器问题 查看日志是否有报错,网络访问问题等。 2. kafka p...
    生活的探路者阅读 12,223评论 0 10
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 10,791评论 0 9
  • JAVA面试题 1、作用域public,private,protected,以及不写时的区别答:区别如下:作用域 ...
    JA尐白阅读 4,858评论 1 0
  • 很快的,几周之后,两楚卫国军,这个司马差点第一次上线被这群人弄死结果又稀里糊涂又被加进去的“强大”公会组织,马上就...
    Mod模君阅读 754评论 0 0