Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interarrival as throttle #186

Draft
wants to merge 8 commits into
base: throttle_api
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ Amoc supports the following generic configuration parameters:
* default value - empty list (`[]`)
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`


* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
for two consecutive users:
* default value - 50 ms.
* example: `AMOC_INTERARRIVAL="50"`
* `interarrival` - a throttle rate (in units per millisecond) between creating the processes
for two consecutive users, or 0 for no throttling:
* default value - {1200, 60000}, i.e., a new user every 50ms.
* example: `AMOC_INTERARRIVAL="{1200, 60000}"`
* this parameter can be updated at runtime (in the same way as scenario configuration).

* `extra_code_paths` - a list of paths that should be included using `code:add_pathsz/1` interface
Expand Down
1 change: 1 addition & 0 deletions integration_test/helper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ function compile_file() {

function contains_all() {
local output="$(cat -)"
echo "Checking if \""$*"\" are contained in \"$output\""
local ret= acc=0
for pattern in "$@"; do
ret="$(echo "$output" | grep -L -e "$pattern" | wc -l)"
Expand Down
136 changes: 55 additions & 81 deletions src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
-behaviour(gen_server).

-define(SERVER, ?MODULE).
-define(DEFAULT_INTERARRIVAL, 1200).

-required_variable(#{name => interarrival, default_value => 50,
verification => {?MODULE, non_neg_integer, 1},
description => "a delay between creating the processes for two "
"consecutive users (ms, def: 50ms)",
update => {?MODULE, maybe_update_interarrival_timer, 2}}).
-required_variable(#{name => interarrival, default_value => ?DEFAULT_INTERARRIVAL,
verification => {?MODULE, verify_interarrival, 1},
description => "Throttle rate for the Scenario:start/1,2 callback (def: 50ms)",
update => {?MODULE, update_interarrival_rate, 2}}).

-record(state, {scenario :: amoc:scenario() | undefined,
last_user_id = 0 :: last_user_id(),
status = idle :: idle | running | terminating | finished |
{error, any()} | disabled,
scenario_state :: any(), %% state returned from Scenario:init/0
create_users = [] :: [amoc_scenario:user_id()],
tref :: timer:tref() | undefined}).
scenario_state :: amoc_scenario:state() %% state returned from Scenario:init/0
}).

-type state() :: #state{}.
%% Internal state of the node's controller
Expand All @@ -41,7 +40,7 @@
%% Number of users currently running in the node
-type last_user_id() :: non_neg_integer().
%% Highest user id registered in the node
-type interarrival() :: non_neg_integer().
-type interarrival() :: amoc_throttle:rate() | amoc_throttle:throttle().
%% Time to wait in between spawning new users

%% ------------------------------------------------------------------
Expand All @@ -65,14 +64,14 @@
%% ------------------------------------------------------------------
%% Parameters verification functions
%% ------------------------------------------------------------------
-export([maybe_update_interarrival_timer/2, non_neg_integer/1]).
-export([update_interarrival_rate/2, verify_interarrival/1]).

-export([zero_users_running/0]).
-export([get_interarrival/0, zero_users_running/0]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% ------------------------------------------------------------------
%% API Function Definitions
Expand Down Expand Up @@ -122,14 +121,22 @@ disable() ->
gen_server:call(?SERVER, disable).

%% @private
-spec non_neg_integer(any()) -> boolean().
non_neg_integer(Interarrival) ->
is_integer(Interarrival) andalso Interarrival >= 0.
-spec verify_interarrival(any()) -> boolean().
verify_interarrival(infinity) ->
true;
verify_interarrival(Rate)
when is_integer(Rate), Rate >= 0 ->
true;
verify_interarrival(#{rate := Rate, interval := Interval})
when is_integer(Rate), Rate >= 0, is_integer(Interval), Interval > 0 ->
true;
verify_interarrival(_) ->
false.

%% @private
-spec maybe_update_interarrival_timer(interarrival, term()) -> ok.
maybe_update_interarrival_timer(interarrival, _) ->
gen_server:cast(?SERVER, maybe_update_interarrival_timer).
-spec update_interarrival_rate(interarrival, amoc_throttle:throttle()) -> ok.
update_interarrival_rate(interarrival, #{rate := Rate, interval := Interval}) ->
ok = amoc_throttle:change_rate(interarrival, Rate, Interval).

%% @private
-spec zero_users_running() -> ok.
Expand Down Expand Up @@ -180,8 +187,6 @@ handle_call(_Request, _From, State) ->

%% @private
-spec handle_cast(any(), state()) -> {noreply, state()}.
handle_cast(maybe_update_interarrival_timer, State) ->
{noreply, maybe_update_interarrival_timer(State)};
handle_cast(zero_users_running, State) ->
NewSate = handle_zero_users_running(State),
{noreply, NewSate};
Expand All @@ -190,15 +195,14 @@ handle_cast(_Msg, State) ->

%% @private
-spec handle_info(any(), state()) -> {noreply, state()}.
handle_info(start_user, State) ->
NewSate = handle_start_user(State),
{noreply, NewSate};
handle_info(start_all_users, State) ->
NewSate = handle_start_all_users(State),
{noreply, NewSate};
handle_info(_Msg, State) ->
{noreply, State}.

%% @private
-spec terminate(term(), state()) -> any().
terminate(_Reason, _State) ->
amoc_users_sup:terminate_all_children().

%% ------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------
Expand Down Expand Up @@ -244,17 +248,14 @@ handle_update_settings(_Settings, #state{status = Status}) ->
-spec handle_add(amoc_scenario:user_id(), amoc_scenario:user_id(), state()) ->
{handle_call_res(), state()}.
handle_add(StartId, EndId, #state{last_user_id = LastId,
create_users = ScheduledUsers,
status = running,
scenario = Scenario,
tref = TRef} = State) when StartId =< EndId,
scenario_state = ScenarioState} = State) when StartId =< EndId,
LastId < StartId ->
amoc_telemetry:execute([controller, users], #{count => EndId - StartId + 1},
#{scenario => Scenario, type => add}),
NewUsers = lists:seq(StartId, EndId),
NewScheduledUsers = lists:append(ScheduledUsers, NewUsers),
NewTRef = maybe_start_timer(TRef),
{ok, State#state{create_users = NewScheduledUsers, tref = NewTRef, last_user_id = EndId}};
amoc_users_sup:start_children(Scenario, lists:seq(StartId, EndId), ScenarioState),
{ok, State#state{last_user_id = EndId}};
handle_add(_StartId, _EndId, #state{status = running} = State) ->
{{error, invalid_range}, State};
handle_add(_StartId, _EndId, #state{status = Status} = State) ->
Expand Down Expand Up @@ -287,23 +288,6 @@ handle_disable(#state{status = idle} = State) ->
handle_disable(#state{status = Status} = State) ->
{{error, {invalid_status, Status}}, State}.

-spec handle_start_user(state()) -> state().
handle_start_user(#state{create_users = [UserId | T],
scenario = Scenario,
scenario_state = ScenarioState} = State) ->
amoc_users_sup:start_child(Scenario, UserId, ScenarioState),
State#state{create_users = T};
handle_start_user(#state{create_users = [], tref = TRef} = State) ->
State#state{tref = maybe_stop_timer(TRef)}.

-spec handle_start_all_users(state()) -> state().
handle_start_all_users(#state{create_users = AllUsers,
scenario = Scenario,
scenario_state = ScenarioState,
tref = TRef} = State) ->
amoc_users_sup:start_children(Scenario, AllUsers, ScenarioState),
State#state{create_users = [], tref = maybe_stop_timer(TRef)}.

%% ------------------------------------------------------------------
%% helpers
%% ------------------------------------------------------------------
Expand All @@ -316,12 +300,15 @@ start_tables() -> %% ETS creation
{ok | error, any()}.
init_scenario(Scenario, Settings) ->
case amoc_config_scenario:parse_scenario_settings(Scenario, Settings) of
ok -> amoc_scenario:init(Scenario);
ok ->
start_interarrival(),
amoc_scenario:init(Scenario);
{error, Type, Reason} -> {error, {Type, Reason}}
end.

-spec terminate_scenario(state()) -> ok | {ok, any()} | {error, any()}.
terminate_scenario(#state{scenario = Scenario, scenario_state = ScenarioState}) ->
stop_interarrival(),
amoc_scenario:terminate(Scenario, ScenarioState).

-spec handle_zero_users_running(state()) -> state().
Expand All @@ -331,35 +318,22 @@ handle_zero_users_running(#state{status = terminating} = State) ->
handle_zero_users_running(State) ->
State.

-spec maybe_stop_timer(timer:tref() | undefined) -> undefined.
maybe_stop_timer(undefined) ->
undefined;
maybe_stop_timer(TRef) ->
{ok, cancel} = timer:cancel(TRef),
undefined.

-spec get_interarrival() -> interarrival().
-spec get_interarrival() -> infinity | interarrival().
get_interarrival() ->
amoc_config:get(interarrival).

-spec maybe_update_interarrival_timer(state()) -> state().
maybe_update_interarrival_timer(#state{tref = undefined} = State) ->
State;
maybe_update_interarrival_timer(#state{tref = TRef} = State) ->
{ok, cancel} = timer:cancel(TRef),
Value = get_interarrival(),
NewTRef = do_interarrival(Value),
State#state{tref = NewTRef}.

-spec maybe_start_timer(timer:tref() | undefined) -> timer:tref().
maybe_start_timer(undefined) ->
Value = get_interarrival(),
do_interarrival(Value);
maybe_start_timer(TRef) -> TRef.

do_interarrival(0) ->
self() ! start_all_users,
undefined;
do_interarrival(Value) ->
{ok, NewTRef} = timer:send_interval(Value, start_user),
NewTRef.
amoc_config:get(interarrival, ?DEFAULT_INTERARRIVAL).

-spec start_interarrival() -> any().
start_interarrival() ->
case get_interarrival() of
infinity ->
amoc_throttle:start(interarrival, 1);
#{rate := Rate, interval := Interval} ->
Config = #{rate => Rate, interval => Interval},
amoc_throttle:start(interarrival, Config);
Rate ->
amoc_throttle:start(interarrival, Rate)
end.

-spec stop_interarrival() -> any().
stop_interarrival() ->
amoc_throttle:stop(interarrival).
Loading
Loading