Skip to content

Commit

Permalink
Merge pull request #6224 from cloudamqp/shovel_connection_blocked
Browse files Browse the repository at this point in the history
Shovel: handle `connection.(un)blocked` messages
  • Loading branch information
michaelklishin authored Oct 25, 2022
2 parents bc71986 + 68e931a commit f4a7d2f
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 50 deletions.
50 changes: 40 additions & 10 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ init_dest(Conf = #{ack_mode := AckMode,
_ ->
ok
end,
amqp_connection:register_blocked_handler(Conn, self()),
Conf#{dest => Dst#{unacked => #{}}}.

ack(Tag, Multi, State = #{source := #{current := {_, Chan, _}}}) ->
Expand Down Expand Up @@ -153,6 +154,24 @@ dest_endpoint(#{dest := Dest}) ->
Keys = [dest_exchange, dest_exchange_key, dest_queue],
maps:to_list(maps:filter(fun(K, _) -> proplists:is_defined(K, Keys) end, Dest)).

forward_pending(State) ->
case pop_pending(State) of
empty ->
State;
{{Tag, Props, Payload}, S} ->
S2 = do_forward(Tag, Props, Payload, S),
S3 = control_throttle(S2),
case is_blocked(S3) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Stop forwarding pending messages.
S3;
false ->
forward_pending(S3)
end
end.

forward(IncomingTag, Props, Payload, State) ->
State1 = control_throttle(State),
case is_blocked(State1) of
Expand Down Expand Up @@ -271,13 +290,19 @@ handle_dest({'EXIT', Conn, Reason}, #{dest := #{current := {Conn, _, _}}}) ->
handle_dest({'EXIT', _Pid, {shutdown, {server_initiated_close, ?PRECONDITION_FAILED, Reason}}}, _State) ->
{stop, {outbound_link_or_channel_closure, Reason}};

handle_dest(#'connection.blocked'{}, State) ->
update_blocked_by(connection_blocked, true, State);

handle_dest(#'connection.unblocked'{}, State) ->
State1 = update_blocked_by(connection_blocked, false, State),
%% we are unblocked so can begin to forward
forward_pending(State1);

handle_dest({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{Pending, State1} = reset_pending(control_throttle(State)),
State1 = control_throttle(State),
%% we have credit so can begin to forward
lists:foldl(fun ({Tag, Props, Payload}, S) ->
forward(Tag, Props, Payload, S)
end, State1, lists:reverse(Pending));
forward_pending(State1);

handle_dest(_Msg, _State) ->
not_handled.
Expand Down Expand Up @@ -358,12 +383,17 @@ is_blocked(_) ->
false.

add_pending(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, []),
State#{dest => Dest#{pending => [Elem|Pending]}}.

reset_pending(State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, []),
{Pending, State#{dest => Dest#{pending => []}}}.
Pending = maps:get(pending, Dest, queue:new()),
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.

pop_pending(State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
case queue:out(Pending) of
{empty, _} ->
empty;
{{value, Elem}, Pending2} ->
{Elem, State#{dest => Dest#{pending => Pending2}}}
end.

make_conn_and_chan([], {VHost, Name} = _ShovelName) ->
rabbit_log:error(
Expand Down
6 changes: 5 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
code_change/3]).

%% for testing purposes
-export([get_connection_name/1]).
-export([get_connection_name/1,
get_internal_config/1]).

-include("rabbit_shovel.hrl").

Expand Down Expand Up @@ -244,3 +245,6 @@ get_connection_name(_) ->
close_connections(#state{config = Conf}) ->
ok = rabbit_shovel_behaviour:close_source(Conf),
ok = rabbit_shovel_behaviour:close_dest(Conf).

get_internal_config(#state{config = Conf}) ->
Conf.
Loading

0 comments on commit f4a7d2f

Please sign in to comment.