Skip to content

Commit

Permalink
improving amoc_throttle telemetry reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
DenysGonchar committed Nov 5, 2023
1 parent 95e3d75 commit 18ffc54
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
11 changes: 11 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@ metadata: #{type => add | remove}

## Throttle

### Init

Raised when a throttle mechanism is initialised.

```erlang
event_name: [amoc, throttle, init]
measurements: #{count => 1}
metadata: #{name => atom()}
```

### Rate

Raised when a throttle mechanism is initialised or its configured rate is changed.
This event is raised only on the master node.

```erlang
event_name: [amoc, throttle, rate]
Expand Down
20 changes: 14 additions & 6 deletions src/amoc_throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ensure_throttle_processes_started/4,
pause/1, resume/1, stop/1,
change_rate/3, change_rate_gradually/6,
run/2]).
run/2, telemetry_event/2]).

%% gen_server callbacks
-export([init/1,
Expand Down Expand Up @@ -53,16 +53,17 @@ start_link() ->
{ok, throttle_processes_already_started} |
{error, any()}).
ensure_throttle_processes_started(Name, Interval, Rate, NoOfProcesses) ->
maybe_raise_event(Name, init),
gen_server:call(?MASTER_SERVER, {start_processes, Name, Interval, Rate, NoOfProcesses}).

-spec run(name(), fun(() -> any())) -> ok | {error, any()}.
run(Name, Fn) ->
case get_throttle_process(Name) of
{ok, Pid} ->
maybe_raise_event([amoc, throttle, request], #{count => 1}, #{name => Name}),
maybe_raise_event(Name, request),
Fun =
fun() ->
maybe_raise_event([amoc, throttle, execute], #{count => 1}, #{name => Name}),
maybe_raise_event(Name, execute),
Fn()
end,
amoc_throttle_process:run(Pid, Fun),
Expand Down Expand Up @@ -93,6 +94,10 @@ change_rate_gradually(Name, LowRate, HighRate, RateInterval, StepInterval, NoOfS
stop(Name) ->
gen_server:call(?MASTER_SERVER, {stop, Name}).

-spec telemetry_event(name(), request | execute) -> ok.
telemetry_event(Name, Event) when Event =:= request; Event =:= execute ->
raise_event(Name, Event).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -190,12 +195,15 @@ handle_info({change_plan, Name}, State) ->
%%% Internal functions
%%%===================================================================

maybe_raise_event(Name, Measurements, Metadata) ->
maybe_raise_event(Name, Event) ->
case amoc_cluster:master_node() =:= node() of
true -> ok;
_ -> telemetry:execute(Name, Measurements, Metadata)
_ -> raise_event(Name, Event)
end.

raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init ->
telemetry:execute([amoc, throttle, Event], #{count => 1}, #{name => Name}).

-spec change_rate_and_stop_plan(name(), state()) -> state().
change_rate_and_stop_plan(Name, State) ->
Info = maps:get(Name, State),
Expand Down Expand Up @@ -237,7 +245,7 @@ rate_per_minute(Rate, Interval) ->

-spec start_processes(name(), pos_integer(), non_neg_integer(), pos_integer()) -> pos_integer().
start_processes(Name, Rate, Interval, NoOfProcesses) ->
% Master metrics
raise_event(Name, init),
RatePerMinute = rate_per_minute(Rate, Interval),
report_rate(Name, RatePerMinute),
RealNoOfProcesses = min(Rate, NoOfProcesses),
Expand Down
4 changes: 2 additions & 2 deletions src/amoc_throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ handle_cast(pause_process, State) ->
handle_cast(resume_process, State) ->
{noreply, State#state{pause = false}, {continue, maybe_run_fn}};
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
telemetry:execute([amoc, throttle, request], #{count => 1}, #{name => Name}),
amoc_throttle_controller:telemetry_event(Name, request),
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
handle_cast({update, Interval, Rate}, State) ->
NewState = merge_state(initial_state(Interval, Rate), State),
Expand Down Expand Up @@ -188,7 +188,7 @@ maybe_run_fn(State) ->
run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) ->
erlang:monitor(process, RunnerPid),
RunnerPid ! scheduled,
telemetry:execute([amoc, throttle, execute], #{count => 1}, #{name => Name}),
amoc_throttle_controller:telemetry_event(Name, execute),
State#state{schedule = T, n = N - 1}.

async_runner(Fun) ->
Expand Down

0 comments on commit 18ffc54

Please sign in to comment.