diff --git a/guides/telemetry.md b/guides/telemetry.md index 2d87e40c..a79b773d 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -62,7 +62,7 @@ This event is raised only on the master node. ```erlang event_name: [amoc, throttle, rate] -measurements: #{rate := non_neg_integer()} +measurements: #{rate_per_minute := float()} metadata: #{monotonic_time := integer(), name := atom(), msg => binary()} ``` diff --git a/guides/throttle.md b/guides/throttle.md index 75065bfb..cf0da4af 100644 --- a/guides/throttle.md +++ b/guides/throttle.md @@ -1,6 +1,6 @@ ## API -See `amoc_throttle` +See `amoc_throttle`. ## Overview @@ -9,9 +9,8 @@ It works in both local and distributed environments, allows for dynamic rate cha Amoc throttle allows to: -- Setting the execution `Rate` per `Interval` +- Setting the execution `Rate` per `Interval`, or inversely, the `Interarrival` time between actions. - Limiting the number of parallel executions when `Rate` is set to `infinity` -- Setting the `Interarrival` time between actions. Each `Rate` is identified with a `Name`. The rate limiting mechanism allows responding to a request only when it does not exceed the given `Rate`. diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index ca461114..cb87bbcb 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -24,17 +24,10 @@ -type interval() :: pos_integer(). %% In milliseconds, defaults to 60000 (one minute). --type throttle() :: #{rate := rate(), interval := interval()} | - #{interarrival := interarrival()}. +-type t() :: #{rate := rate(), interval => interval()} | + #{interarrival := interarrival()}. %% Throttle unit of measurement --type config() :: #{rate := rate(), - interval => interval(), - parallelism => non_neg_integer()} - | #{interarrival := interarrival(), - parallelism => non_neg_integer()}. -%% Literal throttle configuration. - -type gradual_rate_config() :: #{from_rate := non_neg_integer(), to_rate := non_neg_integer(), interval => interval(), @@ -42,8 +35,8 @@ step_size => pos_integer(), step_count => pos_integer(), duration => pos_integer()} | - #{from_interarrival := interarrival(), - to_interarrival := interarrival(), + #{from_interarrival := pos_integer(), + to_interarrival := pos_integer(), step_interval => pos_integer(), step_size => pos_integer(), step_count => pos_integer(), @@ -55,14 +48,14 @@ %% %% All other values can be calculated from the provided. --export_type([name/0, rate/0, interval/0, throttle/0, config/0, gradual_rate_config/0]). +-export_type([t/0, name/0, rate/0, interval/0, gradual_rate_config/0]). %% @doc Starts the throttle mechanism for a given `Name' with a given config. %% -%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10). %% `Name' is needed to identify the rate as a single test can have different rates for different tasks. -%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity. --spec start(name(), config() | rate()) -> {ok, started | already_started} | {error, any()}. +%% `Interval' is given in milliseconds, the default is one minute, +%% and can be changed to a different value for convenience or higher granularity. +-spec start(name(), t() | rate()) -> {ok, started | already_started} | {error, any()}. start(Name, #{} = Config) -> amoc_throttle_controller:ensure_throttle_processes_started(Name, Config); start(Name, Rate) -> @@ -75,23 +68,23 @@ start(Name, Rate) -> pause(Name) -> amoc_throttle_controller:pause(Name). -%% @doc Resumes the executions for the given `Name', to their original `Rate' and `Interval' values. --spec resume(name()) -> ok | {error, any()}. -resume(Name) -> - amoc_throttle_controller:resume(Name). - %% @doc Unlocks executions for the given `Name' as if `Rate' was set to `infinity'. -spec unlock(name()) -> ok | {error, any()}. unlock(Name) -> amoc_throttle_controller:unlock(Name). +%% @doc Resumes the executions for the given `Name', to their original `Rate' and `Interval' values. +-spec resume(name()) -> ok | {error, any()}. +resume(Name) -> + amoc_throttle_controller:resume(Name). + %% @doc Sets `Throttle' for `Name' according to the given values. %% %% Can change whether Amoc throttle limits `Name' to parallel executions or to `Rate' per `Interval', %% according to the given `Interval' value. --spec change_rate(name(), throttle()) -> ok | {error, any()}. -change_rate(Name, #{rate := Rate, interval := Interval}) -> - amoc_throttle_controller:change_rate(Name, Rate, Interval). +-spec change_rate(name(), t()) -> ok | {error, any()}. +change_rate(Name, Config) -> + amoc_throttle_controller:change_rate(Name, Config). %% @doc Allows to set a plan of gradual rate changes for a given `Name'. %% diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index 0ae77ac4..837179d2 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -9,8 +9,10 @@ %% API -export([start_link/0, ensure_throttle_processes_started/2, - pause/1, resume/1, unlock/1, stop/1, - change_rate/3, change_rate_gradually/2, + pause/1, resume/1, unlock/1, stop/1, get_info/1, + change_rate/2, change_rate_gradually/2, + pg_scope/0, + get_throttle_process/1, raise_event_on_slave_node/2, telemetry_event/2]). %% gen_server callbacks @@ -20,21 +22,21 @@ -export([verify_gradual_config/1]). -endif. +-define(PG_SCOPE, amoc_throttle). -define(SERVER, ?MODULE). -define(MASTER_SERVER, {?SERVER, amoc_cluster:master_node()}). -define(DEFAULT_STEP_SIZE, 1). -define(DEFAULT_INTERVAL, 60000). %% one minute --define(DEFAULT_NO_PROCESSES, 10). -define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)). --define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)). -define(POS_INT(N), (is_integer(N) andalso N > 0)). -record(throttle_info, { + pool_sup :: pid(), + pool_config :: pool_config(), rate :: amoc_throttle:rate(), interval :: amoc_throttle:interval(), - no_of_procs :: pos_integer(), - active :: boolean(), - change_plan :: change_rate_plan() | undefined + active = true :: boolean(), + change_plan :: undefined | change_rate_plan() }). -record(change_rate_plan, { @@ -50,33 +52,74 @@ -type event() :: init | execute | request. -type config() :: #{rate := amoc_throttle:rate(), - interval := amoc_throttle:interval(), - parallelism := non_neg_integer()}. --type gradual_rate_change() :: #{from_rate := non_neg_integer(), - to_rate := non_neg_integer(), - interval := amoc_throttle:interval(), - step_interval := pos_integer(), - step_size := pos_integer(), - step_count := pos_integer()}. + interval := amoc_throttle:interval()}. +-type gradual() :: #{from_rate := non_neg_integer(), + to_rate := non_neg_integer(), + interval := amoc_throttle:interval(), + step_interval := pos_integer(), + step_size := pos_integer(), + step_count := pos_integer()}. +-type pool_config() :: #{ProcessN :: non_neg_integer() := + #{max_n := infinity | non_neg_integer(), + delay := non_neg_integer(), + status := active | inactive, + pid => undefined | pid()}}. %%%=================================================================== %%% API %%%=================================================================== +-spec pg_scope() -> atom(). +pg_scope() -> + ?PG_SCOPE. + +-spec no_of_processes() -> non_neg_integer(). +no_of_processes() -> + 3 * erlang:system_info(schedulers_online). + +-spec get_throttle_process(amoc_throttle:name()) -> + {error, no_throttle_process_registered} | {ok, pid()}. +get_throttle_process(Name) -> + case pg:get_members(?PG_SCOPE, Name) of + [] -> + {error, no_throttle_process_registered}; + List -> %% nonempty list + N = rand:uniform(length(List)), + {ok, lists:nth(N, List)} + end. + -spec start_link() -> gen_server:start_ret(). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec ensure_throttle_processes_started(name(), amoc_throttle:config()) -> +-spec ensure_throttle_processes_started(name(), amoc_throttle:t()) -> {ok, started | already_started} | - {error, invalid_throttle | wrong_reconfiguration | wrong_no_of_procs}. + {error, invalid_throttle | wrong_reconfiguration | error_starting_pool}. ensure_throttle_processes_started(Name, Config) when is_atom(Name) -> case verify_config(Config) of {error, invalid_throttle} -> {error, invalid_throttle}; VerifiedConfig -> raise_event_on_slave_node(Name, init), - gen_server:call(?MASTER_SERVER, {start_processes, Name, VerifiedConfig}) + gen_server:call(?MASTER_SERVER, {start_throttle, Name, VerifiedConfig}) + end. + +-spec change_rate(name(), amoc_throttle:t()) -> ok | {error, any()}. +change_rate(Name, Config) -> + case verify_config(Config) of + {error, invalid_throttle} -> + {error, invalid_throttle}; + VerifiedConfig -> + gen_server:call(?MASTER_SERVER, {change_rate, Name, VerifiedConfig}) + end. + +-spec change_rate_gradually(name(), amoc_throttle:gradual_rate_config()) -> ok | {error, any()}. +change_rate_gradually(Name, GradualChangeRate) -> + case verify_gradual_config(GradualChangeRate) of + {error, _} = Error -> + Error; + VerifiedConfig -> + gen_server:call(?MASTER_SERVER, {change_rate_gradually, Name, VerifiedConfig}) end. -spec pause(name()) -> ok | {error, any()}. @@ -91,18 +134,9 @@ resume(Name) -> unlock(Name) -> gen_server:call(?MASTER_SERVER, {unlock, Name}). --spec change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}. -change_rate(Name, Rate, Interval) -> - gen_server:call(?MASTER_SERVER, {change_rate, Name, Rate, Interval}). - --spec change_rate_gradually(name(), amoc_throttle:gradual_rate_config()) -> ok | {error, any()}. -change_rate_gradually(Name, GradualChangeRate) -> - case verify_gradual_config(GradualChangeRate) of - {error, _} = Error -> - Error; - Config -> - gen_server:call(?MASTER_SERVER, {change_rate_gradually, Name, Config}) - end. +-spec get_info(name()) -> #{_ := _} | {error, any()}. +get_info(Name) -> + gen_server:call(?MASTER_SERVER, {get_info, Name}). -spec stop(name()) -> ok | {error, any()}. stop(Name) -> @@ -129,75 +163,64 @@ raise_event_on_slave_node(Name, Event) -> init([]) -> {ok, #{}}. --spec handle_call({start_processes, name(), config()}, - From :: {pid(), Tag :: term()}, state()) -> +-spec handle_call({start_throttle, name(), config()}, gen_server:from(), state()) -> {reply, {ok, started | already_started}, state()} | - {reply, {error, wrong_reconfiguration | wrong_no_of_procs}, state()}; - ({pause | resume | unlock | stop}, From :: {pid(), Tag :: term()}, state()) -> + {reply, {error, wrong_reconfiguration | error_starting_pool}, state()}; + ({change_rate, name(), config()}, gen_server:from(), state()) -> {reply, ok, state()} | - {reply, Error :: any(), state()}; - ({change_rate, name(), amoc_throttle:rate(), amoc_throttle:interval()}, - From :: {pid(), Tag :: term()}, state()) -> + {reply, {error, any()}, state()}; + ({change_rate_gradually, name(), gradual()}, gen_server:from(), state()) -> {reply, ok, state()} | {reply, {error, any()}, state()}; - ({change_rate_gradually, name(), gradual_rate_change()}, - From :: {pid(), Tag :: term()}, state()) -> + ({pause | resume | unlock | stop}, gen_server:from(), state()) -> {reply, ok, state()} | + {reply, Error :: any(), state()}; + ({get_info, name()}, gen_server:from(), state()) -> + {reply, #{_ := _}, state()} | {reply, {error, any()}, state()}. -handle_call({start_processes, Name, Config}, _From, State) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {error, no_throttle_process_registered} -> - Info = start_processes(Name, Config), - NewState = State#{Name => Info}, - {reply, {ok, started}, NewState}; - {ok, Group} -> - verify_new_start_matches_running(Name, Config, Group, State) - end; -handle_call({pause, Name}, _From, State) -> - case run_in_all_processes(Name, pause) of - ok -> - Info = maps:get(Name, State), - {reply, ok, State#{Name => Info#throttle_info{active = false}}}; - Error -> - {reply, Error, State} - end; -handle_call({Op, Name}, _From, State) when unlock =:= Op; resume =:= Op -> - case run_in_all_processes(Name, Op) of - ok -> - Info = maps:get(Name, State), - {reply, ok, State#{Name => Info#throttle_info{active = true}}}; - Error -> {reply, Error, State} +handle_call({start_throttle, Name, #{rate := Rate, interval := Interval}}, _From, State) -> + case State of + #{Name := #throttle_info{rate = Rate, interval = Interval}} -> + {reply, {ok, already_started}, State}; + #{Name := #throttle_info{}} -> + {reply, {error, wrong_reconfiguration}, State}; + _ -> + do_start_throttle(Name, Rate, Interval, State) end; -handle_call({change_rate, Name, Rate, Interval}, _From, State) -> +handle_call({change_rate, Name, #{rate := Rate, interval := Interval}}, _From, State) -> case State of - #{Name := Info} -> - case maybe_change_rate(Name, Rate, Interval, Info) of - ok -> - UpdatedInfo = Info#throttle_info{rate = Rate, interval = Interval}, - {reply, ok, State#{Name => UpdatedInfo}}; - Error -> - {reply, Error, State} - end; + #{Name := #throttle_info{rate = Rate, interval = Interval}} -> + {reply, ok, State}; + #{Name := #throttle_info{change_plan = undefined} = Info} -> + do_change_rate(Name, Rate, Interval, Info, State); + #{Name := #throttle_info{}} -> + {reply, {error, cannot_change_rate}, State}; _ -> {reply, {error, {no_throttle_by_name, Name}}, State} end; handle_call({change_rate_gradually, Name, GradualChangeRate}, _From, State) -> case State of #{Name := #throttle_info{change_plan = undefined} = Info} -> - NewInfo = start_gradual_rate_change(Name, Info, GradualChangeRate), - {reply, ok, State#{Name => NewInfo}}; + do_gradual_change_rate(Name, Info, GradualChangeRate, State); #{Name := _} -> {reply, {error, cannot_change_rate}, State}; _ -> {reply, {error, {no_throttle_by_name, Name}}, State} end; -handle_call({stop, Name}, _From, State) -> - case run_in_all_processes(Name, stop) of - ok -> - {reply, ok, maps:remove(Name, State)}; - Error -> - {reply, Error, State} - end. +handle_call({Op, Name}, _From, State) + when stop =:= Op; pause =:= Op; unlock =:= Op; resume =:= Op -> + case State of + #{Name := Info} -> + do_run_op(Op, Name, Info, State); + _ -> + {reply, {error, {no_throttle_by_name, Name}}, State} + end; +handle_call({get_info, Name}, _From, State) -> + Info = maps:get(Name, State), + Fields = record_info(fields, throttle_info), + [_ | Values] = tuple_to_list(Info), + Ret = maps:from_list(lists:zip(Fields, Values)), + {reply, Ret, State}. -spec(handle_cast(any(), state()) -> {noreply, state()}). handle_cast(_, State) -> @@ -222,18 +245,77 @@ handle_info({change_plan, Name}, State) -> raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init -> amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}). -report_rate(Name, RatePerMinute) -> - amoc_telemetry:execute([throttle, rate], #{rate => RatePerMinute}, #{name => Name}). +report_rate(Name, infinity, _) -> + amoc_telemetry:execute([throttle, rate], #{rate_per_minute => infinity}, #{name => Name}); +report_rate(Name, Rate, Interval) -> + RatePerMinute = (Rate * 60000) div Interval, + amoc_telemetry:execute([throttle, rate], #{rate_per_minute => RatePerMinute}, #{name => Name}). + +-spec do_start_throttle(name(), amoc_throttle:rate(), amoc_throttle:interval(), state()) -> + {reply, {ok, started}, state()} | + {reply, {error, error_starting_pool}, state()}. +do_start_throttle(Name, Rate, Interval, State) -> + PoolConfig = pool_config(Rate, Interval), + case amoc_throttle_pooler:start_pool(Name, PoolConfig) of + {ok, PoolSup} when is_pid(PoolSup) -> + PoolConfig1 = process_config(Name, PoolSup, PoolConfig), + raise_event(Name, init), + report_rate(Name, Rate, Interval), + Info = #throttle_info{pool_sup = PoolSup, pool_config = PoolConfig1, + rate = Rate, interval = Interval}, + NewState = State#{Name => Info}, + {reply, {ok, started}, NewState}; + _ -> + {reply, {error, error_starting_pool}, State} + end. + +-spec do_change_rate( + name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info(), state()) -> + {reply, ok, state()} | + {reply, {error, any()}, state()}. +do_change_rate(Name, Rate, Interval, Info, State) -> + NewInfo = do_change_rate(Name, Rate, Interval, Info), + {reply, ok, State#{Name => NewInfo}}. + +-spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info()) -> + throttle_info(). +do_change_rate(Name, Rate, Interval, #throttle_info{pool_config = OldPoolConfig} = Info) -> + NewPoolConfig = pool_config(Rate, Interval), + report_rate(Name, Rate, Interval), + PoolConfig1 = update_throttle_processes(Name, OldPoolConfig, NewPoolConfig), + Info#throttle_info{rate = Rate, interval = Interval, pool_config = PoolConfig1}. -spec change_rate_and_stop_plan(name(), state(), throttle_info(), change_rate_plan()) -> state(). change_rate_and_stop_plan(Name, State, Info, Plan) -> Interval = Info#throttle_info.interval, TRef = Plan#change_rate_plan.timer, HighRate = Plan#change_rate_plan.high_rate, - ok = do_change_rate(Name, HighRate, Interval), + Info1 = do_change_rate(Name, HighRate, Interval, Info), {ok, cancel} = timer:cancel(TRef), consume_all_timer_ticks({change_plan, Name}), - State#{Name => Info#throttle_info{rate = HighRate, change_plan = undefined}}. + State#{Name => Info1#throttle_info{change_plan = undefined}}. + +do_run_op(stop, Name, #throttle_info{pool_sup = PoolSup}, State) -> + ok = amoc_throttle_pooler:stop_pool(PoolSup), + {reply, ok, maps:remove(Name, State)}; +do_run_op(pause, Name, #throttle_info{pool_config = PoolConfig} = Info, State) -> + Fun = fun(_, #{pid := Pid}) -> + amoc_throttle_process:update(Pid, 0, infinity) + end, + maps:foreach(Fun, PoolConfig), + {reply, ok, State#{Name => Info#throttle_info{active = false}}}; +do_run_op(unlock, Name, #throttle_info{pool_config = PoolConfig} = Info, State) -> + Fun = fun(_, #{pid := Pid}) -> + amoc_throttle_process:update(Pid, infinity, 0) + end, + maps:foreach(Fun, PoolConfig), + {reply, ok, State#{Name => Info#throttle_info{active = true}}}; +do_run_op(resume, Name, #throttle_info{pool_config = PoolConfig} = Info, State) -> + Fun = fun(_, #{max_n := MaxN, delay := Delay, pid := Pid}) -> + amoc_throttle_process:update(Pid, MaxN, Delay) + end, + maps:foreach(Fun, PoolConfig), + {reply, ok, State#{Name => Info#throttle_info{active = true}}}. consume_all_timer_ticks(Msg) -> receive @@ -247,132 +329,127 @@ continue_plan(Name, State, Info, Plan) -> NoOfSteps = Plan#change_rate_plan.no_of_steps, StepSize = Plan#change_rate_plan.step_size, NewRate = LowRate + StepSize, - ok = do_change_rate(Name, NewRate, Info#throttle_info.interval), + Info1 = do_change_rate(Name, NewRate, Info#throttle_info.interval, Info), NewPlan = Plan#change_rate_plan{no_of_steps = NoOfSteps - 1}, - State#{Name => Info#throttle_info{rate = NewRate, change_plan = NewPlan}}. - --spec rate_per_minute(amoc_throttle:rate(), amoc_throttle:interval()) -> amoc_throttle:rate(). -rate_per_minute(infinity, _) -> infinity; -rate_per_minute(_, 0) -> 0; -rate_per_minute(Rate, Interval) -> - (Rate * 60000) div Interval. - --spec start_processes(name(), config()) -> throttle_info(). -start_processes(Name, #{rate := Rate, interval := Interval, parallelism := NoOfProcesses}) -> - raise_event(Name, init), - RatePerMinute = rate_per_minute(Rate, Interval), - report_rate(Name, RatePerMinute), - RealNoOfProcs = expected_no_of_processes(Rate, NoOfProcesses), - start_throttle_processes(Name, Interval, Rate, RealNoOfProcs), - #throttle_info{rate = Rate, interval = Interval, active = true, no_of_procs = RealNoOfProcs}. - --spec maybe_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info()) -> - ok | {error, any()}. -maybe_change_rate(Name, Rate, Interval, Info) -> - CurrentRatePerMin = rate_per_minute(Info#throttle_info.rate, Info#throttle_info.interval), - ReqRatePerMin = rate_per_minute(Rate, Interval), - case {CurrentRatePerMin, Info#throttle_info.change_plan} of - {ReqRatePerMin, _} -> ok; - {_, undefined} -> do_change_rate(Name, Rate, Interval); - _ -> {error, cannot_change_rate} - end. - --spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}. -do_change_rate(Name, Rate, Interval) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {ok, List} -> - RatePerMinute = rate_per_minute(Rate, Interval), - report_rate(Name, RatePerMinute), - update_throttle_processes(List, Interval, Rate, length(List)), - ok; - Error -> - Error - end. - --spec start_gradual_rate_change(name(), throttle_info(), gradual_rate_change()) -> - throttle_info(). -start_gradual_rate_change(Name, Info, + State#{Name => Info1#throttle_info{change_plan = NewPlan}}. + +%% Helpers + +-spec pool_config(amoc_throttle:rate(), amoc_throttle:interval()) -> pool_config(). +pool_config(infinity, _) -> + Config = #{max_n => infinity, delay => 0, status => active, pid => undefined}, + maps:from_keys(lists:seq(1, no_of_processes()), Config); +pool_config(0, _) -> + Config = #{max_n => 0, delay => infinity, status => active, pid => undefined}, + maps:from_keys(lists:seq(1, no_of_processes()), Config); +pool_config(Rate, Interval) -> + NoOfProcesses = no_of_processes(), + RatePerMinutePerProcess = (60000 * Rate div Interval) div NoOfProcesses, + DelayPerProcess = (NoOfProcesses * Interval) div Rate, + Rem = ((60000 * Rate div Interval) rem NoOfProcesses) + + ((NoOfProcesses * Interval) rem Rate), + Fun = fun(N, {Acc, R}) -> + case {RatePerMinutePerProcess < NoOfProcesses, R} of + {true, 0} -> + Config = #{max_n => RatePerMinutePerProcess, + delay => DelayPerProcess + 1, + status => inactive, pid => undefined}, + {Acc#{N => Config}, R}; + {true, R} -> + Config = #{max_n => RatePerMinutePerProcess, + delay => DelayPerProcess, + status => active, pid => undefined}, + {Acc#{N => Config}, R - 1}; + {false, 0} -> + Config = #{max_n => RatePerMinutePerProcess, + delay => DelayPerProcess, + status => active, pid => undefined}, + {Acc#{N => Config}, R}; + {false, R} -> + Config = #{max_n => RatePerMinutePerProcess, + delay => DelayPerProcess + 1, + status => active, pid => undefined}, + {Acc#{N => Config}, R - 1} + end + end, + {PoolConfig, _} = lists:foldl(Fun, {#{}, Rem}, lists:seq(1, NoOfProcesses)), + PoolConfig. + +-spec process_config(amoc_throttle:name(), pid(), pool_config()) -> pool_config(). +process_config(Name, PoolSup, PoolConfig) -> + Processes = supervisor:which_children(PoolSup), + Workers = [ {N, Pid} || {{amoc_throttle_process, N}, Pid, _, _} <- Processes, is_pid(Pid) ], + Fun1 = fun(N, Config) -> + {_, Pid} = lists:keyfind(N, 1, Workers), + Config#{pid => Pid} + end, + PoolConfig1 = maps:map(Fun1, PoolConfig), + process_pool(Name, PoolConfig1). + +-spec process_pool(amoc_throttle:name(), pool_config()) -> pool_config(). +process_pool(Name, PoolConfig1) -> + Fun2 = fun({_, #{status := active, pid := Pid}}) -> + {true, Pid}; + (_) -> + false + end, + Pids = lists:filtermap(Fun2, maps:to_list(PoolConfig1)), + pg:join(?PG_SCOPE, Name, Pids), + PoolConfig1. + +-spec do_gradual_change_rate(name(), throttle_info(), gradual(), state()) -> + {reply, ok, state()}. +do_gradual_change_rate(Name, Info, #{from_rate := LowRate, to_rate := HighRate, interval := RateInterval, - step_interval := StepInterval, step_count := StepCount, step_size := StepSize}) -> - ok = do_change_rate(Name, LowRate, RateInterval), + step_interval := StepInterval, step_count := StepCount, step_size := StepSize}, State) -> + Info1 = do_change_rate(Name, LowRate, RateInterval, Info), {ok, Timer} = timer:send_interval(StepInterval, {change_plan, Name}), Plan = #change_rate_plan{high_rate = HighRate, timer = Timer, no_of_steps = StepCount, step_size = StepSize}, - Info#throttle_info{rate = LowRate, interval = RateInterval, change_plan = Plan}. - -start_throttle_processes(Name, Interval, Rate, N) -> - ok = amoc_throttle_pool:start_process_pool(Name, Interval, Rate, N). - -update_throttle_processes([Pid], Interval, Rate, 1) -> - amoc_throttle_process:update(Pid, Interval, Rate); -update_throttle_processes([Pid | Tail], Interval, Rate, N) when N > 1 -> - ProcessRate = Rate div N, - amoc_throttle_process:update(Pid, Interval, ProcessRate), - update_throttle_processes(Tail, Interval, Rate - ProcessRate, N - 1). - -run_in_all_processes(Name, Cmd) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {ok, List} -> - [run_cmd(P, Cmd) || P <- List], - ok; - Error -> - Error - end. - -verify_new_start_matches_running(Name, Config, Group, State) -> - #{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config, - ExpectedNoOfProcesses = expected_no_of_processes(Rate, NoOfProcesses), - case {length(Group), State} of - {ExpectedNoOfProcesses, #{Name := #throttle_info{rate = Rate, interval = Interval}}} -> - {reply, {ok, already_started}, State}; - {ExpectedNoOfProcesses, #{Name := #throttle_info{}}} -> - {reply, {error, wrong_reconfiguration}, State}; - _ -> - {reply, {error, wrong_no_of_procs}, State} - end. - -expected_no_of_processes(0, NoOfProcesses) -> - min(1, NoOfProcesses); -expected_no_of_processes(Rate, NoOfProcesses) -> - min(Rate, NoOfProcesses). - -run_cmd(Pid, stop) -> - amoc_throttle_process:stop(Pid); -run_cmd(Pid, pause) -> - amoc_throttle_process:pause(Pid); -run_cmd(Pid, resume) -> - amoc_throttle_process:resume(Pid); -run_cmd(Pid, unlock) -> - amoc_throttle_process:unlock(Pid). - --spec verify_config(amoc_throttle:config()) -> amoc_throttle:config() | {error, any()}. -verify_config(#{interarrival := infinity} = Config) -> - Config1 = #{rate => 0, interval => ?DEFAULT_INTERVAL}, - Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)}; -verify_config(#{interarrival := 0} = Config) -> - Config1 = #{rate => infinity, interval => ?DEFAULT_INTERVAL}, - Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)}; + NewInfo = Info1#throttle_info{rate = LowRate, interval = RateInterval, change_plan = Plan}, + {reply, ok, State#{Name => NewInfo}}. + +-spec update_throttle_processes(amoc_throttle:name(), pool_config(), pool_config()) -> + pool_config(). +update_throttle_processes(Name, OldPoolConfig, NewPoolConfig) -> + Fun = fun(N, #{status := Status, delay := Delay, max_n := MaxN} = V, {C, J, L}) -> + #{status := OldStatus, pid := Pid} = maps:get(N, C), + amoc_throttle_process:update(Pid, MaxN, Delay), + case {Status, OldStatus} of + {active, inactive} -> + {C#{N := V#{pid := Pid}}, [Pid | J], L}; + {inactive, active} -> + {C#{N := V#{pid := Pid}}, J, [Pid | L]}; + {Same, Same} -> + {C#{N := V#{pid := Pid}}, J, L} + end + end, + {PoolConfig, Join, Leave} = maps:fold(Fun, {OldPoolConfig, [], []}, NewPoolConfig), + pg:join(?PG_SCOPE, Name, Join), + pg:leave(?PG_SCOPE, Name, Leave), + PoolConfig. + +-spec verify_config(amoc_throttle:t()) -> config() | {error, any()}. +verify_config(#{interarrival := infinity} = Config) + when 1 =:= map_size(Config) -> + #{rate => 0, interval => ?DEFAULT_INTERVAL}; +verify_config(#{interarrival := 0} = Config) + when 1 =:= map_size(Config) -> + #{rate => infinity, interval => ?DEFAULT_INTERVAL}; verify_config(#{interarrival := Interarrival} = Config) - when ?POS_INT(Interarrival) -> - Config1 = #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL}, - Config1#{parallelism => maps:get(parallelism, Config, ?DEFAULT_NO_PROCESSES)}; -verify_config(#{rate := Rate, interval := Interval, parallelism := NoOfProcesses} = Config) - when ?TIMEOUT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) -> - Config; + when 1 =:= map_size(Config), ?POS_INT(Interarrival) -> + #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL}; verify_config(#{rate := Rate, interval := Interval} = Config) - when ?TIMEOUT(Rate), ?NONNEG_INT(Interval) -> - Config#{parallelism => ?DEFAULT_NO_PROCESSES}; -verify_config(#{rate := Rate, parallelism := NoOfProcesses} = Config) - when ?TIMEOUT(Rate), ?POS_INT(NoOfProcesses) -> - Config#{interval => ?DEFAULT_INTERVAL}; + when 2 =:= map_size(Config), ?TIMEOUT(Rate), ?POS_INT(Interval) -> + Config; verify_config(#{rate := Rate} = Config) - when ?TIMEOUT(Rate) -> - Config#{interval => ?DEFAULT_INTERVAL, parallelism => ?DEFAULT_NO_PROCESSES}; + when 1 =:= map_size(Config), ?TIMEOUT(Rate) -> + Config#{interval => ?DEFAULT_INTERVAL}; verify_config(_Config) -> {error, invalid_throttle}. -spec verify_gradual_config(amoc_throttle:gradual_rate_config()) -> - gradual_rate_change() | {error, any()}. + gradual() | {error, any()}. verify_gradual_config(Config) -> try do_verify_gradual_config(Config) of Change -> Change @@ -388,7 +465,13 @@ check_step_size_with_from_to_rate(From, To, StepSize) when From > To, StepSize < check_step_parameters(StepSize, StepSize) -> ok. --spec do_verify_gradual_config(amoc_throttle:gradual_rate_config()) -> gradual_rate_change(). +-spec do_verify_gradual_config(amoc_throttle:gradual_rate_config()) -> gradual(). +do_verify_gradual_config( + #{from_interarrival := FromInterarrival, to_interarrival := ToInterarrival} = Config0) -> + FromRate = ?DEFAULT_INTERVAL div FromInterarrival, + ToRate = ?DEFAULT_INTERVAL div ToInterarrival, + Config1 = Config0#{from_rate => FromRate, to_rate => ToRate}, + do_verify_gradual_config(maps:without([from_interarrival, to_interarrival], Config1)); do_verify_gradual_config( #{from_rate := FromRate, to_rate := ToRate, step_interval := StepInterval, step_count := StepCount, step_size := StepSize} = Config) -> diff --git a/src/throttle/amoc_throttle_pool.erl b/src/throttle/amoc_throttle_pool.erl index 4132a5a8..e1764b7a 100644 --- a/src/throttle/amoc_throttle_pool.erl +++ b/src/throttle/amoc_throttle_pool.erl @@ -5,52 +5,29 @@ -behaviour(supervisor). --export([start_process_pool/4]). --export([start_link/4, init/1]). +-export([start_link/2, init/1]). --spec start_process_pool( - amoc_throttle:name(), - amoc_throttle:interval(), - amoc_throttle:rate(), - pos_integer() - ) -> ok | error. -start_process_pool(Name, Interval, Rate, NoOfProcesses) -> - {ok, _} = supervisor:start_child(amoc_throttle_pooler, [Name, Interval, Rate, NoOfProcesses]), - ok. +-spec start_link(amoc_throttle:name(), dynamic()) -> supervisor:startlink_ret(). +start_link(Name, PoolConfig) -> + case supervisor:start_link(?MODULE, {Name, PoolConfig}) of + {ok, Pid} -> + {ok, Pid}; + Else -> + Else + end. --spec start_link( - amoc_throttle:name(), - amoc_throttle:interval(), - amoc_throttle:rate(), - pos_integer() - ) -> supervisor:startlink_ret(). -start_link(Name, Interval, Rate, NoOfProcesses) when NoOfProcesses > 0 -> - supervisor:start_link(?MODULE, {Name, Interval, Rate, NoOfProcesses}). - --spec init({amoc_throttle:name(), amoc_throttle:rate(), amoc_throttle:interval(), pos_integer()}) -> +-spec init({amoc_throttle:name(), dynamic()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. -init({Name, Interval, Rate, NoOfProcesses}) -> - RatesPerProcess = calculate_rate_per_process(Rate, NoOfProcesses), - Tags = lists:seq(1, NoOfProcesses), +init({Name, ConfigPerProcess}) -> Children = [ - #{id => {amoc_throttle_process, Name, N}, - start => {amoc_throttle_process, start_link, [Name, Interval, RatePerProcess]}, + #{id => {amoc_throttle_process, N}, + start => {amoc_throttle_process, start_link, [Name, MaxN, Delay]}, type => worker, - shutdown => timer:seconds(5), + shutdown => timer:seconds(60), restart => transient, modules => [amoc_throttle_process] } - || {RatePerProcess, N} <- lists:zip(RatesPerProcess, Tags) + || {N, #{max_n := MaxN, delay := Delay}} <- maps:to_list(ConfigPerProcess) ], SupFlags = #{strategy => one_for_one, intensity => 0}, {ok, {SupFlags, Children}}. - -%% Helpers -calculate_rate_per_process(Rate, NoOfProcesses) -> - calculate_rate_per_process([], Rate, NoOfProcesses). - -calculate_rate_per_process(Acc, Rate, 1) -> - [Rate | Acc]; -calculate_rate_per_process(Acc, Rate, N) when is_integer(N), N > 1 -> - ProcessRate = Rate div N, - calculate_rate_per_process([ProcessRate | Acc], Rate - ProcessRate, N - 1). diff --git a/src/throttle/amoc_throttle_pooler.erl b/src/throttle/amoc_throttle_pooler.erl index 38ca1bfc..4b069ce9 100644 --- a/src/throttle/amoc_throttle_pooler.erl +++ b/src/throttle/amoc_throttle_pooler.erl @@ -5,13 +5,23 @@ -behaviour(supervisor). +-export([start_pool/2, stop_pool/1]). -export([start_link/0, init/1]). +-spec start_pool(amoc_throttle:name(), dynamic()) -> + supervisor:startchild_ret(). +start_pool(Name, PoolConfig) -> + supervisor:start_child(amoc_throttle_pooler, [Name, PoolConfig]). + +-spec stop_pool(pid()) -> ok. +stop_pool(Pool) -> + ok = supervisor:terminate_child(amoc_throttle_pooler, Pool). + -spec start_link() -> supervisor:startlink_ret(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init([]) -> ChildSpec = #{id => amoc_throttle_pool, start => {amoc_throttle_pool, start_link, []}, diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index 913f1c22..df2b39e5 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -112,7 +112,7 @@ format_status(#{state := State} = FormatStatus) -> initial_state(Name, infinity, 0) -> #state{name = Name, max_n = infinity, delay_between_executions = 0}; -initial_state(Name, 0, infinity) -> +initial_state(Name, MaxN, infinity) when is_integer(MaxN) -> #state{name = Name, max_n = 0, delay_between_executions = infinity}; initial_state(Name, MaxN, Delay) when is_integer(MaxN), is_integer(Delay) -> #state{name = Name, max_n = MaxN, delay_between_executions = Delay}. diff --git a/src/throttle/amoc_throttle_runner.erl b/src/throttle/amoc_throttle_runner.erl index 587f11d1..51726383 100644 --- a/src/throttle/amoc_throttle_runner.erl +++ b/src/throttle/amoc_throttle_runner.erl @@ -17,7 +17,7 @@ run(RunnerPid) -> -spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}. throttle(Name, Action) -> - case amoc_throttle_process:get_throttle_process(Name) of + case amoc_throttle_controller:get_throttle_process(Name) of {ok, ThrottlerPid} -> Args = [Name, self(), ThrottlerPid, Action], RunnerPid = erlang:spawn_link(?MODULE, async_runner, Args), diff --git a/src/throttle/amoc_throttle_sup.erl b/src/throttle/amoc_throttle_sup.erl index f7999d88..3e78d91e 100644 --- a/src/throttle/amoc_throttle_sup.erl +++ b/src/throttle/amoc_throttle_sup.erl @@ -25,8 +25,6 @@ -behaviour(supervisor). --define(PG_SCOPE, amoc_throttle). - -export([start_link/0, init/1]). -spec start_link() -> supervisor:startlink_ret(). @@ -37,7 +35,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one}, Pg = #{id => pg, - start => {pg, start_link, [?PG_SCOPE]}, + start => {pg, start_link, [amoc_throttle_controller:pg_scope()]}, type => worker, shutdown => timer:seconds(5), restart => permanent, diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 4752ee1a..1d2cab17 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -4,7 +4,6 @@ -compile([export_all, nowarn_export_all]). --define(DEFAULT_NO_PROCESSES, 10). -define(DEFAULT_INTERVAL, 60000). %% one minute -define(RECV(Msg, Timeout), receive Msg -> ok after Timeout -> {error, not_received_yet} end). @@ -24,17 +23,18 @@ groups() -> start_interarrival_infinity, start_rate_zero, start_rate_infinity, - low_rate_gets_remapped, - low_interval_get_remapped, + low_rate_does_not_get_remapped, + low_interval_does_not_get_remapped, start_and_stop, change_rate, + change_rate_triggers_paralellism, change_rate_gradually, + change_interarrival_gradually, change_rate_gradually_verify_descriptions, just_wait, wait_for_process_to_die_sends_a_kill, async_runner_dies_while_waiting_raises_exit, async_runner_dies_when_throttler_dies, - run_with_interval_zero_limits_only_number_of_parallel_executions, pause_and_resume_and_unlock, get_state ]} @@ -69,93 +69,61 @@ start(_) -> ?assertMatch({ok, already_started}, amoc_throttle:start(?FUNCTION_NAME, 100)), ?assertMatch({error, wrong_reconfiguration}, - amoc_throttle:start(?FUNCTION_NAME, 101)), - ?assertMatch({error, wrong_no_of_procs}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 100, - parallelism => ?DEFAULT_NO_PROCESSES + 1})). + amoc_throttle:start(?FUNCTION_NAME, 101)). start_descriptive(_) -> %% Starts successfully - Description = #{rate => 100, interval => 5000, parallelism => 12}, + Description = #{rate => 100, interval => 5000}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)). start_interarrival(_) -> %% Starts successfully - Description = #{interarrival => 50, parallelism => 1}, + Description = #{interarrival => 50}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := 50, - n := 1200}, - State). + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 1200}, State). start_interarrival_zero(_) -> %% Starts successfully - Description = #{interarrival => 0, parallelism => 1}, + Description = #{interarrival => 0}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := 0, - n := infinity}, - State). + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := infinity}, State). start_interarrival_infinity(_) -> %% Starts successfully - Description = #{interarrival => infinity, parallelism => 1}, + Description = #{interarrival => infinity}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := infinity, - n := 0}, - State). + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 0}, State). start_rate_zero(_) -> %% Starts successfully - Description = #{rate => 0, parallelism => 1}, + Description = #{rate => 0}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := infinity, - n := 0}, - State). + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 0}, State). start_rate_infinity(_) -> %% Starts successfully - Description = #{rate => infinity, parallelism => 1}, + Description = #{rate => infinity}, ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := 0, - n := infinity}, - State). - -low_rate_gets_remapped(_) -> + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := infinity}, State). + +low_rate_does_not_get_remapped(_) -> ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, - #{rate => 2, interval => 100, parallelism => 1})), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 100, - delay_between_executions := 50}, - State), - assert_telemetry_event([amoc, throttle, process], warning, ?FUNCTION_NAME, 2, 100). - -low_interval_get_remapped(_) -> + #{rate => 2, interval => 100})), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 100, rate := 2}, State). + +low_interval_does_not_get_remapped(_) -> ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, - #{rate => 1, interval => 1, parallelism => 1})), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 1, - delay_between_executions := 10}, - State), - assert_telemetry_event([amoc, throttle, process], warning, ?FUNCTION_NAME, 1, 1). + #{rate => 1, interval => 1})), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 1, rate := 1}, State). start_and_stop(_) -> %% Starts successfully @@ -169,8 +137,7 @@ start_and_stop(_) -> change_rate(_) -> ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, - amoc_throttle:change_rate(?FUNCTION_NAME, - #{rate => 100, interval => ?DEFAULT_INTERVAL})), + amoc_throttle:change_rate(?FUNCTION_NAME, #{rate => 100})), ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), E1 = #{rate => 100, interval => ?DEFAULT_INTERVAL}, ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E1)), @@ -179,13 +146,21 @@ change_rate(_) -> E3 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 2}, ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E3)). +change_rate_triggers_paralellism(_) -> + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1)), + #{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME), + ?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))), + E1 = #{rate => 60000}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E1)), + #{pool_config := Config1} = get_throttle_info(?FUNCTION_NAME), + ?assertNotEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config1))). + change_rate_gradually(_) -> C1 = #{from_rate => 100, to_rate => 200, interval => 1, step_interval => 1, step_count => 1}, ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C1)), ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), - C2 = #{from_rate => 10, to_rate => 3000, interval => 1, step_interval => 100, step_count => 300}, ?assertMatch(ok, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C2)), @@ -197,6 +172,20 @@ change_rate_gradually(_) -> E1 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 1}, ?assertMatch({error, cannot_change_rate}, amoc_throttle:change_rate(?FUNCTION_NAME, E1)). +change_interarrival_gradually(_) -> + C1 = #{from_interarrival => 100, to_interarrival => 200}, + ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C1)), + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), + C2 = #{from_interarrival => 10, to_interarrival => 3000}, + ?assertMatch(ok, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C2)), + %% We cannot change rate while a current gradual change is already running. + C3 = #{from_interarrival => 50, to_interarrival => 200}, + ?assertMatch({error, cannot_change_rate}, + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C3)), + E1 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 1}, + ?assertMatch({error, cannot_change_rate}, amoc_throttle:change_rate(?FUNCTION_NAME, E1)). + %% Bad description also fails change_rate_gradually_verify_descriptions(_) -> %% Condition 0 @@ -252,13 +241,12 @@ change_rate_gradually_verify_descriptions(_) -> amoc_throttle_controller:verify_gradual_config(E1)). just_wait(_) -> - %% it failts if the throttle wasn't started yet + %% it fails if the throttle wasn't started yet ?assertMatch({error, no_throttle_process_registered}, amoc_throttle:wait(?FUNCTION_NAME)), - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 100, interval => 10, parallelism => 1})), + %% Start 100-per-10ms throttle, that is, 600k per minute + Description = #{rate => 100, interval => 10}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), %% wait passes fine ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% One message is received sufficiently fast @@ -266,53 +254,39 @@ just_wait(_) -> ?assertMatch(ok, ?RECV(receive_this, 100)), %% If someone else fills the throttle heavily, %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100 * 10), + %% TODO + fill_throttle(?FUNCTION_NAME, 100 * 100), amoc_throttle:send(?FUNCTION_NAME, receive_this), ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). wait_for_process_to_die_sends_a_kill(_) -> erlang:process_flag(trap_exit, true), - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 100, interval => 10, parallelism => 1})), + Description = #{rate => 100, interval => 10}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(?FUNCTION_NAME) end), ?assertMatch(ok, ?RECV({'EXIT', _, ?FUNCTION_NAME}, 100)). async_runner_dies_while_waiting_raises_exit(_) -> - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 1, interval => 1, parallelism => 1})), + Description = #{rate => 1, interval => 1}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), find_new_link_and_kill_it(self()), ?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)). async_runner_dies_when_throttler_dies(_) -> erlang:process_flag(trap_exit, true), {links, OriginalLinks} = erlang:process_info(self(), links), - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 1, interval => 60000, parallelism => 1})), + Description = #{rate => 1, interval => 60000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), wait_until_one_throttle_worker(?FUNCTION_NAME), amoc_throttle:send(?FUNCTION_NAME, receive_this), wait_until_one_async_runner(self(), OriginalLinks), amoc_throttle:stop(?FUNCTION_NAME), ?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)). -run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> - %% Start 10 actions at once in 10 processes - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 10, interval => 0, parallelism => 1})), - %% If someone else fills the throttle heavily, - %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100), - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, ?RECV(receive_this, 200)). - pause_and_resume_and_unlock(_) -> - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 5000, parallelism => 1})), + %% Start a 10-per-ms throttle + Description = #{rate => 600000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% pauses runs correctly ?assertMatch(ok, amoc_throttle:pause(?FUNCTION_NAME)), @@ -324,24 +298,18 @@ pause_and_resume_and_unlock(_) -> ?assertMatch(ok, ?RECV(receive_this, 200)), %% If unlocked, all messages are always received ?assertMatch(ok, amoc_throttle:unlock(?FUNCTION_NAME)), - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, ?RECV(receive_this, 200)), + amoc_throttle:send(?FUNCTION_NAME, receive_this_too), + ?assertMatch(ok, ?RECV(receive_this_too, 200)), %% From unlock it can resume ?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - delay_between_executions := 12}, - State). + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{rate := 600000}, State). get_state(_) -> - ?assertMatch({ok, started}, - amoc_throttle:start(?FUNCTION_NAME, - #{rate => 100, interval => 60000, parallelism => 1})), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := 600}, - State). + Config = #{rate => 100, interval => 60000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Config)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{rate := 100, interval := 60000, active := true}, State). %% Helpers @@ -363,11 +331,10 @@ get_number_of_workers(Name) -> Processes = get_throttle_workers(Name), length(Processes). -get_state_of_one_process(Name) -> - Processes = get_throttle_workers(Name), - ?assertMatch([_ | _], Processes), - [Process | _] = Processes, - amoc_throttle_process:get_state(Process). +get_throttle_info(Name) -> + Info = amoc_throttle_controller:get_info(Name), + ?assert(is_map(Info)), + Info. wait_until_one_throttle_worker(Name) -> GetWorkers = fun() -> get_throttle_workers(Name) end,