diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index c5ebc7ba123f..e296d3ff8533 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -144,6 +144,8 @@ begin_session_sync(Connection, Timeout) when is_pid(Connection) -> receive {amqp10_event, {session, Session, begun}} -> {ok, Session}; + {amqp10_event, {session, Session, {begun, #'v1_0.begin'{}}}} -> + {ok, Session}; {amqp10_event, {session, Session, {ended, Err}}} -> {error, Err} after Timeout -> session_timeout @@ -186,6 +188,8 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) -> receive {amqp10_event, {link, Ref, attached}} -> {ok, Ref}; + {amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} -> + {ok, Ref}; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT -> link_timeout diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index df0548aa9ef1..8fbcb22f3d1b 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -63,6 +63,7 @@ notify => pid() | none, % the pid to send connection events to notify_when_opened => pid() | none, notify_when_closed => pid() | none, + notify_with_performative => boolean(), %% incoming maximum frame size set by our client application max_frame_size => pos_integer(), % TODO: constrain to large than 512 %% outgoing maximum frame size set by AMQP peer in OPEN performative @@ -253,7 +254,7 @@ hdr_sent({call, From}, begin_session, {keep_state, State1}. open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, - idle_time_out = Timeout}, + idle_time_out = Timeout} = Open, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> State = case Timeout of @@ -278,7 +279,7 @@ open_sent(_EvtType, #'v1_0.open'{max_frame_size = MaybeMaxFrameSize, _ = gen_statem:reply(From, Ret), S2 end, State1, PendingSessionReqs), - ok = notify_opened(Config), + ok = notify_opened(Config, Open), {next_state, opened, State2#state{pending_session_reqs = []}}; open_sent({call, From}, begin_session, #state{pending_session_reqs = PendingSessionReqs} = State) -> @@ -292,19 +293,18 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), {keep_state, State#state{heartbeat_timer = Tmr}}; -opened(_EvtType, {close, Reason}, State = #state{config = Config}) -> +opened(_EvtType, {close, Reason}, State) -> %% We send the first close frame and wait for the reply. %% TODO: stop all sessions writing %% We could still accept incoming frames (See: 2.4.6) - ok = notify_closed(Config, Reason), case send_close(State, Reason) of ok -> {next_state, close_sent, State}; {error, closed} -> {stop, normal, State}; Error -> {stop, Error, State} end; -opened(_EvtType, #'v1_0.close'{error = Error}, State = #state{config = Config}) -> +opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> %% We receive the first close frame, reply and terminate. - ok = notify_closed(Config, translate_err(Error)), + ok = notify_closed(Config, Close), _ = send_close(State, none), {stop, normal, State}; opened({call, From}, begin_session, State) -> @@ -329,7 +329,8 @@ close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _}, #state{reader = ReaderPid} = State) -> %% if the reader exits we probably wont receive a close frame {stop, normal, State}; -close_sent(_EvtType, #'v1_0.close'{}, State) -> +close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) -> + ok = notify_closed(Config, Close), %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received {stop, normal, State}. @@ -489,25 +490,45 @@ socket_shutdown({tcp, Socket}, How) -> socket_shutdown({ssl, Socket}, How) -> ssl:shutdown(Socket, How). -notify_opened(#{notify_when_opened := none}) -> - ok; -notify_opened(#{notify_when_opened := Pid}) when is_pid(Pid) -> - Pid ! amqp10_event(opened), +notify_opened(#{notify_when_opened := none}, _) -> ok; -notify_opened(#{notify := Pid}) when is_pid(Pid) -> - Pid ! amqp10_event(opened), - ok; -notify_opened(_) -> +notify_opened(#{notify_when_opened := Pid} = Config, Perf) + when is_pid(Pid) -> + notify_opened0(Config, Pid, Perf); +notify_opened(#{notify := Pid} = Config, Perf) + when is_pid(Pid) -> + notify_opened0(Config, Pid, Perf); +notify_opened(_, _) -> + ok. + +notify_opened0(Config, Pid, Perf) -> + Evt = case Config of + #{notify_with_performative := true} -> + {opened, Perf}; + _ -> + opened + end, + Pid ! amqp10_event(Evt), ok. notify_closed(#{notify_when_closed := none}, _Reason) -> ok; notify_closed(#{notify := none}, _Reason) -> ok; -notify_closed(#{notify_when_closed := Pid}, Reason) when is_pid(Pid) -> - Pid ! amqp10_event({closed, Reason}), +notify_closed(#{notify_when_closed := Pid} = Config, Reason) + when is_pid(Pid) -> + notify_closed0(Config, Pid, Reason); +notify_closed(#{notify := Pid} = Config, Reason) + when is_pid(Pid) -> + notify_closed0(Config, Pid, Reason). + +notify_closed0(#{notify_with_performative := true}, Pid, Perf = #'v1_0.close'{}) -> + Pid ! amqp10_event({closed, Perf}), + ok; +notify_closed0(_, Pid, #'v1_0.close'{error = Error}) -> + Pid ! amqp10_event({closed, translate_err(Error)}), ok; -notify_closed(#{notify := Pid}, Reason) when is_pid(Pid) -> +notify_closed0(_, Pid, Reason) -> Pid ! amqp10_event({closed, Reason}), ok. diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 911886ce4143..7e2c82560398 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -254,7 +254,7 @@ unmapped({call, From}, {attach, Attach}, begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, next_outgoing_id = {uint, NOI}, incoming_window = {uint, InWindow}, - outgoing_window = {uint, OutWindow}}, + outgoing_window = {uint, OutWindow}} = Begin, #state{early_attach_requests = EARs} = State) -> State1 = State#state{remote_channel = RemoteChannel}, @@ -264,7 +264,7 @@ begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, S2 end, State1, EARs), - ok = notify_session_begun(State2), + ok = notify_session_begun(Begin, State2), {next_state, mapped, State2#state{early_attach_requests = [], next_incoming_id = NOI, @@ -291,18 +291,17 @@ mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, Incomi outgoing_window = ?UINT_OUTGOING_WINDOW}, ok = send(Flow, State), {keep_state, State#state{incoming_window = IncomingWindow}}; -mapped(cast, #'v1_0.end'{error = Err}, State) -> +mapped(cast, #'v1_0.end'{} = End, State) -> %% We receive the first end frame, reply and terminate. _ = send_end(State), % TODO: send notifications for links? - Reason = reason(Err), - ok = notify_session_ended(State, Reason), + ok = notify_session_ended(End, State), {stop, normal, State}; mapped(cast, #'v1_0.attach'{name = {utf8, Name}, initial_delivery_count = IDC, handle = {uint, InHandle}, role = PeerRoleBool, - max_message_size = MaybeMaxMessageSize}, + max_message_size = MaybeMaxMessageSize} = Attach, #state{links = Links, link_index = LinkIndex, link_handle_index = LHI} = State0) -> @@ -311,7 +310,7 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, LinkIndexKey = {OurRole, Name}, #{LinkIndexKey := OutHandle} = LinkIndex, #{OutHandle := Link0} = Links, - ok = notify_link_attached(Link0), + ok = notify_link_attached(Link0, Attach, State0), {DeliveryCount, MaxMessageSize} = case Link0 of @@ -334,13 +333,11 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name}, link_index = maps:remove(LinkIndexKey, LinkIndex), link_handle_index = LHI#{InHandle => OutHandle}}, {keep_state, State}; -mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}, - error = Err}, +mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}} = Detach, #state{links = Links, link_handle_index = LHI} = State0) -> with_link(InHandle, State0, fun (#link{output_handle = OutHandle} = Link, State) -> - Reason = reason(Err), - ok = notify_link_detached(Link, Reason), + ok = notify_link_detached(Link, Detach, State), {keep_state, State#state{links = maps:remove(OutHandle, Links), link_handle_index = maps:remove(InHandle, LHI)}} @@ -552,9 +549,8 @@ mapped(_EvtType, Msg, _State) -> [Msg, 10]), keep_state_and_data. -end_sent(_EvtType, #'v1_0.end'{error = Err}, State) -> - Reason = reason(Err), - ok = notify_session_ended(State, Reason), +end_sent(_EvtType, #'v1_0.end'{} = End, State) -> + ok = notify_session_ended(End, State), {stop, normal, State}; end_sent(_EvtType, _Frame, _State) -> % just drop frames here @@ -989,10 +985,24 @@ maybe_notify_link_credit(#link{role = sender, maybe_notify_link_credit(_Old, _New) -> ok. -notify_link_attached(Link) -> - notify_link(Link, attached). - -notify_link_detached(Link, Reason) -> +notify_link_attached(Link, Perf, #state{connection_config = Cfg}) -> + What = case Cfg of + #{notify_with_performative := true} -> + {attached, Perf}; + _ -> + attached + end, + notify_link(Link, What). + +notify_link_detached(Link, + Perf = #'v1_0.detach'{error = Err}, + #state{connection_config = Cfg}) -> + Reason = case Cfg of + #{notify_with_performative := true} -> + Perf; + _ -> + reason(Err) + end, notify_link(Link, {detached, Reason}). notify_link(#link{notify = Pid, ref = Ref}, What) -> @@ -1000,11 +1010,26 @@ notify_link(#link{notify = Pid, ref = Ref}, What) -> Pid ! Evt, ok. -notify_session_begun(#state{notify = Pid}) -> - Pid ! amqp10_session_event(begun), +notify_session_begun(Perf, #state{notify = Pid, + connection_config = Cfg}) -> + Evt = case Cfg of + #{notify_with_performative := true} -> + {begun, Perf}; + _ -> + begun + end, + Pid ! amqp10_session_event(Evt), ok. -notify_session_ended(#state{notify = Pid}, Reason) -> +notify_session_ended(Perf = #'v1_0.end'{error = Err}, + #state{notify = Pid, + connection_config = Cfg}) -> + Reason = case Cfg of + #{notify_with_performative := true} -> + Perf; + _ -> + reason(Err) + end, Pid ! amqp10_session_event({ended, Reason}), ok. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index 7a64425c7583..27a59d5efef8 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -30,7 +30,7 @@ all() -> groups() -> [ - {rabbitmq, [], shared()}, + {rabbitmq, [], shared() ++ [notify_with_performative]}, {activemq, [], shared()}, {rabbitmq_strict, [], [ basic_roundtrip_tls, @@ -458,6 +458,52 @@ transfer_id_vs_delivery_id(Config) -> ?assertEqual(serial_number:add(amqp10_msg:delivery_id(RcvMsg1), 1), amqp10_msg:delivery_id(RcvMsg2)). +notify_with_performative(Config) -> + Hostname = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + + OpenConf = #{?FUNCTION_NAME => true, + address => Hostname, + port => Port, + sasl => anon}, + + {ok, Connection} = amqp10_client:open_connection(OpenConf), + receive {amqp10_event, {connection, Connection, {opened, #'v1_0.open'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Session1} = amqp10_client:begin_session(Connection), + receive {amqp10_event, {session, Session1, {begun, #'v1_0.begin'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender1} = amqp10_client:attach_sender_link(Session1, <<"sender 1">>, <<"/exchanges/amq.fanout">>), + receive {amqp10_event, {link, Sender1, {attached, #'v1_0.attach'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:detach_link(Sender1), + receive {amqp10_event, {link, Sender1, {detached, #'v1_0.detach'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:end_session(Session1), + receive {amqp10_event, {session, Session1, {ended, #'v1_0.end'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Test that the amqp10_client:*_sync functions work. + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Sender2} = amqp10_client:attach_sender_link_sync(Session2, <<"sender 2">>, <<"/exchanges/amq.fanout">>), + ok = amqp10_client:detach_link(Sender2), + ok = amqp10_client:end_session(Session2), + flush(), + + ok = amqp10_client:close_connection(Connection), + receive {amqp10_event, {connection, Connection, {closed, #'v1_0.close'{}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end. + % a message is sent before the link attach is guaranteed to % have completed and link credit granted % also queue a link detached immediately after transfer @@ -832,8 +878,10 @@ incoming_heartbeat(Config) -> Hostname = ?config(mock_host, Config), Port = ?config(mock_port, Config), OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) -> - {Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}, - idle_time_out = {uint, 0}}]} + {Ch, [#'v1_0.open'{ + container_id = {utf8, <<"mock">>}, + %% The server doesn't expect any heartbeats from us (client). + idle_time_out = {uint, 0}}]} end, CloseStep = fun({0 = Ch, #'v1_0.close'{error = _TODO}, _Pay}) -> @@ -847,12 +895,18 @@ incoming_heartbeat(Config) -> MockRef = monitor(process, MockPid), ok = mock_server:set_steps(Mock, Steps), CConf = #{address => Hostname, port => Port, sasl => ?config(sasl, Config), - idle_time_out => 1000, notify => self()}, + %% If the server does not send any traffic to us (client), we will expect + %% our client to close the connection after 1 second because + %% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold." + idle_time_out => 500, + notify => self()}, {ok, Connection} = amqp10_client:open_connection(CConf), + %% We expect our client to initiate closing the connection + %% and the server to reply with a close frame. receive {amqp10_event, {connection, Connection0, - {closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} + {closed, _}}} when Connection0 =:= Connection -> ok after 5000 -> @@ -860,7 +914,6 @@ incoming_heartbeat(Config) -> end, demonitor(MockRef). - %%% HELPERS %%% diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 79f57bdc7e2d..dd641328601b 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4611,9 +4611,7 @@ idle_time_out_on_client(Config) -> receive {amqp10_event, {connection, Connection, - {closed, - {resource_limit_exceeded, - <<"remote idle-time-out">>}}}} -> ok + {closed, _}}} -> ok after 5000 -> ct:fail({missing_event, ?LINE}) end, diff --git a/deps/rabbit/test/amqp_system_SUITE.erl b/deps/rabbit/test/amqp_system_SUITE.erl index 251df8fc8013..d739c7b3fc96 100644 --- a/deps/rabbit/test/amqp_system_SUITE.erl +++ b/deps/rabbit/test/amqp_system_SUITE.erl @@ -52,6 +52,11 @@ groups() -> %% Testsuite setup/teardown. %% ------------------------------------------------------------------- +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), Config. diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index ce38b0241d10..0fde808151d8 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -97,6 +97,8 @@ await_attached(Ref) -> receive {amqp10_event, {link, Ref, attached}} -> ok; + {amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} -> + ok; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT -> @@ -129,6 +131,8 @@ await_detached(Ref) -> receive {amqp10_event, {link, Ref, {detached, normal}}} -> ok; + {amqp10_event, {link, Ref, {detached, #'v1_0.detach'{}}}} -> + ok; {amqp10_event, {link, Ref, {detached, Err}}} -> {error, Err} after ?TIMEOUT ->