Skip to content

Commit

Permalink
Introduce call to throttle-unlock mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jun 29, 2024
1 parent c3b18d5 commit c16c3d6
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/throttle/amoc_throttle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
%% API
-export([start/2, stop/1,
send/2, send/3, wait/1,
run/2, pause/1, resume/1,
run/2, pause/1, resume/1, unlock/1,
change_rate/2, change_rate/3,
change_rate_gradually/2, change_rate_gradually/6]).

Expand Down Expand Up @@ -74,6 +74,11 @@ pause(Name) ->
resume(Name) ->
amoc_throttle_controller:resume(Name).

%% @doc Unlocks executions for the given `Name' as if `Rate' was set to `infinity'.
-spec unlock(name()) -> ok | {error, any()}.
unlock(Name) ->
amoc_throttle_controller:unlock(Name).

%% @doc Sets `Throttle' for `Name' according to the given values.
%%
%% Can change whether Amoc throttle limits `Name' to parallel executions or to `Rate' per `Interval',
Expand Down
10 changes: 8 additions & 2 deletions src/throttle/amoc_throttle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
%% API
-export([start_link/0,
ensure_throttle_processes_started/2,
pause/1, resume/1, stop/1,
pause/1, resume/1, unlock/1, stop/1,
change_rate/3, change_rate_gradually/2,
raise_event_on_slave_node/2, telemetry_event/2]).

Expand Down Expand Up @@ -112,6 +112,10 @@ pause(Name) ->
resume(Name) ->
gen_server:call(?MASTER_SERVER, {resume, Name}).

-spec unlock(name()) -> ok | {error, any()}.
unlock(Name) ->
gen_server:call(?MASTER_SERVER, {unlock, Name}).

-spec change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}.
change_rate(Name, Rate, Interval) ->
gen_server:call(?MASTER_SERVER, {change_rate, Name, Rate, Interval}).
Expand Down Expand Up @@ -362,7 +366,9 @@ run_cmd(Pid, stop) ->
run_cmd(Pid, pause) ->
amoc_throttle_process:pause(Pid);
run_cmd(Pid, resume) ->
amoc_throttle_process:resume(Pid).
amoc_throttle_process:resume(Pid);
run_cmd(Pid, unlock) ->
amoc_throttle_process:unlock(Pid).

-spec verify_config(amoc_throttle:gradual_rate_config()) -> gradual_rate_change() | {error, any()}.
verify_config(Config) ->
Expand Down
12 changes: 11 additions & 1 deletion src/throttle/amoc_throttle_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
update/3,
pause/1,
resume/1,
unlock/1,
get_state/1,
get_throttle_process/1,
get_throttle_processes/1
Expand All @@ -30,7 +31,7 @@
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute

-record(state, {can_run_fn = true :: boolean(),
status = running :: running | paused,
status = running :: running | paused | unlocked,
max_n :: infinity | non_neg_integer(),
name :: atom(),
n :: infinity | non_neg_integer(),
Expand Down Expand Up @@ -69,6 +70,10 @@ pause(Pid) ->
resume(Pid) ->
gen_server:cast(Pid, resume_process).

-spec unlock(pid()) -> ok.
unlock(Pid) ->
gen_server:cast(Pid, unlock_process).

-spec get_state(pid()) -> map().
get_state(Pid) ->
gen_server:call(Pid, get_state).
Expand Down Expand Up @@ -123,6 +128,8 @@ handle_cast(pause_process, State) ->
{noreply, State#state{status = paused}, {continue, maybe_run_fn}};
handle_cast(resume_process, State) ->
{noreply, State#state{status = running}, {continue, maybe_run_fn}};
handle_cast(unlock_process, State) ->
{noreply, State#state{status = unlocked}, {continue, maybe_run_fn}};
handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) ->
amoc_throttle_controller:telemetry_event(Name, request),
{noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}};
Expand Down Expand Up @@ -212,6 +219,9 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) ->
NewSchedule = lists:reverse(SchRev),
NewState = State#state{schedule = NewSchedule, schedule_reversed = []},
maybe_run_fn(NewState);
maybe_run_fn(#state{interval = _, status = unlocked, n = N} = State) when N > 0 ->
NewState = run_fn(State),
maybe_run_fn(NewState);
maybe_run_fn(#state{interval = 0, status = running, n = N} = State) when N > 0 ->
NewState = run_fn(State),
maybe_run_fn(NewState);
Expand Down
16 changes: 13 additions & 3 deletions test/throttle_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ groups() ->
async_runner_dies_while_waiting_raises_exit,
async_runner_dies_when_throttler_dies,
run_with_interval_zero_limits_only_number_of_parallel_executions,
pause_and_resume,
pause_and_resume_and_unlock,
get_state
]}
].
Expand Down Expand Up @@ -286,7 +286,7 @@ run_with_interval_zero_limits_only_number_of_parallel_executions(_) ->
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch(ok, ?RECV(receive_this, 200)).

pause_and_resume(_) ->
pause_and_resume_and_unlock(_) ->
%% Start 100-per-10ms throttle with a single process
?assertMatch({ok, started},
amoc_throttle:start(?FUNCTION_NAME,
Expand All @@ -299,7 +299,17 @@ pause_and_resume(_) ->
?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)),
%% After resume the message is then received
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
?assertMatch(ok, ?RECV(receive_this, 200)).
?assertMatch(ok, ?RECV(receive_this, 200)),
%% If unlocked, all messages are always received
?assertMatch(ok, amoc_throttle:unlock(?FUNCTION_NAME)),
amoc_throttle:send(?FUNCTION_NAME, receive_this),
?assertMatch(ok, ?RECV(receive_this, 200)),
%% From unlock it can resume
?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)),
State = get_state_of_one_process(?FUNCTION_NAME),
?assertMatch(#{name := ?FUNCTION_NAME,
delay_between_executions := 12},
State).

get_state(_) ->
?assertMatch({ok, started},
Expand Down

0 comments on commit c16c3d6

Please sign in to comment.