Skip to content

Commit

Permalink
Optionally notify client app with AMQP 1.0 performative
Browse files Browse the repository at this point in the history
This commit notifies the client app with the AMQP performative if
connection config `notify_with_performative` is set to `true`.

This allows the client app to learn about all fields including
properties and capabilities returned by the AMQP server.
  • Loading branch information
ansd committed Oct 18, 2024
1 parent 3d668fd commit d1d7d7b
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 48 deletions.
4 changes: 4 additions & 0 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ begin_session_sync(Connection, Timeout) when is_pid(Connection) ->
receive
{amqp10_event, {session, Session, begun}} ->
{ok, Session};
{amqp10_event, {session, Session, {begun, #'v1_0.begin'{}}}} ->
{ok, Session};
{amqp10_event, {session, Session, {ended, Err}}} ->
{error, Err}
after Timeout -> session_timeout
Expand Down Expand Up @@ -186,6 +188,8 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
receive
{amqp10_event, {link, Ref, attached}} ->
{ok, Ref};
{amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} ->
{ok, Ref};
{amqp10_event, {link, Ref, {detached, Err}}} ->
{error, Err}
after ?TIMEOUT -> link_timeout
Expand Down
57 changes: 39 additions & 18 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
notify => pid() | none, % the pid to send connection events to
notify_when_opened => pid() | none,
notify_when_closed => pid() | none,
notify_with_performative => boolean(),
%% incoming maximum frame size set by our client application
max_frame_size => pos_integer(), % TODO: constrain to large than 512
%% outgoing maximum frame size set by AMQP peer in OPEN performative
Expand Down Expand Up @@ -253,7 +254,7 @@ hdr_sent({call, From}, begin_session,
{keep_state, State1}.

open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize,
idle_time_out = Timeout},
idle_time_out = Timeout} = Open,
#state{pending_session_reqs = PendingSessionReqs,
config = Config} = State0) ->
State = case Timeout of
Expand All @@ -278,7 +279,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize,
_ = gen_statem:reply(From, Ret),
S2
end, State1, PendingSessionReqs),
ok = notify_opened(Config),
ok = notify_opened(Config, Open),
{next_state, opened, State2#state{pending_session_reqs = []}};
open_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
Expand All @@ -292,19 +293,18 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
ok = send_heartbeat(State),
{ok, Tmr} = start_heartbeat_timer(T),
{keep_state, State#state{heartbeat_timer = Tmr}};
opened(_EvtType, {close, Reason}, State = #state{config = Config}) ->
opened(_EvtType, {close, Reason}, State) ->
%% We send the first close frame and wait for the reply.
%% TODO: stop all sessions writing
%% We could still accept incoming frames (See: 2.4.6)
ok = notify_closed(Config, Reason),
case send_close(State, Reason) of
ok -> {next_state, close_sent, State};
{error, closed} -> {stop, normal, State};
Error -> {stop, Error, State}
end;
opened(_EvtType, #'v1_0.close'{error = Error}, State = #state{config = Config}) ->
opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
%% We receive the first close frame, reply and terminate.
ok = notify_closed(Config, translate_err(Error)),
ok = notify_closed(Config, Close),
_ = send_close(State, none),
{stop, normal, State};
opened({call, From}, begin_session, State) ->
Expand All @@ -329,7 +329,8 @@ close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
#state{reader = ReaderPid} = State) ->
%% if the reader exits we probably wont receive a close frame
{stop, normal, State};
close_sent(_EvtType, #'v1_0.close'{}, State) ->
close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
ok = notify_closed(Config, Close),
%% TODO: we should probably set up a timer before this to ensure
%% we close down event if no reply is received
{stop, normal, State}.
Expand Down Expand Up @@ -489,25 +490,45 @@ socket_shutdown({tcp, Socket}, How) ->
socket_shutdown({ssl, Socket}, How) ->
ssl:shutdown(Socket, How).

notify_opened(#{notify_when_opened := none}) ->
ok;
notify_opened(#{notify_when_opened := Pid}) when is_pid(Pid) ->
Pid ! amqp10_event(opened),
notify_opened(#{notify_when_opened := none}, _) ->
ok;
notify_opened(#{notify := Pid}) when is_pid(Pid) ->
Pid ! amqp10_event(opened),
ok;
notify_opened(_) ->
notify_opened(#{notify_when_opened := Pid} = Config, Perf)
when is_pid(Pid) ->
notify_opened0(Config, Pid, Perf);
notify_opened(#{notify := Pid} = Config, Perf)
when is_pid(Pid) ->
notify_opened0(Config, Pid, Perf);
notify_opened(_, _) ->
ok.

notify_opened0(Config, Pid, Perf) ->
Evt = case Config of
#{notify_with_performative := true} ->
{opened, Perf};
_ ->
opened
end,
Pid ! amqp10_event(Evt),
ok.

notify_closed(#{notify_when_closed := none}, _Reason) ->
ok;
notify_closed(#{notify := none}, _Reason) ->
ok;
notify_closed(#{notify_when_closed := Pid}, Reason) when is_pid(Pid) ->
Pid ! amqp10_event({closed, Reason}),
notify_closed(#{notify_when_closed := Pid} = Config, Reason)
when is_pid(Pid) ->
notify_closed0(Config, Pid, Reason);
notify_closed(#{notify := Pid} = Config, Reason)
when is_pid(Pid) ->
notify_closed0(Config, Pid, Reason).

notify_closed0(#{notify_with_performative := true}, Pid, Perf = #'v1_0.close'{}) ->
Pid ! amqp10_event({closed, Perf}),
ok;
notify_closed0(_, Pid, #'v1_0.close'{error = Error}) ->
Pid ! amqp10_event({closed, translate_err(Error)}),
ok;
notify_closed(#{notify := Pid}, Reason) when is_pid(Pid) ->
notify_closed0(_, Pid, Reason) ->
Pid ! amqp10_event({closed, Reason}),
ok.

Expand Down
67 changes: 46 additions & 21 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ unmapped({call, From}, {attach, Attach},
begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
next_outgoing_id = {uint, NOI},
incoming_window = {uint, InWindow},
outgoing_window = {uint, OutWindow}},
outgoing_window = {uint, OutWindow}} = Begin,
#state{early_attach_requests = EARs} = State) ->

State1 = State#state{remote_channel = RemoteChannel},
Expand All @@ -264,7 +264,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
S2
end, State1, EARs),

ok = notify_session_begun(State2),
ok = notify_session_begun(Begin, State2),

{next_state, mapped, State2#state{early_attach_requests = [],
next_incoming_id = NOI,
Expand All @@ -291,18 +291,17 @@ mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, Incomi
outgoing_window = ?UINT_OUTGOING_WINDOW},
ok = send(Flow, State),
{keep_state, State#state{incoming_window = IncomingWindow}};
mapped(cast, #'v1_0.end'{error = Err}, State) ->
mapped(cast, #'v1_0.end'{} = End, State) ->
%% We receive the first end frame, reply and terminate.
_ = send_end(State),
% TODO: send notifications for links?
Reason = reason(Err),
ok = notify_session_ended(State, Reason),
ok = notify_session_ended(End, State),
{stop, normal, State};
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
initial_delivery_count = IDC,
handle = {uint, InHandle},
role = PeerRoleBool,
max_message_size = MaybeMaxMessageSize},
max_message_size = MaybeMaxMessageSize} = Attach,
#state{links = Links, link_index = LinkIndex,
link_handle_index = LHI} = State0) ->

Expand All @@ -311,7 +310,7 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
LinkIndexKey = {OurRole, Name},
#{LinkIndexKey := OutHandle} = LinkIndex,
#{OutHandle := Link0} = Links,
ok = notify_link_attached(Link0),
ok = notify_link_attached(Link0, Attach, State0),

{DeliveryCount, MaxMessageSize} =
case Link0 of
Expand All @@ -334,13 +333,11 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
link_index = maps:remove(LinkIndexKey, LinkIndex),
link_handle_index = LHI#{InHandle => OutHandle}},
{keep_state, State};
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
error = Err},
mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}} = Detach,
#state{links = Links, link_handle_index = LHI} = State0) ->
with_link(InHandle, State0,
fun (#link{output_handle = OutHandle} = Link, State) ->
Reason = reason(Err),
ok = notify_link_detached(Link, Reason),
ok = notify_link_detached(Link, Detach, State),
{keep_state,
State#state{links = maps:remove(OutHandle, Links),
link_handle_index = maps:remove(InHandle, LHI)}}
Expand Down Expand Up @@ -552,9 +549,8 @@ mapped(_EvtType, Msg, _State) ->
[Msg, 10]),
keep_state_and_data.

end_sent(_EvtType, #'v1_0.end'{error = Err}, State) ->
Reason = reason(Err),
ok = notify_session_ended(State, Reason),
end_sent(_EvtType, #'v1_0.end'{} = End, State) ->
ok = notify_session_ended(End, State),
{stop, normal, State};
end_sent(_EvtType, _Frame, _State) ->
% just drop frames here
Expand Down Expand Up @@ -989,22 +985,51 @@ maybe_notify_link_credit(#link{role = sender,
maybe_notify_link_credit(_Old, _New) ->
ok.

notify_link_attached(Link) ->
notify_link(Link, attached).

notify_link_detached(Link, Reason) ->
notify_link_attached(Link, Perf, #state{connection_config = Cfg}) ->
What = case Cfg of
#{notify_with_performative := true} ->
{attached, Perf};
_ ->
attached
end,
notify_link(Link, What).

notify_link_detached(Link,
Perf = #'v1_0.detach'{error = Err},
#state{connection_config = Cfg}) ->
Reason = case Cfg of
#{notify_with_performative := true} ->
Perf;
_ ->
reason(Err)
end,
notify_link(Link, {detached, Reason}).

notify_link(#link{notify = Pid, ref = Ref}, What) ->
Evt = {amqp10_event, {link, Ref, What}},
Pid ! Evt,
ok.

notify_session_begun(#state{notify = Pid}) ->
Pid ! amqp10_session_event(begun),
notify_session_begun(Perf, #state{notify = Pid,
connection_config = Cfg}) ->
Evt = case Cfg of
#{notify_with_performative := true} ->
{begun, Perf};
_ ->
begun
end,
Pid ! amqp10_session_event(Evt),
ok.

notify_session_ended(#state{notify = Pid}, Reason) ->
notify_session_ended(Perf = #'v1_0.end'{error = Err},
#state{notify = Pid,
connection_config = Cfg}) ->
Reason = case Cfg of
#{notify_with_performative := true} ->
Perf;
_ ->
reason(Err)
end,
Pid ! amqp10_session_event({ended, Reason}),
ok.

Expand Down
65 changes: 59 additions & 6 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ all() ->

groups() ->
[
{rabbitmq, [], shared()},
{rabbitmq, [], shared() ++ [notify_with_performative]},
{activemq, [], shared()},
{rabbitmq_strict, [], [
basic_roundtrip_tls,
Expand Down Expand Up @@ -458,6 +458,52 @@ transfer_id_vs_delivery_id(Config) ->
?assertEqual(serial_number:add(amqp10_msg:delivery_id(RcvMsg1), 1),
amqp10_msg:delivery_id(RcvMsg2)).

notify_with_performative(Config) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),

OpenConf = #{?FUNCTION_NAME => true,
address => Hostname,
port => Port,
sasl => anon},

{ok, Connection} = amqp10_client:open_connection(OpenConf),
receive {amqp10_event, {connection, Connection, {opened, #'v1_0.open'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Session1} = amqp10_client:begin_session(Connection),
receive {amqp10_event, {session, Session1, {begun, #'v1_0.begin'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

{ok, Sender1} = amqp10_client:attach_sender_link(Session1, <<"sender 1">>, <<"/exchanges/amq.fanout">>),
receive {amqp10_event, {link, Sender1, {attached, #'v1_0.attach'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

ok = amqp10_client:detach_link(Sender1),
receive {amqp10_event, {link, Sender1, {detached, #'v1_0.detach'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

ok = amqp10_client:end_session(Session1),
receive {amqp10_event, {session, Session1, {ended, #'v1_0.end'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end,

%% Test that the amqp10_client:*_sync functions work.
{ok, Session2} = amqp10_client:begin_session_sync(Connection),
{ok, Sender2} = amqp10_client:attach_sender_link_sync(Session2, <<"sender 2">>, <<"/exchanges/amq.fanout">>),
ok = amqp10_client:detach_link(Sender2),
ok = amqp10_client:end_session(Session2),
flush(),

ok = amqp10_client:close_connection(Connection),
receive {amqp10_event, {connection, Connection, {closed, #'v1_0.close'{}}}} -> ok
after 5000 -> ct:fail({missing_event, ?LINE})
end.

% a message is sent before the link attach is guaranteed to
% have completed and link credit granted
% also queue a link detached immediately after transfer
Expand Down Expand Up @@ -832,8 +878,10 @@ incoming_heartbeat(Config) ->
Hostname = ?config(mock_host, Config),
Port = ?config(mock_port, Config),
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>},
idle_time_out = {uint, 0}}]}
{Ch, [#'v1_0.open'{
container_id = {utf8, <<"mock">>},
%% The server doesn't expect any heartbeats from us (client).
idle_time_out = {uint, 0}}]}
end,

CloseStep = fun({0 = Ch, #'v1_0.close'{error = _TODO}, _Pay}) ->
Expand All @@ -847,20 +895,25 @@ incoming_heartbeat(Config) ->
MockRef = monitor(process, MockPid),
ok = mock_server:set_steps(Mock, Steps),
CConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config),
idle_time_out => 1000, notify => self()},
%% If the server does not send any traffic to us (client), we will expect
%% our client to close the connection after 1 second because
%% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold."
idle_time_out => 500,
notify => self()},
{ok, Connection} = amqp10_client:open_connection(CConf),
%% We expect our client to initiate closing the connection
%% and the server to reply with a close frame.
receive
{amqp10_event,
{connection, Connection0,
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}}
{closed, _}}}
when Connection0 =:= Connection ->
ok
after 5000 ->
exit(incoming_heartbeat_assert)
end,
demonitor(MockRef).


%%% HELPERS
%%%

Expand Down
4 changes: 1 addition & 3 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4611,9 +4611,7 @@ idle_time_out_on_client(Config) ->
receive
{amqp10_event,
{connection, Connection,
{closed,
{resource_limit_exceeded,
<<"remote idle-time-out">>}}}} -> ok
{closed, _}}} -> ok
after 5000 ->
ct:fail({missing_event, ?LINE})
end,
Expand Down
5 changes: 5 additions & 0 deletions deps/rabbit/test/amqp_system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ groups() ->
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

suite() ->
[
{timetrap, {minutes, 3}}
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config.
Expand Down
Loading

0 comments on commit d1d7d7b

Please sign in to comment.