Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry curation #155

Closed
wants to merge 9 commits into from
78 changes: 57 additions & 21 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,79 @@ Amoc also exposes the following telemetry events:

## Scenario

A telemetry span of a full scenario execution
A telemetry span of a full scenario execution globally (i.e. the exported `init/0` function):
```erlang
event_name: [amoc, scenario, user]
measurements: #{}
metadata: #{}
event_name: [amoc, scenario, run, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module()} %% Plus as described in `telemetry:span/3`
```

A telemetry span of a full scenario execution for a user (i.e. the exported `start/1,2` function):
```erlang
event_name: [amoc, scenario, user, _]
measurements: #{} %% As described in `telemetry:span/3`
metadata: #{scenario := module(),
user_id := non_neg_integer()} %% Plus as described in `telemetry:span/3`
```

## Controller

Indicates the number of users added or removed
Indicates the number of users manually added or removed
```erlang
event_name: [amoc, controller, users]
measurements: #{count => non_neg_integer()}
metadata: #{type => add | remove}
measurements: #{count := non_neg_integer()}
metadata: #{monotonic_time := integer(), scenario := module(), type := add | remove}
```

## Throttle

### Init

Raised when a throttle mechanism is initialised.

```erlang
event_name: [amoc, throttle, init]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Rate

Raised when a throttle mechanism is initialised or its configured rate is changed.
This event is raised only on the master node.

```erlang
event_name: [amoc, throttle, rate]
measurements: #{rate => non_neg_integer()}
metadata: #{name => atom()}
measurements: #{rate := non_neg_integer()}
metadata: #{monotonic_time := integer(), name := atom(), msg => binary()}
```

### Request

Raised when a process client requests to be allowed pass through a throttled mechanism.

This event is raised only on the master node.
```erlang
event_name: [amoc, throttle, request]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Execute

Raised when a process client is allowed to execute after a throttled mechanism.

This event is raised only on the master node.
```erlang
event_name: [amoc, throttle, execute]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

### Throttle process internals

Events related to internals of the throttle processes, these might expose unstable conditions you
might want to log or reconfigure:
```erlang
event_name: [amoc, throttle, process]
measurements: #{msg := binary(), process := pid()}
metadata: #{monotonic_time := integer(), name := atom(), printable_state => map()}
```

## Coordinator
Expand All @@ -68,6 +84,26 @@ Indicates when a coordinating event was raised, like a process being added for c
### Event
```erlang
event_name: [amoc, coordinator, start | stop | add | reset | timeout]
measurements: #{count => 1}
metadata: #{name => atom()}
measurements: #{count := 1}
metadata: #{monotonic_time := integer(), name := atom()}
```

## Config

### Internal events
There are related to bad configuration events, they might deserve logging
```erlang
event_name: [amoc, config, get | verify | env]
measurements: #{}
metadata: #{log_class => syslog_level(), _ => _}
```

## Cluster

### Internal events
There are related to clustering events
```erlang
event_name: [amoc, cluster, connect_nodes | nodedown | master_node_down]
measurements: #{count => non_neg_integer()},
metadata: #{node => node(), nodes => nodes(), state => map()}
```
9 changes: 6 additions & 3 deletions src/amoc_config/amoc_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
%%==============================================================================
-module(amoc_config).

-include_lib("kernel/include/logger.hrl").
-include("amoc_config.hrl").

-export([get/1, get/2]).
Expand All @@ -21,13 +20,17 @@ get(Name) ->
get(Name, Default) when is_atom(Name) ->
case ets:lookup(amoc_config, Name) of
[] ->
?LOG_ERROR("no scenario setting ~p", [Name]),
telemetry:execute([amoc, config, get], #{},
#{log_class => error, msg => <<"no scenario setting">>,
scenario => Name}),
throw({invalid_setting, Name});
[#module_parameter{name = Name, value = undefined}] ->
Default;
[#module_parameter{name = Name, value = Value}] ->
Value;
InvalidLookupRet ->
?LOG_ERROR("invalid lookup return value ~p ~p", [Name, InvalidLookupRet]),
telemetry:execute([amoc, config, get], #{},
#{log_class => error, msg => <<"invalid lookup return value">>,
scenario => Name, return => InvalidLookupRet}),
throw({invalid_lookup_ret_value, InvalidLookupRet})
end.
13 changes: 5 additions & 8 deletions src/amoc_config/amoc_config_env.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

-export([get/1, get/2]).

-include_lib("kernel/include/logger.hrl").

-define(DEFAULT_PARSER_MODULE, amoc_config_parser).

-callback(parse_value(string()) -> {ok, amoc_config:value()} | {error, any()}).
Expand All @@ -38,12 +36,11 @@ get_os_env(Name, Default) ->
case parse_value(Value, Default) of
{ok, Term} -> Term;
{error, Error} ->
?LOG_ERROR("cannot parse environment variable, using default value.~n"
" parsing error: '~p'~n"
" variable name: '$~s'~n"
" variable value: '~s'~n"
" default value: '~p'~n",
[Error, EnvName, Value, Default]),
telemetry:execute(
[amoc, config, env], #{error => 1},
#{log_class => error, error => Error, variable_name => EnvName,
variable_value => Value, default_value => Default,
msg => <<"cannot parse environment variable, using default value">>}),
Default
end.

Expand Down
15 changes: 9 additions & 6 deletions src/amoc_config/amoc_config_verification.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
%% API
-export([process_scenario_config/2]).

-include_lib("kernel/include/logger.hrl").
-include("amoc_config.hrl").


-spec process_scenario_config(module_configuration(), settings()) ->
{ok, module_configuration()} | error().
process_scenario_config(Config, Settings) ->
Expand Down Expand Up @@ -41,12 +39,17 @@ verify(Fun, Value) ->
{true, NewValue} -> {true, NewValue};
{false, Reason} -> {false, {verification_failed, Reason}};
Ret ->
?LOG_ERROR("invalid verification method ~p(~p), return value : ~p",
[Fun, Value, Ret]),
telemetry:execute([amoc, config, verify], #{error => 1},
#{log_class => error, verification_method => Fun,
verification_arg => Value, verification_return => Ret,
msg => <<"invalid verification method">>}),
{false, {invalid_verification_return_value, Ret}}
catch
C:E:S ->
?LOG_ERROR("invalid verification method ~p(~p), exception: ~p ~p ~p",
[Fun, Value, C, E, S]),
telemetry:execute([amoc, config, verify], #{error => 1},
#{log_class => error, verification_method => Fun,
verification_arg => Value,
kind => C, reason => E, stacktrace => S,
msg => <<"invalid verification method">>}),
{false, {exception_during_verification, {C, E, S}}}
end.
52 changes: 43 additions & 9 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
scenario_state :: any(), %% state returned from Scenario:init/0
scenario_start :: undefined | integer(),
scenario_ref :: undefined | reference(),
create_users = [] :: [amoc_scenario:user_id()],
tref :: timer:tref() | undefined}).

Expand Down Expand Up @@ -96,13 +98,11 @@ update_settings(Settings) ->
-spec add_users(amoc_scenario:user_id(), amoc_scenario:user_id()) ->
ok | {error, term()}.
add_users(StartId, EndId) ->
telemetry:execute([amoc, controller, users], #{count => EndId - StartId + 1}, #{type => add}),
%% adding the exact range of the users
gen_server:call(?SERVER, {add, StartId, EndId}).

-spec remove_users(user_count(), boolean()) -> {ok, user_count()}.
remove_users(Count, ForceRemove) ->
telemetry:execute([amoc, controller, users], #{count => Count}, #{type => remove}),
%% trying to remove Count users, this action is async!!!
gen_server:call(?SERVER, {remove, Count, ForceRemove}).

Expand Down Expand Up @@ -182,12 +182,19 @@ handle_info(_Msg, State) ->
-spec handle_start_scenario(module(), amoc_config:settings(), state()) ->
{handle_call_res(), state()}.
handle_start_scenario(Scenario, Settings, #state{status = idle} = State) ->
StartTime = erlang:monotonic_time(),
Ref = erlang:make_ref(),
case init_scenario(Scenario, Settings) of
{ok, ScenarioState} ->
NewState = State#state{last_user_id = 0,
scenario = Scenario,
scenario_state = ScenarioState,
scenario_start = StartTime,
scenario_ref = Ref,
status = running},
telemetry:execute([amoc, scenario, run, start],
#{monotonic_time => StartTime, system_time => erlang:system_time()},
#{telemetry_span_context => Ref, scenario => Scenario}),
{ok, NewState};
{error, _} = Error ->
NewState = State#state{scenario = Scenario, status = Error},
Expand All @@ -197,9 +204,8 @@ handle_start_scenario(_Scenario, _Settings, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_stop_scenario(state()) -> {handle_call_res(), state()}.
handle_stop_scenario(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 0, status = running} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
handle_stop_scenario(#state{no_of_users = 0, status = running} = State) ->
terminate_scenario(State),
{ok, State#state{status = finished}};
handle_stop_scenario(#state{status = running} = State) ->
terminate_all_users(),
Expand All @@ -221,8 +227,12 @@ handle_update_settings(_Settings, #state{status = Status}) ->
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
LastId < StartId ->
TimeStamp = erlang:monotonic_time(),
telemetry:execute([amoc, controller, users], #{count => EndId - StartId + 1},
#{monotonic_time => TimeStamp, scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
Expand All @@ -234,7 +244,10 @@ handle_add(_StartId, _EndId, #state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_remove(user_count(), boolean(), state()) -> handle_call_res().
handle_remove(Count, ForceRemove, #state{status = running}) ->
handle_remove(Count, ForceRemove, #state{status = running, scenario = Scenario}) ->
TimeStamp = erlang:monotonic_time(),
telemetry:execute([amoc, controller, users], #{count => Count},
#{monotonic_time => TimeStamp, scenario => Scenario, type => remove}),
Pids = case ets:match_object(?USERS_TABLE, '$1', Count) of
{Objects, _} -> [Pid || {_Id, Pid} <- Objects];
'$end_of_table' -> []
Expand Down Expand Up @@ -301,6 +314,28 @@ init_scenario(Scenario, Settings) ->
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) ->
ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario,
scenario_state = ScenarioState,
scenario_start = StartTime,
scenario_ref = Ref}) ->
case amoc_scenario:terminate(Scenario, ScenarioState) of
{error, {Class, Reason, Stacktrace}} = Ret ->
StopTime = erlang:monotonic_time(),
telemetry:execute([amoc, scenario, run, exception],
#{duration => StopTime - StartTime, monotonic_time => StopTime},
#{telemetry_span_context => Ref, scenario => Scenario,
kind => Class, reason => Reason, stacktrace => Stacktrace}),
Ret;
Ret ->
StopTime = erlang:monotonic_time(),
telemetry:execute([amoc, scenario, run, stop],
#{duration => StopTime - StartTime, monotonic_time => StopTime},
#{telemetry_span_context => Ref, scenario => Scenario}),
Ret
end.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
{ok, TRef} = timer:send_interval(interarrival(), start_user),
Expand Down Expand Up @@ -337,9 +372,8 @@ terminate_all_users({Objects, Continuation}) ->
terminate_all_users('$end_of_table') -> ok.

-spec dec_no_of_users(state()) -> state().
dec_no_of_users(#state{scenario = Scenario, scenario_state = ScenarioState,
no_of_users = 1, status = terminating} = State) ->
amoc_scenario:terminate(Scenario, ScenarioState),
dec_no_of_users(#state{no_of_users = 1, status = terminating} = State) ->
terminate_scenario(State),
State#state{no_of_users = 0, status = finished};
dec_no_of_users(#state{no_of_users = N} = State) ->
State#state{no_of_users = N - 1}.
Expand Down
10 changes: 6 additions & 4 deletions src/amoc_coordinator/amoc_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
Plan = normalize_coordination_plan(CoordinationPlan),
case gen_event:start({local, Name}) of
{ok, _} ->
telemetry:execute([amoc, coordinator, start], #{count => 1}, #{name => Name}),
telemetry:execute([amoc, coordinator, start], #{count => 1},
#{monotonic_time => erlang:monotonic_time(), name => Name}),
%% according to gen_event documentation:
%%
%% When the event is received, the event manager calls
Expand All @@ -99,7 +100,8 @@ start(Name, CoordinationPlan, Timeout) when ?IS_TIMEOUT(Timeout) ->
-spec stop(name()) -> ok.
stop(Name) ->
gen_event:stop(Name),
telemetry:execute([amoc, coordinator, stop], #{count => 1}, #{name => Name}).
telemetry:execute([amoc, coordinator, stop], #{count => 1},
#{monotonic_time => erlang:monotonic_time(), name => Name}).

%% @see add/3
-spec add(name(), any()) -> ok.
Expand Down Expand Up @@ -163,8 +165,8 @@ handle_event(Event, {timeout, Name, Pid}) ->
reset_coordinator -> reset;
coordinator_timeout -> timeout
end,
telemetry:execute([amoc, coordinator, TelemetryEvent],
#{count => 1}, #{name => Name}),
telemetry:execute([amoc, coordinator, TelemetryEvent], #{count => 1},
#{monotonic_time => erlang:monotonic_time(), name => Name}),
erlang:send(Pid, Event),
{ok, {timeout, Name, Pid}};
handle_event(Event, {worker, WorkerPid}) ->
Expand Down
Loading