Skip to content

Commit

Permalink
Merge pull request #12507 from rabbitmq/mergify/bp/v4.0.x/pr-12506
Browse files Browse the repository at this point in the history
Track requeue history (backport #12506)
  • Loading branch information
michaelklishin authored Oct 11, 2024
2 parents ab97ab5 + 67319f0 commit 2292638
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 44 deletions.
11 changes: 9 additions & 2 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,20 @@ wrap_map_value(true) ->
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
{uint, V};
case V < 0 of
true ->
{int, V};
false ->
uint(V)
end;
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V)).
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8(V) -> amqp10_client_types:utf8(V).

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
-type str() :: atom() | string() | binary().
-type internal_ann_key() :: atom().
-type x_ann_key() :: binary(). %% should begin with x- or ideally x-opt-
-type x_ann_value() :: str() | integer() | float() | [x_ann_value()].
-type x_ann_value() :: str() | integer() | float() | TaggedValue :: tuple() | [x_ann_value()].
-type protocol() :: module().
-type annotations() :: #{internal_ann_key() => term(),
x_ann_key() => x_ann_value()}.
Expand Down
5 changes: 2 additions & 3 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ infer_type(V) when is_integer(V) ->
{long, V};
infer_type(V) when is_boolean(V) ->
{boolean, V};
infer_type({T, _} = V) when is_atom(T) ->
%% looks like a pre-tagged type
V.
infer_type(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.

utf8_string_is_ascii(UTF8String) ->
utf8_scan(UTF8String, fun(Char) -> Char >= 0 andalso Char < 128 end).
Expand Down
13 changes: 10 additions & 3 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@ settle_op_from_outcome(#'v1_0.modified'{delivery_failed = DelFailed,
Anns1 = lists:map(
%% "all symbolic keys except those beginning with "x-" are reserved." [3.2.10]
fun({{symbol, <<"x-", _/binary>> = K}, V}) ->
{K, unwrap(V)}
{K, unwrap_simple_type(V)}
end, KVList),
maps:from_list(Anns1)
end,
Expand Down Expand Up @@ -3603,7 +3603,14 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

unwrap({_Tag, V}) ->

unwrap_simple_type(V = {list, _}) ->
V;
unwrap_simple_type(V = {map, _}) ->
V;
unwrap_simple_type(V = {array, _, _}) ->
V;
unwrap_simple_type({_SimpleType, V}) ->
V;
unwrap(V) ->
unwrap_simple_type(V) ->
V.
209 changes: 178 additions & 31 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ groups() ->
modified_classic_queue,
modified_quorum_queue,
modified_dead_letter_headers_exchange,
modified_dead_letter_history,
dead_letter_headers_exchange,
dead_letter_reject,
dead_letter_reject_message_order_classic_queue,
Expand Down Expand Up @@ -253,7 +254,8 @@ init_per_testcase(T, Config)
end;
init_per_testcase(T, Config)
when T =:= modified_quorum_queue orelse
T =:= modified_dead_letter_headers_exchange ->
T =:= modified_dead_letter_headers_exchange orelse
T =:= modified_dead_letter_history ->
case rpc(Config, rabbit_feature_flags, is_enabled, ['rabbitmq_4.0.0']) of
true ->
rabbit_ct_helpers:testcase_started(Config, T);
Expand Down Expand Up @@ -490,79 +492,127 @@ modified_quorum_queue(Config) ->
ok = amqp10_client:send_msg(Sender, Msg2),
ok = amqp10_client:detach_link(Sender),

{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, unsettled),
Receiver1Name = <<"receiver 1">>,
Receiver2Name = <<"receiver 2">>,
{ok, Receiver1} = amqp10_client:attach_receiver_link(Session, Receiver1Name, Address, unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(Session, Receiver2Name, Address, unsettled),

{ok, M1} = amqp10_client:get_msg(Receiver),
{ok, M1} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m1">>], amqp10_msg:body(M1)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M1)),
ok = amqp10_client:settle_msg(Receiver, M1, {modified, false, true, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M1, {modified, false, true, #{}}),

{ok, M2a} = amqp10_client:get_msg(Receiver),
{ok, M2a} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2a)),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(M2a)),
ok = amqp10_client:settle_msg(Receiver, M2a, {modified, false, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2a, {modified, false, false, #{}}),

{ok, M2b} = amqp10_client:get_msg(Receiver),
{ok, M2b} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2b)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(M2b)),
ok = amqp10_client:settle_msg(Receiver, M2b, {modified, true, false, #{}}),
ok = amqp10_client:settle_msg(Receiver1, M2b, {modified, true, false, #{}}),

{ok, M2c} = amqp10_client:get_msg(Receiver),
{ok, M2c} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2c)),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(M2c)),
ok = amqp10_client:settle_msg(Receiver, M2c,
{modified, true, false,
#{<<"x-opt-key">> => <<"val 1">>}}),

{ok, M2d} = amqp10_client:get_msg(Receiver),
ok = amqp10_client:settle_msg(
Receiver1, M2c,
{modified, true, false,
%% Test that a history of requeue events can be tracked as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> => {list, [{utf8, <<"reason 1">>}]},
<<"x-opt-my-map">> => {map, [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}}
]}}}),

{ok, M2d} = amqp10_client:get_msg(Receiver2),
?assertEqual([<<"m2">>], amqp10_msg:body(M2d)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2d)),
?assertMatch(#{<<"x-opt-key">> := <<"val 1">>}, amqp10_msg:message_annotations(M2d)),
ok = amqp10_client:settle_msg(Receiver, M2d,
{modified, false, false,
#{<<"x-opt-key">> => <<"val 2">>,
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver),
#{<<"x-opt-requeued-by">> := {array, utf8, L0},
<<"x-opt-requeue-reason">> := L1,
<<"x-opt-my-map">> := L2} = amqp10_msg:message_annotations(M2d),
ok = amqp10_client:settle_msg(
Receiver1, M2d,
{modified, false, false,
#{<<"x-opt-requeued-by">> => {array, utf8, [{utf8, Receiver2Name} | L0]},
<<"x-opt-requeue-reason">> => {list, [{symbol, <<"reason 2">>} | L1]},
<<"x-opt-my-map">> => {map, L2 ++ [{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}]},
<<"x-other">> => 99}}),

{ok, M2e} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m2">>], amqp10_msg:body(M2e)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(M2e)),
?assertMatch(#{<<"x-opt-key">> := <<"val 2">>,
?assertMatch(#{<<"x-opt-requeued-by">> := {array, utf8, [{utf8, Receiver2Name}, {utf8, Receiver1Name}]},
<<"x-opt-requeue-reason">> := [{symbol, <<"reason 2">>}, {utf8, <<"reason 1">>}],
<<"x-opt-my-map">> := [
{{utf8, <<"k1">>}, {byte, -1}},
{{utf8, <<"k2">>}, {ulong, 2}},
{{symbol, <<"k3">>}, {symbol, <<"val 3">>}}
],
<<"x-other">> := 99}, amqp10_msg:message_annotations(M2e)),
ok = amqp10_client:settle_msg(Receiver, M2e, modified),
ok = amqp10_client:settle_msg(Receiver1, M2e, modified),

ok = amqp10_client:detach_link(Receiver),
?assertMatch({ok, #{message_count := 1}},
rabbitmq_amqp_client:delete_queue(LinkPair, QName)),
%% Test that we can consume via AMQP 0.9.1
Ch = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{},
#amqp_msg{payload = <<"m2">>,
props = #'P_basic'{headers = Headers}}
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
%% We expect to receive only modified AMQP 1.0 message annotations that are of simple types
%% (i.e. excluding list, map, array).
?assertEqual({value, {<<"x-other">>, long, 99}},
lists:keysearch(<<"x-other">>, 1, Headers)),
?assertEqual({value, {<<"x-delivery-count">>, long, 5}},
lists:keysearch(<<"x-delivery-count">>, 1, Headers)),
ok = rabbit_ct_client_helpers:close_channel(Ch),

ok = amqp10_client:detach_link(Receiver1),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that a message can be routed based on the message-annotations
%% provided in the modified outcome.
%% provided in the modified outcome as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_headers_exchange(Config) ->
{Connection, Session, LinkPair} = init(Config),
HeadersXName = <<"my headers exchange">>,
AlternateXName = <<"my alternate exchange">>,
SourceQName = <<"source quorum queue">>,
AppleQName = <<"dead letter classic queue receiving apples">>,
BananaQName = <<"dead letter quorum queue receiving bananas">>,
TrashQName = <<"trash queue receiving anything that doesn't match">>,

ok = rabbitmq_amqp_client:declare_exchange(
LinkPair,
HeadersXName,
#{type => <<"headers">>,
arguments => #{<<"alternate-exchange">> => {utf8, AlternateXName}}}),

ok = rabbitmq_amqp_client:declare_exchange(LinkPair, AlternateXName, #{type => <<"fanout">>}),

{ok, #{type := <<"quorum">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
SourceQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-overflow">> => {utf8, <<"reject-publish">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-least-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.headers">>}}}),
<<"x-dead-letter-exchange">> => {utf8, HeadersXName}}}),
{ok, #{type := <<"classic">>}} = rabbitmq_amqp_client:declare_queue(
LinkPair,
AppleQName,
Expand All @@ -571,14 +621,16 @@ modified_dead_letter_headers_exchange(Config) ->
LinkPair,
BananaQName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TrashQName, #{}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, AppleQName, <<"amq.headers">>, <<>>,
LinkPair, AppleQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"apple">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(
LinkPair, BananaQName, <<"amq.headers">>, <<>>,
LinkPair, BananaQName, HeadersXName, <<>>,
#{<<"x-fruit">> => {utf8, <<"banana">>},
<<"x-match">> => {utf8, <<"any-with-x">>}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, TrashQName, AlternateXName, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(SourceQName)),
Expand All @@ -589,6 +641,8 @@ modified_dead_letter_headers_exchange(Config) ->
Session, <<"receiver apple">>, rabbitmq_amqp_address:queue(AppleQName), unsettled),
{ok, ReceiverBanana} = amqp10_client:attach_receiver_link(
Session, <<"receiver banana">>, rabbitmq_amqp_address:queue(BananaQName), unsettled),
{ok, ReceiverTrash} = amqp10_client:attach_receiver_link(
Session, <<"receiver trash">>, rabbitmq_amqp_address:queue(TrashQName), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>)),
Expand All @@ -598,7 +652,8 @@ modified_dead_letter_headers_exchange(Config) ->
ok = amqp10_client:send_msg(Sender, amqp10_msg:set_message_annotations(
#{"x-fruit" => <<"apple">>},
amqp10_msg:new(<<"t4">>, <<"m4">>))),
ok = wait_for_accepts(3),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t5">>, <<"m5">>)),
ok = wait_for_accepts(5),

{ok, Msg1} = amqp10_client:get_msg(Receiver),
?assertMatch(#{delivery_count := 0,
Expand Down Expand Up @@ -639,13 +694,105 @@ modified_dead_letter_headers_exchange(Config) ->
amqp10_msg:headers(MsgBanana2)),
ok = amqp10_client:accept_msg(ReceiverBanana, MsgBanana2),

{ok, Msg5} = amqp10_client:get_msg(Receiver),
%% This message should be routed via the alternate exchange to the trash queue.
ok = amqp10_client:settle_msg(Receiver, Msg5, {modified, false, true, #{<<"x-fruit">> => <<"strawberry">>}}),
{ok, MsgTrash} = amqp10_client:get_msg(ReceiverTrash),
?assertEqual([<<"m5">>], amqp10_msg:body(MsgTrash)),
?assertMatch(#{delivery_count := 0,
first_acquirer := false},
amqp10_msg:headers(MsgTrash)),
ok = amqp10_client:accept_msg(ReceiverTrash, MsgTrash),

ok = detach_link_sync(Sender),
ok = detach_link_sync(Receiver),
ok = detach_link_sync(ReceiverApple),
ok = detach_link_sync(ReceiverBanana),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, SourceQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, AppleQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, BananaQName),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, TrashQName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, HeadersXName),
ok = rabbitmq_amqp_client:delete_exchange(LinkPair, AlternateXName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

%% Test that custom dead lettering event tracking works as described in
%% https://rabbitmq.com/blog/2024/10/11/modified-outcome
modified_dead_letter_history(Config) ->
{Connection, Session, LinkPair} = init(Config),
Q1 = <<"qq 1">>,
Q2 = <<"qq 2">>,

{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q1,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<"amq.fanout">>}}}),
{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, Q2,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>},
<<"x-dead-letter-strategy">> => {utf8, <<"at-most-once">>},
<<"x-dead-letter-exchange">> => {utf8, <<>>}}}),
ok = rabbitmq_amqp_client:bind_queue(LinkPair, Q2, <<"amq.fanout">>, <<>>, #{}),

{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, rabbitmq_amqp_address:queue(Q1)),
wait_for_credit(Sender),
{ok, Receiver1} = amqp10_client:attach_receiver_link(
Session, <<"receiver 1">>, rabbitmq_amqp_address:queue(Q1), unsettled),
{ok, Receiver2} = amqp10_client:attach_receiver_link(
Session, <<"receiver 2">>, rabbitmq_amqp_address:queue(Q2), unsettled),

ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t">>, <<"m">>)),
ok = wait_for_accepts(1),
ok = detach_link_sync(Sender),

{ok, Msg1} = amqp10_client:get_msg(Receiver1),
?assertMatch(#{delivery_count := 0,
first_acquirer := true},
amqp10_msg:headers(Msg1)),
ok = amqp10_client:settle_msg(
Receiver1, Msg1,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{utf8, <<"l1">>}]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k1">>}, {byte, -1}}]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a1">>}]}}
}),

{ok, Msg2} = amqp10_client:get_msg(Receiver2),
?assertMatch(#{delivery_count := 1,
first_acquirer := false},
amqp10_msg:headers(Msg2)),
#{<<"x-opt-history-list">> := L1,
<<"x-opt-history-map">> := L2,
<<"x-opt-history-array">> := {array, utf8, L0}
} = amqp10_msg:message_annotations(Msg2),
ok = amqp10_client:settle_msg(
Receiver2, Msg2,
{modified, true, true,
#{<<"x-opt-history-list">> => {list, [{int, -99} | L1]},
<<"x-opt-history-map">> => {map, [{{symbol, <<"k2">>}, {symbol, <<"v2">>}} | L2]},
<<"x-opt-history-array">> => {array, utf8, [{utf8, <<"a2">>} | L0]},
<<"x-other">> => -99}}),

{ok, Msg3} = amqp10_client:get_msg(Receiver1),
?assertEqual([<<"m">>], amqp10_msg:body(Msg3)),
?assertMatch(#{delivery_count := 2,
first_acquirer := false},
amqp10_msg:headers(Msg3)),
?assertMatch(#{<<"x-opt-history-array">> := {array, utf8, [{utf8, <<"a2">>}, {utf8, <<"a1">>}]},
<<"x-opt-history-list">> := [{int, -99}, {utf8, <<"l1">>}],
<<"x-opt-history-map">> := [{{symbol, <<"k2">>}, {symbol, <<"v2">>}},
{{symbol, <<"k1">>}, {byte, -1}}],
<<"x-other">> := -99}, amqp10_msg:message_annotations(Msg3)),
ok = amqp10_client:accept_msg(Receiver1, Msg3),

ok = detach_link_sync(Receiver1),
ok = detach_link_sync(Receiver2),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q1),
{ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, Q2),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).
Expand Down
Loading

0 comments on commit 2292638

Please sign in to comment.