From 7a949a6edecf6ee27d4f530ff4b249f0602b004f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 3 Feb 2023 10:04:30 +0000 Subject: [PATCH] Chunk quorum queue deliveries This puts a limit to the amount of message data that is added to the process heap at the same time to around 128KB. Large prefetch values combined with large messages could cause excessive garbage collection work. Also similify the intermediate delivery message format to avoid allocations that aren't necessary. --- deps/rabbit/src/rabbit_fifo.erl | 48 ++++++++++++++++++------- deps/rabbit/src/rabbit_fifo.hrl | 1 + deps/rabbit/src/rabbit_fifo_client.erl | 2 +- deps/rabbit/test/quorum_queue_SUITE.erl | 21 ++++++++++- deps/rabbit/test/rabbit_fifo_SUITE.erl | 21 +++++++++++ 5 files changed, 79 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index c45730406a22..15036697b989 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -72,7 +72,8 @@ ]). -ifdef(TEST). --export([update_header/4]). +-export([update_header/4, + chunk_disk_msgs/3]). -endif. %% command records representing all the protocol actions that are supported @@ -1876,9 +1877,9 @@ checkout(#{index := Index} = Meta, end. checkout0(Meta, {success, ConsumerId, MsgId, - ?MSG(RaftIdx, Header), ExpiredMsg, State, Effects}, + ?MSG(_RaftIdx, _Header) = Msg, ExpiredMsg, State, Effects}, SendAcc0) -> - DelMsg = {RaftIdx, {MsgId, Header}}, + DelMsg = {MsgId, Msg}, SendAcc = case maps:get(ConsumerId, SendAcc0, undefined) of undefined -> SendAcc0#{ConsumerId => [DelMsg]}; @@ -1887,7 +1888,7 @@ checkout0(Meta, {success, ConsumerId, MsgId, end, checkout0(Meta, checkout_one(Meta, ExpiredMsg, State, Effects), SendAcc); checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) -> - Effects = append_delivery_effects(Effects0, SendAcc, State0), + Effects = add_delivery_effects(Effects0, SendAcc, State0), {State0, ExpiredMsg, lists:reverse(Effects)}. evaluate_limit(_Index, Result, _BeforeState, @@ -1942,12 +1943,33 @@ evaluate_limit(Index, Result, BeforeState, {State0, Result, Effects0} end. -append_delivery_effects(Effects0, AccMap, _State) when map_size(AccMap) == 0 -> + +%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]] +chunk_disk_msgs([], _Bytes, [[] | Chunks]) -> + Chunks; +chunk_disk_msgs([], _Bytes, Chunks) -> + Chunks; +chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], + Bytes, Chunks) + when Bytes >= ?DELIVERY_CHUNK_LIMIT_B -> + Size = get_header(size, Header), + chunk_disk_msgs(Rem, Size, [[Msg] | Chunks]); +chunk_disk_msgs([{_MsgId, ?MSG(_RaftIdx, Header)} = Msg | Rem], Bytes, + [CurChunk | Chunks]) -> + Size = get_header(size, Header), + chunk_disk_msgs(Rem, Bytes + Size, [[Msg | CurChunk] | Chunks]). + +add_delivery_effects(Effects0, AccMap, _State) + when map_size(AccMap) == 0 -> %% does this ever happen? Effects0; -append_delivery_effects(Effects0, AccMap, State) -> - maps:fold(fun (C, DiskMsgs, Ef) when is_list(DiskMsgs) -> - [delivery_effect(C, lists:reverse(DiskMsgs), State) | Ef] +add_delivery_effects(Effects0, AccMap, State) -> + maps:fold(fun (C, DiskMsgs, Efs) + when is_list(DiskMsgs) -> + lists:foldl( + fun (Msgs, E) -> + [delivery_effect(C, Msgs, State) | E] + end, Efs, chunk_disk_msgs(DiskMsgs, 0, [[]])) end, Effects0, AccMap). take_next_msg(#?MODULE{returns = Returns0, @@ -1978,18 +2000,20 @@ get_next_msg(#?MODULE{returns = Returns0, Msg end. -delivery_effect({CTag, CPid}, [{Idx, {MsgId, Header}}], +delivery_effect({CTag, CPid}, [{MsgId, ?MSG(Idx, Header)}], #?MODULE{msg_cache = {Idx, RawMsg}}) -> {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]}, [local, ra_event]}; delivery_effect({CTag, CPid}, Msgs, _State) -> - {RaftIdxs, Data} = lists:unzip(Msgs), + RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) -> + [I | Acc] + end, [], Msgs), {log, RaftIdxs, fun(Log) -> DelMsgs = lists:zipwith( - fun (Cmd, {MsgId, Header}) -> + fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) -> {MsgId, {Header, get_msg(Cmd)}} - end, Log, Data), + end, Log, Msgs), [{send_msg, CPid, {delivery, CTag, DelMsgs}, [local, ra_event]}] end, {local, node(CPid)}}. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index 1d90610a3fc2..b4c37c70731a 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -96,6 +96,7 @@ -define(MB, 1_048_576). -define(LOW_LIMIT, 0.8). +-define(DELIVERY_CHUNK_LIMIT_B, 128_000). -record(consumer_cfg, {meta = #{} :: consumer_meta(), diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 6403ff90f6bc..5e4e1d4a5438 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -1,4 +1,4 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public +% This Source Code Form is subject to the terms of the Mozilla Public %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 15f15102a68f..e445ee69b5a0 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -153,7 +153,9 @@ all_tests() -> per_message_ttl_expiration_too_high, consumer_priorities, cancel_consumer_gh_3729, - cancel_and_consume_with_same_tag + cancel_and_consume_with_same_tag, + validate_messages_on_queue + ]. memory_tests() -> @@ -2936,6 +2938,23 @@ cancel_and_consume_with_same_tag(Config) -> + ok. + +validate_messages_on_queue(Config) -> + QQ = ?config(queue_name, Config), + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare_ok'{} = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + Messages = [begin + M = <>, + publish(Ch, QQ, M), + M + end || I <- lists:seq(1, 200)], + amqp_channel:wait_for_confirms_or_die(Ch), + validate_queue(Ch, QQ, Messages), + ok. leader_locator_client_local(Config) -> diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index b24673e7b661..9bc387176e25 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1971,6 +1971,27 @@ header_test(_) -> ?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)), ok. +chunk_disk_msgs_test(_Config) -> + %% NB: this does test an internal function + %% input to this function is a reversed list of MSGs + Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)], + Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]), + ?assertMatch([_, _], Chunks), + [Chunk1, Chunk2] = Chunks, + ?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1), + %% the chunks are worked out in backwards order, hence the first chunk + %% will be a "remainder" chunk + ?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2), + ?assertEqual(128, length(Chunk2)), + ?assertEqual(72, length(Chunk1)), + + TwoBigMsgs = [{124, ?MSG(124, 200_000)}, + {123, ?MSG(123, 200_000)}], + ?assertMatch([[{123, ?MSG(123, 200_000)}], + [{124, ?MSG(124, 200_000)}]], + rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf).