diff --git a/guides/telemetry.md b/guides/telemetry.md index 1029d132..d895c666 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -2,20 +2,41 @@ Amoc also exposes the following telemetry events: ## Scenario -A telemetry span of a full scenario execution +All telemetry spans below contain an extra key `return` in the metadata for the `stop` event with the return value of the given callback. + +A telemetry span of a scenario initialisation (i.e. the exported `init/0` function): +```erlang +event_name: [amoc, scenario, init, _] +measurements: #{} %% As described in `telemetry:span/3` +metadata: #{scenario := module()} %% Plus as described in `telemetry:span/3` +``` + +A telemetry span of a full scenario execution for a user (i.e. the exported `start/1,2` function): ```erlang -event_name: [amoc, scenario, user] -measurements: #{} -metadata: #{} +event_name: [amoc, scenario, start, _] +measurements: #{} %% As described in `telemetry:span/3` +metadata: #{scenario := module(), %% Running scenario + state := term(), %% The state as returned by `init/0` + user_id := non_neg_integer() %% User ID assigned to the running process + } %% Plus as described in `telemetry:span/3` +``` + +A telemetry span of a full scenario execution for a user (i.e. the exported `terminate/1,2` function): +```erlang +event_name: [amoc, scenario, terminate, _] +measurements: #{} %% As described in `telemetry:span/3` +metadata: #{scenario := module(), %% Running scenario + state := term() %% The state as returned by `init/0` + } %% Plus as described in `telemetry:span/3` ``` ## Controller -Indicates the number of users added or removed +Indicates the number of users manually added or removed ```erlang event_name: [amoc, controller, users] -measurements: #{count => non_neg_integer()} -metadata: #{type => add | remove} +measurements: #{count := non_neg_integer()} +metadata: #{monotonic_time := integer(), scenario := module(), type := add | remove} ``` ## Throttle @@ -23,42 +44,38 @@ metadata: #{type => add | remove} ### Init Raised when a throttle mechanism is initialised. - ```erlang event_name: [amoc, throttle, init] -measurements: #{count => 1} -metadata: #{name => atom()} +measurements: #{count := 1} +metadata: #{monotonic_time := integer(), name := atom()} ``` ### Rate Raised when a throttle mechanism is initialised or its configured rate is changed. This event is raised only on the master node. - ```erlang event_name: [amoc, throttle, rate] -measurements: #{rate => non_neg_integer()} -metadata: #{name => atom()} +measurements: #{rate := non_neg_integer()} +metadata: #{monotonic_time := integer(), name := atom(), msg => binary()} ``` ### Request Raised when a process client requests to be allowed pass through a throttled mechanism. - ```erlang event_name: [amoc, throttle, request] -measurements: #{count => 1} -metadata: #{name => atom()} +measurements: #{count := 1} +metadata: #{monotonic_time := integer(), name := atom()} ``` ### Execute Raised when a process client is allowed to execute after a throttled mechanism. - ```erlang event_name: [amoc, throttle, execute] -measurements: #{count => 1} -metadata: #{name => atom()} +measurements: #{count := 1} +metadata: #{monotonic_time := integer(), name := atom()} ``` ## Coordinator @@ -68,6 +85,6 @@ Indicates when a coordinating event was raised, like a process being added for c ### Event ```erlang event_name: [amoc, coordinator, start | stop | add | reset | timeout] -measurements: #{count => 1} -metadata: #{name => atom()} +measurements: #{count := 1} +metadata: #{monotonic_time := integer(), name := atom()} ``` diff --git a/src/amoc_controller.erl b/src/amoc_controller.erl index a9c6874b..c98860b1 100644 --- a/src/amoc_controller.erl +++ b/src/amoc_controller.erl @@ -99,13 +99,11 @@ update_settings(Settings) -> -spec add_users(amoc_scenario:user_id(), amoc_scenario:user_id()) -> ok | {error, term()}. add_users(StartId, EndId) -> - telemetry:execute([amoc, controller, users], #{count => EndId - StartId + 1}, #{type => add}), %% adding the exact range of the users gen_server:call(?SERVER, {add, StartId, EndId}). -spec remove_users(user_count(), boolean()) -> {ok, user_count()}. remove_users(Count, ForceRemove) -> - telemetry:execute([amoc, controller, users], #{count => Count}, #{type => remove}), %% trying to remove Count users, this action is async!!! gen_server:call(?SERVER, {remove, Count, ForceRemove}). @@ -207,9 +205,8 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) -> {{error, {invalid_status, Status}}, State}. -spec handle_stop_scenario(state()) -> {handle_call_res(), state()}. -handle_stop_scenario(#state{scenario = Scenario, scenario_state = ScenarioState, - no_of_users = 0, status = running} = State) -> - amoc_scenario:terminate(Scenario, ScenarioState), +handle_stop_scenario(#state{no_of_users = 0, status = running} = State) -> + terminate_scenario(State), {ok, State#state{status = finished}}; handle_stop_scenario(#state{status = running} = State) -> terminate_all_users(), @@ -231,8 +228,11 @@ handle_update_settings(_Settings, #state{status = Status}) -> handle_add(StartId, EndId, #state{last_user_id = LastId, create_users = ScheduledUsers, status = running, + scenario = Scenario, tref = TRef} = State) when StartId =< EndId, LastId < StartId -> + amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1}, + #{scenario => Scenario, type => add}), NewUsers = lists:seq(StartId, EndId), NewScheduledUsers = lists:append(ScheduledUsers, NewUsers), NewTRef = maybe_start_timer(TRef), @@ -244,7 +244,9 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) -> {{error, {invalid_status, Status}}, State}. -spec handle_remove(user_count(), boolean(), state()) -> handle_call_res(). -handle_remove(Count, ForceRemove, #state{status = running}) -> +handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) -> + amoc_telemetry:execute([controller, users], #{count => Count}, + #{scenario => Scenario, type => remove}), Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of {Objects, _} -> [Pid || {_Id, Pid} <- Objects]; '$end_of_table' -> [] @@ -311,6 +313,10 @@ init_scenario(Scenario, Settings) -> {error, Type, Reason} -> {error, {Type, Reason}} end. +-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}. +terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) -> + amoc_scenario:terminate(Scenario, ScenarioState). + -spec maybe_start_timer(timer:tref() | undefined) -> timer:tref(). maybe_start_timer(undefined) -> {ok, TRef} = timer:send_interval(interarrival(), start_user), @@ -347,9 +353,8 @@ terminate_all_users({Objects, Continuation}) -> terminate_all_users('$end_of_table') -> ok. -spec dec_no_of_users(state()) -> state(). -dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState, - no_of_users = 1, status = terminating} = State) -> - amoc_scenario:terminate(Scenario, ScenarioState), +dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) -> + terminate_scenario(State), State#state{no_of_users = 0, status = finished}; dec_no_of_users(#state{no_of_users = N} = State) -> State#state{no_of_users = N - 1}. diff --git a/src/amoc_coordinator/amoc_coordinator.erl b/src/amoc_coordinator/amoc_coordinator.erl index 374c9379..17e70a2b 100644 --- a/src/amoc_coordinator/amoc_coordinator.erl +++ b/src/amoc_coordinator/amoc_coordinator.erl @@ -74,7 +74,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) -> Plan = normalize_coordination_plan(CoordinationPlan), case gen_event:start({local, Name}) of {ok, _} -> - telemetry:execute([amoc, coordinator, start], #{count => 1}, #{name => Name}), + amoc_telemetry:execute([coordinator, start], #{count => 1}, #{name => Name}), %% according to gen_event documentation: %% %% When the event is received, the event manager calls @@ -99,7 +99,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) -> -spec stop(name()) -> ok. stop(Name) -> gen_event:stop(Name), - telemetry:execute([amoc, coordinator, stop], #{count => 1}, #{name => Name}). + amoc_telemetry:execute([coordinator, stop], #{count => 1}, #{name => Name}). %% @see add/3 -spec add(name(), any()) -> ok. @@ -156,15 +156,14 @@ init(CoordinationItem) -> -spec handle_event(Event :: term(), state()) -> {ok, state()}. handle_event(Event, {timeout, Name, Pid}) -> %% there's only one "timeout" event handler for coordinator, - %% so calling telemetry:execute/3 here to ensure that it's + %% so calling amoc_telemetry:execute/3 here to ensure that it's %% triggered just once per event. TelemetryEvent = case Event of {coordinate, _} -> add; reset_coordinator -> reset; coordinator_timeout -> timeout end, - telemetry:execute([amoc, coordinator, TelemetryEvent], - #{count => 1}, #{name => Name}), + amoc_telemetry:execute([coordinator, TelemetryEvent], #{count => 1}, #{name => Name}), erlang:send(Pid, Event), {ok, {timeout, Name, Pid}}; handle_event(Event, {worker, WorkerPid}) -> diff --git a/src/amoc_scenario.erl b/src/amoc_scenario.erl index 798ea015..f5fc6000 100644 --- a/src/amoc_scenario.erl +++ b/src/amoc_scenario.erl @@ -28,26 +28,31 @@ %%------------------------------------------------------------------------- %% @doc Applies the `Scenario:init/0' callback +%% +%% Runs on the controller process and spans a `[amoc, scenario, init, _]' telemetry event. -spec init(amoc:scenario()) -> {ok, state()} | {error, Reason :: term()}. init(Scenario) -> - apply_safely(Scenario, init, []). + apply_safely(Scenario, init, [], #{scenario => Scenario}). %% @doc Applies the `Scenario:terminate/0,1' callback %% %% `Scenario:terminate/0' and `Scenario:terminate/1' callbacks are optional. %% If the scenario module exports both functions, `Scenario:terminate/1' is used. +%% +%% Runs on the controller process and spans a `[amoc, scenario, terminate, _]' telemetry event. -spec terminate(amoc:scenario(), state()) -> ok | {ok, any()} | {error, Reason :: term()}. terminate(Scenario, State) -> + Metadata = #{scenario => Scenario, state => State}, case {erlang:function_exported(Scenario, terminate, 1), erlang:function_exported(Scenario, terminate, 0)} of {true, _} -> %% since we ignore Scenario:terminate/1 return value %% we can use apply_safely/3 function - apply_safely(Scenario, terminate, [State]); + apply_safely(Scenario, terminate, [State], Metadata); {_, true} -> %% since we ignore Scenario:terminate/0 return value %% we can use apply_safely/3 function - apply_safely(Scenario, terminate, []); + apply_safely(Scenario, terminate, [], Metadata); _ -> ok end. @@ -56,25 +61,39 @@ terminate(Scenario, State) -> %% %% Either `Scenario:start/1' or `Scenario:start/2' must be exported from the behaviour module. %% if scenario module exports both functions, `Scenario:start/2' is used. +%% +%% Runs on the user process and spans a `[amoc, scenario, user, _]' telemetry event. -spec start(amoc:scenario(), user_id(), state()) -> any(). start(Scenario, Id, State) -> - case {erlang:function_exported(Scenario, start, 2), - erlang:function_exported(Scenario, start, 1)} of - {true, _} -> - Scenario:start(Id, State); - {_, true} -> - Scenario:start(Id); - {false, false} -> - error("the scenario module must export either start/2 or start/1 function") - end. + Metadata = #{scenario => Scenario, state => State, user_id => Id}, + Span = case {erlang:function_exported(Scenario, start, 2), + erlang:function_exported(Scenario, start, 1)} of + {true, _} -> + fun() -> + Ret = Scenario:start(Id, State), + {Ret, Metadata#{return => Ret}} + end; + {_, true} -> + fun() -> + Ret = Scenario:start(Id), + {Ret, Metadata#{return => Ret}} + end; + {false, false} -> + exit("the scenario module must export either start/2 or start/1 function") + end, + telemetry:span([amoc, scenario, start], Metadata, Span). %% ------------------------------------------------------------------ %% internal functions %% ------------------------------------------------------------------ --spec apply_safely(atom(), atom(), [term()]) -> {ok | error, term()}. -apply_safely(M, F, A) -> - try erlang:apply(M, F, A) of +-spec apply_safely(atom(), atom(), [term()], map()) -> {ok | error, term()}. +apply_safely(M, F, A, Metadata) -> + Span = fun() -> + Ret = erlang:apply(M, F, A), + {Ret, Metadata#{return => Ret}} + end, + try telemetry:span([amoc, scenario, F], Metadata, Span) of {ok, RetVal} -> {ok, RetVal}; {error, Error} -> {error, Error}; Result -> {ok, Result} diff --git a/src/amoc_telemetry.erl b/src/amoc_telemetry.erl new file mode 100644 index 00000000..56ec5df1 --- /dev/null +++ b/src/amoc_telemetry.erl @@ -0,0 +1,15 @@ +%% @private +%% @copyright 2023 Erlang Solutions Ltd. +-module(amoc_telemetry). + +-export([execute/3]). + +-spec execute(EventName, Measurements, Metadata) -> ok when + EventName :: telemetry:event_name(), + Measurements :: telemetry:event_measurements(), + Metadata :: telemetry:event_metadata(). +execute(Name, Measurements, Metadata) -> + TimeStamp = erlang:monotonic_time(), + NameWithAmocPrefix = [amoc | Name], + MetadataWithTS = Metadata#{monotonic_time => TimeStamp}, + telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS). diff --git a/src/amoc_throttle/amoc_throttle_controller.erl b/src/amoc_throttle/amoc_throttle_controller.erl index b1fea17c..3e62ad49 100644 --- a/src/amoc_throttle/amoc_throttle_controller.erl +++ b/src/amoc_throttle/amoc_throttle_controller.erl @@ -204,7 +204,10 @@ maybe_raise_event(Name, Event) -> end. raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init -> - telemetry:execute([amoc, throttle, Event], #{count => 1}, #{name => Name}). + amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}). + +report_rate(Name, RatePerMinute) -> + amoc_telemetry:execute([throttle, rate], #{rate => RatePerMinute}, #{name => Name}). -spec change_rate_and_stop_plan(name(), state()) -> state(). change_rate_and_stop_plan(Name, State) -> @@ -329,6 +332,3 @@ run_cmd(Pid, pause) -> amoc_throttle_process:pause(Pid); run_cmd(Pid, resume) -> amoc_throttle_process:resume(Pid). - -report_rate(Name, RatePerMinute) -> - telemetry:execute([amoc, throttle, rate], #{rate => RatePerMinute}, #{name => Name}). diff --git a/src/amoc_user.erl b/src/amoc_user.erl index 92338225..aa2e47b5 100644 --- a/src/amoc_user.erl +++ b/src/amoc_user.erl @@ -27,5 +27,4 @@ stop(Pid, Force) when is_pid(Pid) -> init(Parent, Scenario, Id, State) -> proc_lib:init_ack(Parent, {ok, self()}), process_flag(trap_exit, true), - ScenarioFun = fun() -> {amoc_scenario:start(Scenario, Id, State), #{}} end, - telemetry:span([amoc, scenario, user], #{}, ScenarioFun). + amoc_scenario:start(Scenario, Id, State). diff --git a/test/amoc_coordinator_SUITE.erl b/test/amoc_coordinator_SUITE.erl index 88dc3f57..90241755 100644 --- a/test/amoc_coordinator_SUITE.erl +++ b/test/amoc_coordinator_SUITE.erl @@ -327,9 +327,8 @@ assert_telemetry_events(Name, [{_Pid, Call, _Ret} | History], [Event | EventList assert_telemetry_handler_call(Name, Call, Event) -> EventName = [amoc, coordinator, Event], Measurements = #{count => 1}, - EventMetadata = #{name => Name}, HandlerConfig = ?TELEMETRY_HANDLER_CONFIG, - ExpectedHandlerCall = {?TELEMETRY_HANDLER, handler, - [EventName, Measurements, - EventMetadata, HandlerConfig]}, - ?assertEqual(ExpectedHandlerCall, Call). + ?assertMatch( + {?TELEMETRY_HANDLER, handler, + [EventName, Measurements, + #{name := Name, monotonic_time := _}, HandlerConfig]}, Call).