Skip to content

Commit

Permalink
Fix AMQP 1.0 shovel
Browse files Browse the repository at this point in the history
The shovel violated the AMQP 1.0 spec by sending transfers with settled=true
under sender settle mode unsettled (in case of shovel ack-mode being
on-publish).
  • Loading branch information
ansd committed Sep 25, 2024
1 parent 9d7ebf3 commit c0fbc1a
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ connect_source(State = #{name := Name,
ack_mode := AckMode,
source := #{uris := [Uri | _],
source_address := Addr} = Src}) ->
SndSettleMode = case AckMode of
no_ack -> settled;
on_publish -> unsettled;
on_confirm -> unsettled
end,
AttachFun = fun amqp10_client:attach_receiver_link/5,
{Conn, Sess, LinkRef} = connect(Name, AckMode, Uri, "receiver", Addr, Src,
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
AttachFun),
State#{source => Src#{current => #{conn => Conn,
session => Sess,
Expand All @@ -87,8 +92,13 @@ connect_dest(State = #{name := Name,
ack_mode := AckMode,
dest := #{uris := [Uri | _],
target_address := Addr} = Dst}) ->
SndSettleMode = case AckMode of
no_ack -> settled;
on_publish -> settled;
on_confirm -> unsettled
end,
AttachFun = fun amqp10_client:attach_sender_link_sync/5,
{Conn, Sess, LinkRef} = connect(Name, AckMode, Uri, "sender", Addr, Dst,
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "sender", Addr, Dst,
AttachFun),
%% wait for link credit here as if there are messages waiting we may try
%% to forward before we've received credit
Expand All @@ -99,7 +109,7 @@ connect_dest(State = #{name := Name,
link => LinkRef,
uri => Uri}}}.

connect(Name, AckMode, Uri, Postfix, Addr, Map, AttachFun) ->
connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
{ok, Config0} = amqp10_client:parse_uri(Uri),
%% As done for AMQP 0.9.1, exclude AMQP 1.0 shovel connections from maintenance mode
%% to prevent crashes and errors being logged by the shovel plugin when a node gets drained.
Expand All @@ -113,16 +123,11 @@ connect(Name, AckMode, Uri, Postfix, Addr, Map, AttachFun) ->
LinkName0 = gen_unique_name(Name, Postfix),
rabbit_data_coercion:to_binary(LinkName0)
end,
% mixed settlement mode covers all the ack_modes
SettlementMode = case AckMode of
no_ack -> settled;
_ -> unsettled
end,
% needs to be sync, i.e. awaits the 'attach' event as
% else we may try to use the link before it is ready
Durability = maps:get(durability, Map, unsettled_state),
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
SettlementMode,
SndSettleMode,
Durability),
{Conn, Sess, LinkRef}.

Expand Down

0 comments on commit c0fbc1a

Please sign in to comment.