Skip to content

Commit

Permalink
Chunk quorum queue deliveries
Browse files Browse the repository at this point in the history
This puts a limit to the amount of message data that is added
to the process heap at the same time to around 128KB.

Large prefetch values combined with large messages could cause
excessive garbage collection work.

Also similify the intermediate delivery message format to avoid
allocations that aren't necessary.
  • Loading branch information
kjnilsson committed Feb 7, 2023
1 parent 55e3569 commit 7a949a6
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 14 deletions.
48 changes: 36 additions & 12 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@
]).

-ifdef(TEST).
-export([update_header/4]).
-export([update_header/4,
chunk_disk_msgs/3]).
-endif.

%% command records representing all the protocol actions that are supported
Expand Down Expand Up @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta,
end.

checkout0(Meta, {success, ConsumerId, MsgId,
?MSG(RaftIdx, Header), ExpiredMsg, State, Effects},
?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects},
SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
DelMsg = {MsgId, Msg},
SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of
undefined ->
SendAcc0#{ConsumerId => [DelMsg]};
Expand All @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId,
end,
checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc);
checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = append_delivery_effects(Effects0, SendAcc, State0),
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.

evaluate_limit(_Index, Result, _BeforeState,
Expand Down Expand Up @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState,
{State0, Result, Effects0}
end.

append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 ->

%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->
Chunks;
chunk_disk_msgs([], _Bytes, Chunks) ->
Chunks;
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem],
Bytes, Chunks)
when Bytes >= ?DELIVERY_CHUNK_LIMIT_B ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]);
chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes,
[CurChunk | Chunks]) ->
Size = get_header(size, Header),
chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]).

add_delivery_effects(Effects0, AccMap, _State)
when map_size(AccMap) == 0 ->
%% does this ever happen?
Effects0;
append_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) ->
[delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef]
add_delivery_effects(Effects0, AccMap, State) ->
maps:fold(fun (C, DiskMsgs, Efs)
when is_list(DiskMsgs) ->
lists:foldl(
fun (Msgs, E) ->
[delivery_effect(C, Msgs, State) | E]
end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]]))
end, Effects0, AccMap).

take_next_msg(#?MODULE{returns = Returns0,
Expand Down Expand Up @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0,
Msg
end.

delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}],
delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}],
#?MODULE{msg_cache = {Idx, RawMsg}}) ->
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
[local, ra_event]};
delivery_effect({CTag, CPid}, Msgs, _State) ->
{RaftIdxs, Data} = lists:unzip(Msgs),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{log, RaftIdxs,
fun(Log) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, Header}) ->
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Data),
end, Log, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-define(MB, 1_048_576).
-define(LOW_LIMIT, 0.8).
-define(DELIVERY_CHUNK_LIMIT_B, 128_000).

-record(consumer_cfg,
{meta = #{} :: consumer_meta(),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
Expand Down
21 changes: 20 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ all_tests() ->
per_message_ttl_expiration_too_high,
consumer_priorities,
cancel_consumer_gh_3729,
cancel_and_consume_with_same_tag
cancel_and_consume_with_same_tag,
validate_messages_on_queue

].

memory_tests() ->
Expand Down Expand Up @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) ->



ok.

validate_messages_on_queue(Config) ->
QQ = ?config(queue_name, Config),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
Messages = [begin
M = <<I:8000/integer>>,
publish(Ch, QQ, M),
M
end || I <- lists:seq(1, 200)],
amqp_channel:wait_for_confirms_or_die(Ch),
validate_queue(Ch, QQ, Messages),

ok.

leader_locator_client_local(Config) ->
Expand Down
21 changes: 21 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1971,6 +1971,27 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.

chunk_disk_msgs_test(_Config) ->
%% NB: this does test an internal function
%% input to this function is a reversed list of MSGs
Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)],
Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]),
?assertMatch([_, _], Chunks),
[Chunk1, Chunk2] = Chunks,
?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1),
%% the chunks are worked out in backwards order, hence the first chunk
%% will be a "remainder" chunk
?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2),
?assertEqual(128, length(Chunk2)),
?assertEqual(72, length(Chunk1)),

TwoBigMsgs = [{124, ?MSG(124, 200_000)},
{123, ?MSG(123, 200_000)}],
?assertMatch([[{123, ?MSG(123, 200_000)}],
[{124, ?MSG(124, 200_000)}]],
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down

0 comments on commit 7a949a6

Please sign in to comment.