Publisher Confirms 机制通常用来确认消息已经到达服务端。对于一条持久化消息,通过 channel(confirm 模式)发送到持久化队列后,如果生产者收到服务端针对这条消息返回的 `basic.ack` 指令,那么生产者可以认为消息已经成功发到服务端(并且已经持久化)。
本文主要探讨 RabbitMQ(v3.6.x)服务端触发 confirm 的时机。
流程
- rabbit_channel 进程处理 `basic.publish` 方法时,执行 `deliver_to_queues/2` 方法
%% Channel clause for the 'basic.publish' method.
%%
%% Runs the check_* helpers on the content, the target exchange and the
%% decoded properties, allocates a publish sequence number when one is
%% needed, builds the message and routes it through the exchange. The
%% resulting {Delivery, QNames} pair is either handed to
%% deliver_to_queues/2 right away (Tx is none) or appended to the message
%% queue of the open transaction (Tx is a {Msgs, Acks} pair).
%%
%% Returns {noreply, NewState}; if rabbit_basic:message/3 returns
%% {error, Reason} the clause calls precondition_failed/2 instead.
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username},
conn_name = ConnName,
delivery_flow = Flow}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
check_internal_exchange(Exchange),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
maybe_set_fast_reply_to(
rabbit_binary_parser:ensure_content_decoded(Content), State),
check_user_id_header(Props, State),
check_expiration_header(Props),
%% A confirm is wanted when a transaction is open (Tx is not none) or
%% confirm_enabled is set on the channel.
DoConfirm = Tx =/= none orelse ConfirmEnabled,
%% A sequence number is consumed only for publishes that want a confirm
%% or are mandatory; otherwise MsgSeqNo is undefined and publish_seqno
%% is left untouched.
{MsgSeqNo, State1} =
case DoConfirm orelse Mandatory of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
%% The channel's delivery_flow setting is stamped onto the delivery
%% before it is paired with the routed queue names.
DQ = {Delivery#delivery{flow = Flow}, QNames},
%% No transaction: deliver now. Open transaction: only queue the pair
%% in the tx state; nothing is delivered by this clause.
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
State1#ch{tx = {Msgs1, Acks}}
end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
- `deliver_to_queues/2` 会调用 rabbit_amqqueue 模块的 `deliver/2` 方法:通过 delegate 进程同时向队列进程(master、slaves 进程)发送消息
%% Casts Delivery to every master and slave queue process resolved from Qs
%% (via qpids/1) and returns the addressed pids, masters first.
%%
%% When the delivery's flow field is `flow`, credit_flow:send/1 is called
%% once for every addressed pid and then once more for every slave pid;
%% with `noflow` no credit is spent.
deliver(Qs, Delivery = #delivery{flow = Flow}) ->
    {MasterPids, SlavePids} = qpids(Qs),
    AllPids = MasterPids ++ SlavePids,
    %% A slave costs two credits because the message reaches it from two
    %% directions: once directly from the channel and once via GM. One
    %% credit is acked on each arrival.
    case Flow of
        noflow ->
            ok;
        flow ->
            %% This tracks messages sent by the rabbit_channel process,
            %% so it touches the rabbit_channel process dictionary.
            lists:foreach(fun(Pid) -> credit_flow:send(Pid) end, AllPids),
            lists:foreach(fun(Pid) -> credit_flow:send(Pid) end, SlavePids)
    end,
    %% The last tuple element tells a receiver whether it was addressed as
    %% a slave at publish time. A slave that receives such a message from
    %% the channel after being promoted to master should mark the message
    %% as 'delivered', since it cannot know what the old master did with it.
    %% Masters and slaves are both sent the message here.
    delegate:cast(MasterPids, {deliver, Delivery, false}),
    delegate:cast(SlavePids, {deliver, Delivery, true}),
    AllPids.
- 队列进程 `rabbit_amqqueue_process` 接收消息
%% Queue-process clause for a {deliver, Delivery, SlaveWhenPublished} cast.
%%
%% With flow control active, the sender is acked once (twice when the
%% delivery was addressed to this process as a slave) and added to the
%% monitored senders; with `noflow` the senders are left unchanged. The
%% delivery is then passed to deliver_or_enqueue/3 and the result to
%% noreply/1.
handle_cast({deliver,
             #delivery{sender = ChPid, flow = Flow} = Delivery,
             SlaveWhenPublished},
            #q{senders = Senders0} = State0) ->
    Senders =
        case Flow of
            noflow ->
                Senders0;
            flow ->
                %% Both credit_flow:ack/1 calls ack messages to the
                %% channel process that sent us the delivery. See
                %% handle_ch_down for more info.
                credit_flow:ack(ChPid),
                case SlaveWhenPublished of
                    false -> ok;
                    true  -> credit_flow:ack(ChPid) %% [0]
                end,
                pmon:monitor(ChPid, Senders0)
        end,
    noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished,
                               State0#q{senders = Senders}));
- `deliver_or_enqueue/3` 尝试投递给消费者;如果无法投递,则将消息入队
%% Processes one delivery inside the queue process and returns the new
%% queue state.
%%
%% Order of work: send_mandatory/1, then send_or_record_confirm/2, then a
%% duplicate check against the backing queue, then attempt_delivery/4. A
%% message that is neither a duplicate nor delivered is either discarded
%% (queue with ttl = 0 and no dlx) or published into the backing queue.
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
%% orelse short-circuits: attempt_delivery/4 is only called when the
%% backing queue did not report the message as a duplicate.
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
State2) of
%% Duplicate: nothing further is done with the message here.
true ->
State2;
{delivered, State3} ->
State3;
%% The next one is an optimisation
%% Undelivered on a queue with ttl = 0 and no dlx: the message is
%% discarded instead of being published to the backing queue.
{undelivered, State3 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS2,
msg_id_to_channel = MTC}} ->
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
%% Undelivered otherwise: publish into the backing queue, then let
%% maybe_drop_head/1 run on the updated state.
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
QLen = BQ:len(BQS4),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
%% expiry ends up at the head of the queue. If the head
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
{false, false, _} -> State4;
{true, true, undefined} -> State4;
{_, _, _} -> drop_expired_msgs(State4)
end
end.
- `send_or_record_confirm/2`:只有当消息和队列都设置为持久化时,才会将消息状态保存在 `msg_id_to_channel` 数据结构中(等到消息最终落盘才会触发 confirm)
%% Clause for a confirm-requesting delivery of a persistent message to a
%% durable queue: nothing is sent yet. The message id is stored in
%% msg_id_to_channel together with {SenderPid, MsgSeqNo}, and the atom
%% `eventually` is returned alongside the updated state.
send_or_record_confirm(
  #delivery{confirm    = true,
            sender     = SenderPid,
            msg_seq_no = MsgSeqNo,
            message    = #basic_message{is_persistent = true,
                                        id            = MsgId}},
  #q{q                 = #amqqueue{durable = true},
     msg_id_to_channel = Pending0} = State) ->
    Pending = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, Pending0),
    {eventually, State#q{msg_id_to_channel = Pending}};
- 调用 `attempt_delivery/4` 之后,根据投递结果执行 `BQ:publish/6` 或 `BQ:publish_delivered/5` 方法,最终由底层的 rabbit_variable_queue 调用 rabbit_msg_store 完成消息持久化
- 步骤 3 的最后会执行 `noreply/1`
%% Builds the {noreply, State, Timeout} result for a callback: the state
%% is first passed through next_state/1, then through ensure_rate_timer/1
%% and ensure_stats_timer/1, in that order.
noreply(NewState) ->
    {Settled, Timeout} = next_state(NewState),
    WithRateTimer = ensure_rate_timer(Settled),
    WithStatsTimer = ensure_stats_timer(WithRateTimer),
    {noreply, WithStatsTimer, Timeout}.
%% Settles the queue state and picks a timeout value; returns
%% {State, Timeout}.
%%
%% The message ids the backing queue reports through drain_confirmed/1 are
%% passed to confirm_messages/2, and the updated backing-queue state and
%% msg_id_to_channel tree are stored in the state. The timeout follows
%% BQ:needs_timeout/1:
%%   false -> sync timer stopped, hibernate
%%   idle  -> sync timer stopped, ?SYNC_INTERVAL
%%   timed -> sync timer ensured, 0
next_state(#q{backing_queue       = BQ,
              backing_queue_state = BQS0,
              msg_id_to_channel   = Pending0} = Q0) ->
    assert_invariant(Q0),
    {ConfirmedIds, BQS} = BQ:drain_confirmed(BQS0),
    Pending = confirm_messages(ConfirmedIds, Pending0),
    Q = Q0#q{msg_id_to_channel   = Pending,
             backing_queue_state = BQS},
    case BQ:needs_timeout(BQS) of
        timed -> {ensure_sync_timer(Q), 0};
        idle  -> {stop_sync_timer(Q), ?SYNC_INTERVAL};
        false -> {stop_sync_timer(Q), hibernate}
    end.
%% Confirms the given message ids and returns the msg_id_to_channel tree
%% with those ids removed.
%%
%% Each id found in MTC maps to {SenderPid, MsgSeqNo}. The sequence numbers
%% are grouped per sender pid, and rabbit_misc:confirm_to_sender/2 is then
%% invoked once per sender with its group. Ids that are not in MTC are
%% skipped.
confirm_messages(MsgIds, MTC) ->
    Collect =
        fun(MsgId, {BySender, Pending} = Acc) ->
                case gb_trees:lookup(MsgId, Pending) of
                    none ->
                        Acc;
                    {value, {SenderPid, MsgSeqNo}} ->
                        {rabbit_misc:gb_trees_cons(SenderPid, MsgSeqNo,
                                                   BySender),
                         gb_trees:delete(MsgId, Pending)}
                end
        end,
    {Grouped, Remaining} =
        lists:foldl(Collect, {gb_trees:empty(), MTC}, MsgIds),
    rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, Grouped),
    Remaining.