RabbitMQ服务端触发Confirm时机

Publisher Confirms 机制通常用来确认消息到达服务端。对于一条持久化消息,通过 channel(confirm模式)发送到持久化队列,如果生产者收到服务端响应关于这条消息的 basic.ack指令,那么生产者可以认为消息已经成功发到服务端(并且已经持久化)。

本文主要探讨 RabbitMQ(v3.6.x) 服务端触发 confirm 的时机

流程

  1. rabbit_channel 进程处理 basic.publish 方法时,执行 deliver_to_queues/2 方法
%% Handles a basic.publish frame received on the channel.
%%
%% The clause validates the publish (message size, write permission on
%% the exchange, internal-exchange check, user-id and expiration
%% headers), allocates a publish sequence number when one is needed,
%% builds the message and routes it through the exchange. Outside a
%% transaction the delivery is handed to deliver_to_queues/2
%% immediately; inside a transaction it is appended to the pending
%% message queue kept in the `tx' field of the channel state.
%%
%% Returns {noreply, NewState}. An {error, Reason} result from
%% rabbit_basic:message/3 is reported through precondition_failed/2.
handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx              = Tx,
                                   channel         = ChannelNum,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState,
                                   user            = #user{username = Username},
                                   conn_name       = ConnName,
                                   delivery_flow   = Flow}) ->
    check_msg_size(Content),
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_write_permitted(ExchangeName, State),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    check_internal_exchange(Exchange),
    %% We decode the content's properties here because we're almost
    %% certain to want to look at delivery-mode and priority.
    DecodedContent = #content {properties = Props} =
        maybe_set_fast_reply_to(
          rabbit_binary_parser:ensure_content_decoded(Content), State),
    check_user_id_header(Props, State),
    check_expiration_header(Props),
    %% A confirm is requested for the delivery when the channel is in a
    %% transaction (tx is not `none') or has confirm mode enabled.
    DoConfirm = Tx =/= none orelse ConfirmEnabled,
    %% A sequence number is consumed only when it will be needed later,
    %% i.e. for confirmed or mandatory publishes; otherwise the message
    %% carries `undefined' and the counter is left untouched.
    {MsgSeqNo, State1} =
        case DoConfirm orelse Mandatory of
            false -> {undefined, State};
            true  -> SeqNo = State#ch.publish_seqno,
                     {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
        end,
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            Delivery = rabbit_basic:delivery(
                         Mandatory, DoConfirm, Message, MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
                                Username, TraceState),
            %% The channel's flow setting is stamped onto the delivery
            %% before it is paired with the routed queue names.
            DQ = {Delivery#delivery{flow = Flow}, QNames},
            {noreply, case Tx of
                          %% No transaction: deliver straight away.
                          none         -> deliver_to_queues(DQ, State1);
                          %% In a transaction: buffer the delivery in the
                          %% tx message queue instead of delivering it now.
                          {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
                                          State1#ch{tx = {Msgs1, Acks}}
                      end};
        {error, Reason} ->
            precondition_failed("invalid message: ~p", [Reason])
    end;
  1. deliver_to_queues/2 会调用 rabbit_amqqueue 模块的 deliver/2 方法:通过 delegate 进程同时向队列进程(master、slave 进程)发送消息
%% Casts a delivery to every master and slave process of the given
%% queues and returns the pids that were addressed (masters first,
%% then slaves).
deliver(Qs, #delivery{flow = FlowMode} = Delivery) ->
    {Masters, Slaves} = qpids(Qs),
    AllPids = Masters ++ Slaves,
    %% credit_flow:send/1 is called from the rabbit_channel process, so
    %% the bookkeeping lives in that process's dictionary.
    Charge = fun(Pid) -> credit_flow:send(Pid) end,
    case FlowMode of
        noflow ->
            ok;
        flow ->
            %% A slave is charged twice: the message reaches it both
            %% directly from the channel and via GM, and each arrival
            %% is acked separately.
            lists:foreach(Charge, AllPids),
            lists:foreach(Charge, Slaves)
    end,

    %% The third element tells the receiver whether it was addressed
    %% as a slave at publish time. A slave that has since been
    %% promoted to master should mark such a message as 'delivered',
    %% because it cannot know what the old master did with it.
    %% Masters and slaves are both cast to here, one after the other.
    delegate:cast(Masters, {deliver, Delivery, false}),
    delegate:cast(Slaves, {deliver, Delivery, true}),
    AllPids.

  1. 队列进程 rabbit_amqqueue_process 接收消息
%% A delivery cast from a channel: settle flow-control credit with the
%% sending channel, start monitoring it, then try to deliver or
%% enqueue the message.
handle_cast({deliver,
             #delivery{sender = ChPid, flow = FlowMode} = Delivery,
             SlaveWhenPublished},
            #q{senders = Monitored} = State) ->
    Monitored1 =
        case FlowMode of
            noflow ->
                Monitored;
            flow ->
                %% Every credit_flow:ack/1 below acks the channel
                %% process that sent us this delivery. See
                %% handle_ch_down for more info.
                credit_flow:ack(ChPid),
                case SlaveWhenPublished of
                    false -> ok;
                    %% Second ack when we were addressed as a slave.
                    true  -> credit_flow:ack(ChPid)
                end,
                pmon:monitor(ChPid, Monitored)
        end,
    noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished,
                               State#q{senders = Monitored1}));
  1. deliver_or_enqueue/3 尝试投递给消费者;或者 入队
%% Processes one delivery that has reached the queue process and
%% returns the updated queue state.
%%
%% The mandatory notification and the confirm bookkeeping are done
%% first. Unless the backing queue reports the message as a duplicate,
%% attempt_delivery/4 is tried; a message that could not be delivered
%% is either discarded (ttl = 0 and no dlx) or published to the
%% backing queue.
%%
%% `Delivered' is passed through to attempt_delivery/4 and
%% BQ:publish/6; the handle_cast caller supplies its
%% SlaveWhenPublished flag for it.
deliver_or_enqueue(Delivery = #delivery{message = Message,
                                        sender  = SenderPid,
                                        flow    = Flow},
                   Delivered, State = #q{backing_queue       = BQ,
                                         backing_queue_state = BQS}) ->
    send_mandatory(Delivery), %% must do this before confirms
    %% Confirm is the first element returned by send_or_record_confirm/2
    %% and is fed into the message properties below.
    {Confirm, State1} = send_or_record_confirm(Delivery, State),
    Props = message_properties(Message, Confirm, State1),
    {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
    State2 = State1#q{backing_queue_state = BQS1},
    %% `orelse' short-circuits: for a duplicate the expression is `true'
    %% and attempt_delivery/4 is never called; otherwise the value is
    %% whatever attempt_delivery/4 returns.
    case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
                                             State2) of
        true ->
            State2;
        {delivered, State3} ->
            State3;
        %% The next one is an optimisation
        %% (ttl = 0 with no dlx: the message is discarded instead of
        %% being published to the backing queue).
        {undelivered, State3 = #q{ttl = 0, dlx = undefined,
                                  backing_queue_state = BQS2,
                                  msg_id_to_channel   = MTC}} ->
            {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
            State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
        {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
            BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
            {Dropped, State4 = #q{backing_queue_state = BQS4}} =
                maybe_drop_head(State3#q{backing_queue_state = BQS3}),
            QLen = BQ:len(BQS4),
            %% optimisation: it would be perfectly safe to always
            %% invoke drop_expired_msgs here, but that is expensive so
            %% we only do that if a new message that might have an
            %% expiry ends up at the head of the queue. If the head
            %% remains unchanged, or if the newly published message
            %% has no expiry and becomes the head of the queue then
            %% the call is unnecessary.
            %% The tuple is {head was dropped, queue now holds exactly
            %% one message, expiry of the new message}.
            case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
                {false, false,         _} -> State4;
                {true,  true,  undefined} -> State4;
                {_,     _,             _} -> drop_expired_msgs(State4)
            end
    end.
  1. send_or_record_confirm/2:只有当投递要求 confirm,并且消息和队列都设置为持久化(消息 is_persistent = true,队列 durable = true)时,才会将消息状态保存在 msg_id_to_channel 数据结构中(等到消息最终落盘才会触发 confirm)
%% Clause for a confirm-requesting delivery of a persistent message to
%% a durable queue: nothing is sent to the channel yet. The message id
%% is recorded in msg_id_to_channel together with the sender pid and
%% sequence number, and `eventually' is returned as the confirm status.
send_or_record_confirm(#delivery{confirm    = true,
                                 sender     = ChPid,
                                 msg_seq_no = SeqNo,
                                 message    = #basic_message{
                                                 is_persistent = true,
                                                 id            = Id}},
                       #q{q                 = #amqqueue{durable = true},
                          msg_id_to_channel = Pending} = State) ->
    Pending1 = gb_trees:insert(Id, {ChPid, SeqNo}, Pending),
    {eventually, State#q{msg_id_to_channel = Pending1}};
  1. attempt_delivery/4 后根据投递状态,执行 BQ:publish/6 或 BQ:publish_delivered/5 方法,最终底层 rabbit_variable_queue 调用 rabbit_msg_store 进行消息持久化
  1. 步骤 3(队列进程 rabbit_amqqueue_process 的 handle_cast)最后执行 noreply/1
%% Builds the {noreply, State, Timeout} result for a callback: runs
%% next_state/1 first, then makes sure the rate timer and the stats
%% timer are set on the resulting state.
noreply(NewState) ->
    {Drained, Timeout} = next_state(NewState),
    WithRateTimer = ensure_rate_timer(Drained),
    WithStatsTimer = ensure_stats_timer(WithRateTimer),
    {noreply, WithStatsTimer, Timeout}.

%% Drains the message ids the backing queue reports as confirmed,
%% passes them to confirm_messages/2, and returns {State, Timeout}
%% where the timeout and the sync timer depend on what
%% BQ:needs_timeout/1 answers.
next_state(#q{backing_queue       = BQ,
              backing_queue_state = BQS0,
              msg_id_to_channel   = Pending} = State) ->
    assert_invariant(State),
    {ConfirmedIds, BQS1} = BQ:drain_confirmed(BQS0),
    Pending1 = confirm_messages(ConfirmedIds, Pending),
    Updated = State#q{backing_queue_state = BQS1,
                      msg_id_to_channel   = Pending1},
    case BQ:needs_timeout(BQS1) of
        timed ->
            {ensure_sync_timer(Updated), 0};
        idle ->
            {stop_sync_timer(Updated), ?SYNC_INTERVAL};
        false ->
            {stop_sync_timer(Updated), hibernate}
    end.
%% Looks up each message id in the msg-id-to-channel tree. Ids that
%% are present are removed and their sequence numbers grouped per
%% sender pid; unknown ids are skipped. Each group is then handed to
%% rabbit_misc:confirm_to_sender/2 and the pruned tree is returned.
confirm_messages(MsgIds, MTC) ->
    Collect =
        fun(Id, {BySender, Pending} = Acc) ->
                case gb_trees:lookup(Id, Pending) of
                    none ->
                        Acc;
                    {value, {ChPid, SeqNo}} ->
                        {rabbit_misc:gb_trees_cons(ChPid, SeqNo, BySender),
                         gb_trees:delete(Id, Pending)}
                end
        end,
    {Grouped, Remaining} =
        lists:foldl(Collect, {gb_trees:empty(), MTC}, MsgIds),
    rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, Grouped),
    Remaining.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。