Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry/timestamps and spans #162

Merged
merged 9 commits into from
Dec 14, 2023
59 changes: 38 additions & 21 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,80 @@ Amoc also exposes the following telemetry events:

## Scenario

A telemetry span of a full scenario execution
All telemetry spans below contain an extra key `return` in the metadata for the `stop` event with the return value of the given callback.

A telemetry span of a scenario initialisation (i.e. the exported `init/0` function):
```erlang
event_name: [amoc, scenario, init, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module()} %% Plus as described in `telemetry:span/3`
```

A telemetry span of a full scenario execution for a user (i.e. the exported `start/1,2` function):
```erlang
event_name: [amoc, scenario, user]
measurements: #{}
metadata: #{}
event_name: [amoc, scenario, start, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module(), %% Running scenario
state := term(), %% The state as returned by `init/0`
user_id := non_neg_integer() %% User ID assigned to the running process
} %% Plus as described in `telemetry:span/3`
```

A telemetry span of a full scenario execution for a user (i.e. the exported `terminate/1,2` function):
```erlang
event_name: [amoc, scenario, terminate, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module(), %% Running scenario
state := term() %% The state as returned by `init/0`
} %% Plus as described in `telemetry:span/3`
```

## Controller

Indicates the number of users added or removed
Indicates the number of users manually added or removed
```erlang
event_name: [amoc, controller, users]
measurements: #{count => non_neg_integer()}
metadata: #{type => add | remove}
measurements: #{count := non_neg_integer()}
metadata: #{monotonic_time := integer(), scenario := module(), type := add | remove}
```

## Throttle

### Init

Raised when a throttle mechanism is initialised.

```erlang
event_name: [amoc, throttle, init]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Rate

Raised when a throttle mechanism is initialised or its configured rate is changed.
This event is raised only on the master node.

```erlang
event_name: [amoc, throttle, rate]
measurements: #{rate => non_neg_integer()}
metadata: #{name => atom()}
measurements: #{rate := non_neg_integer()}
metadata: #{monotonic_time := integer(), name := atom(), msg => binary()}
```

### Request

Raised when a process client requests to be allowed pass through a throttled mechanism.

```erlang
event_name: [amoc, throttle, request]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Execute

Raised when a process client is allowed to execute after a throttled mechanism.

```erlang
event_name: [amoc, throttle, execute]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

## Coordinator
Expand All @@ -68,6 +85,6 @@ Indicates when a coordinating event was raised, like a process being added for c
### Event
```erlang
event_name: [amoc, coordinator, start | stop | add | reset | timeout]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```
23 changes: 14 additions & 9 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,11 @@ update_settings(Settings) ->
-spec add_users(amoc_scenario:user_id(), amoc_scenario:user_id()) ->
ok | {error, term()}.
add_users(StartId, EndId) ->
telemetry:execute([amoc, controller, users], #{count => EndId - StartId + 1}, #{type => add}),
%% adding the exact range of the users
gen_server:call(?SERVER, {add, StartId, EndId}).

-spec remove_users(user_count(), boolean()) -> {ok, user_count()}.
remove_users(Count, ForceRemove) ->
telemetry:execute([amoc, controller, users], #{count => Count}, #{type => remove}),
%% trying to remove Count users, this action is async!!!
gen_server:call(?SERVER, {remove, Count, ForceRemove}).

Expand Down Expand Up @@ -207,9 +205,8 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 0, status = running} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
Expand All @@ -231,8 +228,11 @@ handle_update_settings(_Settings, #state{status = Status}) ->
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
LastId < StartId ->
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
#{scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
Expand All @@ -244,7 +244,9 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running}) ->
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
amoc_telemetry:execute([controller, users], #{count => Count},
#{scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
Expand Down Expand Up @@ -311,6 +313,10 @@ init_scenario(Scenario, Settings) ->
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
amoc_scenario:terminate(Scenario, ScenarioState).

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
Expand Down Expand Up @@ -347,9 +353,8 @@ terminate_all_users({Objects, Continuation}) ->
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 1, status = terminating} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.
Expand Down
9 changes: 4 additions & 5 deletions src/amoc_coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
Plan = normalize_coordination_plan(CoordinationPlan),
case gen_event:start({local, Name}) of
{ok, _} ->
telemetry:execute([amoc, coordinator, start], #{count => 1}, #{name => Name}),
amoc_telemetry:execute([coordinator, start], #{count => 1}, #{name => Name}),
%% according to gen_event documentation:
%%
%% When the event is received, the event manager calls
Expand All @@ -99,7 +99,7 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
-spec stop(name()) -> ok.
stop(Name) ->
gen_event:stop(Name),
telemetry:execute([amoc, coordinator, stop], #{count => 1}, #{name => Name}).
amoc_telemetry:execute([coordinator, stop], #{count => 1}, #{name => Name}).

%% @see add/3
-spec add(name(), any()) -> ok.
Expand Down Expand Up @@ -156,15 +156,14 @@ init(CoordinationItem) ->
-spec handle_event(Event :: term(), state()) -> {ok, state()}.
handle_event(Event, {timeout, Name, Pid}) ->
%% there's only one "timeout" event handler for coordinator,
%% so calling telemetry:execute/3 here to ensure that it's
%% so calling amoc_telemetry:execute/3 here to ensure that it's
%% triggered just once per event.
TelemetryEvent = case Event of
{coordinate, _} -> add;
reset_coordinator -> reset;
coordinator_timeout -> timeout
end,
telemetry:execute([amoc, coordinator, TelemetryEvent],
#{count => 1}, #{name => Name}),
amoc_telemetry:execute([coordinator, TelemetryEvent], #{count => 1}, #{name => Name}),
erlang:send(Pid, Event),
{ok, {timeout, Name, Pid}};
handle_event(Event, {worker, WorkerPid}) ->
Expand Down
49 changes: 34 additions & 15 deletions src/amoc_scenario.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,31 @@
%%-------------------------------------------------------------------------

%% @doc Applies the `Scenario:init/0' callback
%%
%% Runs on the controller process and spans a `[amoc, scenario, init, _]' telemetry event.
-spec init(amoc:scenario()) -> {ok, state()} | {error, Reason :: term()}.
init(Scenario) ->
apply_safely(Scenario, init, []).
apply_safely(Scenario, init, [], #{scenario => Scenario}).

%% @doc Applies the `Scenario:terminate/0,1' callback
%%
%% `Scenario:terminate/0' and `Scenario:terminate/1' callbacks are optional.
%% If the scenario module exports both functions, `Scenario:terminate/1' is used.
%%
%% Runs on the controller process and spans a `[amoc, scenario, terminate, _]' telemetry event.
-spec terminate(amoc:scenario(), state()) -> ok | {ok, any()} | {error, Reason :: term()}.
terminate(Scenario, State) ->
Metadata = #{scenario => Scenario, state => State},
case {erlang:function_exported(Scenario, terminate, 1),
erlang:function_exported(Scenario, terminate, 0)} of
{true, _} ->
%% since we ignore Scenario:terminate/1 return value
%% we can use apply_safely/3 function
apply_safely(Scenario, terminate, [State]);
apply_safely(Scenario, terminate, [State], Metadata);
{_, true} ->
%% since we ignore Scenario:terminate/0 return value
%% we can use apply_safely/3 function
apply_safely(Scenario, terminate, []);
apply_safely(Scenario, terminate, [], Metadata);
_ ->
ok
end.
Expand All @@ -56,25 +61,39 @@ terminate(Scenario, State) ->
%%
%% Either `Scenario:start/1' or `Scenario:start/2' must be exported from the behaviour module.
%% if scenario module exports both functions, `Scenario:start/2' is used.
%%
%% Runs on the user process and spans a `[amoc, scenario, user, _]' telemetry event.
-spec start(amoc:scenario(), user_id(), state()) -> any().
start(Scenario, Id, State) ->
case {erlang:function_exported(Scenario, start, 2),
erlang:function_exported(Scenario, start, 1)} of
{true, _} ->
Scenario:start(Id, State);
{_, true} ->
Scenario:start(Id);
{false, false} ->
error("the scenario module must export either start/2 or start/1 function")
end.
Metadata = #{scenario => Scenario, state => State, user_id => Id},
Span = case {erlang:function_exported(Scenario, start, 2),
erlang:function_exported(Scenario, start, 1)} of
{true, _} ->
fun() ->
Ret = Scenario:start(Id, State),
{Ret, Metadata#{return => Ret}}
end;
{_, true} ->
fun() ->
Ret = Scenario:start(Id),
{Ret, Metadata#{return => Ret}}
end;
{false, false} ->
exit("the scenario module must export either start/2 or start/1 function")
end,
telemetry:span([amoc, scenario, start], Metadata, Span).

%% ------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------

-spec apply_safely(atom(), atom(), [term()]) -> {ok | error, term()}.
apply_safely(M, F, A) ->
try erlang:apply(M, F, A) of
-spec apply_safely(atom(), atom(), [term()], map()) -> {ok | error, term()}.
apply_safely(M, F, A, Metadata) ->
Span = fun() ->
Ret = erlang:apply(M, F, A),
{Ret, Metadata#{return => Ret}}
end,
try telemetry:span([amoc, scenario, F], Metadata, Span) of
{ok, RetVal} -> {ok, RetVal};
{error, Error} -> {error, Error};
Result -> {ok, Result}
Expand Down
15 changes: 15 additions & 0 deletions src/amoc_telemetry.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
%% @private
%% @copyright 2023 Erlang Solutions Ltd.
-module(amoc_telemetry).

-export([execute/3]).

-spec execute(EventName, Measurements, Metadata) -> ok when
EventName :: telemetry:event_name(),
Measurements :: telemetry:event_measurements(),
Metadata :: telemetry:event_metadata().
execute(Name, Measurements, Metadata) ->
TimeStamp = erlang:monotonic_time(),
NameWithAmocPrefix = [amoc | Name],
MetadataWithTS = Metadata#{monotonic_time => TimeStamp},
telemetry:execute(NameWithAmocPrefix, Measurements, MetadataWithTS).
8 changes: 4 additions & 4 deletions src/amoc_throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ maybe_raise_event(Name, Event) ->
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
telemetry:execute([amoc, throttle, Event], #{count => 1}, #{name => Name}).
amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}).

report_rate(Name, RatePerMinute) ->
amoc_telemetry:execute([throttle, rate], #{rate => RatePerMinute}, #{name => Name}).

-spec change_rate_and_stop_plan(name(), state()) -> state().
change_rate_and_stop_plan(Name, State) ->
Expand Down Expand Up @@ -329,6 +332,3 @@ run_cmd(Pid, pause) ->
amoc_throttle_process:pause(Pid);
run_cmd(Pid, resume) ->
amoc_throttle_process:resume(Pid).

report_rate(Name, RatePerMinute) ->
telemetry:execute([amoc, throttle, rate], #{rate => RatePerMinute}, #{name => Name}).
3 changes: 1 addition & 2 deletions src/amoc_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ stop(Pid, Force) when is_pid(Pid) ->
init(Parent, Scenario, Id, State) ->
proc_lib:init_ack(Parent, {ok, self()}),
process_flag(trap_exit, true),
ScenarioFun = fun() -> {amoc_scenario:start(Scenario, Id, State), #{}} end,
telemetry:span([amoc, scenario, user], #{}, ScenarioFun).
amoc_scenario:start(Scenario, Id, State).
9 changes: 4 additions & 5 deletions test/amoc_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,8 @@ assert_telemetry_events(Name, [{_Pid, Call, _Ret} | History], [Event | EventList
assert_telemetry_handler_call(Name, Call, Event) ->
EventName = [amoc, coordinator, Event],
Measurements = #{count => 1},
EventMetadata = #{name => Name},
HandlerConfig = ?TELEMETRY_HANDLER_CONFIG,
ExpectedHandlerCall = {?TELEMETRY_HANDLER, handler,
[EventName, Measurements,
EventMetadata, HandlerConfig]},
?assertEqual(ExpectedHandlerCall, Call).
?assertMatch(
{?TELEMETRY_HANDLER, handler,
[EventName, Measurements,
#{name := Name, monotonic_time := _}, HandlerConfig]}, Call).