Skip to content

Commit

Permalink
Keep a single table of users in the worker supervisors
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Jun 14, 2024
1 parent 0d11905 commit 42d65c5
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 169 deletions.
7 changes: 6 additions & 1 deletion src/amoc_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

%% ------------------------------------------------------------------
%% API Function Definitions
Expand Down Expand Up @@ -198,6 +198,11 @@ handle_cast(_Msg, State) ->
handle_info(_Msg, State) ->
{noreply, State}.

%% @private
-spec terminate(term(), state()) -> any().
terminate(_Reason, _State) ->
amoc_users_sup:terminate_all_children().

%% ------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------
Expand Down
123 changes: 68 additions & 55 deletions src/users/amoc_users_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
%%
%% It spawns a pool of workers as big as online schedulers. When starting a new user, as the user is
%% identified by ID, a worker will be chosen for this user based on its ID
%% (see get_sup_for_user_id/1).
%% (see gen_sup_from_userid/1).
%%
%% The currently running number of users is stored in an atomic that all workers update and the
%% controller can read.
Expand All @@ -16,9 +16,8 @@
-export([start_link/0, init/1]).

%% API
-export([incr_no_of_users/1, decr_no_of_users/1, count_no_of_users/0,
start_child/3, stop_child/2, start_children/3, stop_children/2, terminate_all_children/0]).

-export([handle_up_user/3, handle_down_user/2, count_no_of_users/0]).
-export([start_children/3, stop_child/2, stop_children/2, terminate_all_children/0]).
-export([distribute/2, get_all_children/0]).

-type count() :: non_neg_integer().
Expand All @@ -34,26 +33,34 @@
sups_count :: pos_integer()
}).

-define(SUPERVISOR, amoc_users_sup).
-define(STORAGE, amoc_users_sup_storage).
-define(TABLE, amoc_users_sup_table).

%% Supervisor

%% @private
-spec start_link() -> supervisor:startlink_ret().
start_link() ->
Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, no_args),
UserSups = supervisor:which_children(?MODULE),
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups],
Ret = supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, no_args),
UserSups = supervisor:which_children(?SUPERVISOR),
IndexedSupsUnsorted = [ {Pid, N} || {{amoc_users_worker_sup, N}, Pid, _, _} <- UserSups,
is_integer(N), is_pid(Pid)],
IndexedSups = lists:keysort(2, IndexedSupsUnsorted),
UserSupPidsTuple = list_to_tuple([ Pid || {Pid, _} <- IndexedSups ]),
SupCount = tuple_size(UserSupPidsTuple),
Atomics = atomics:new(1 + SupCount, [{signed, false}]),
Storage = #storage{user_count = Atomics, sups = UserSupPidsTuple,
sups_indexed = IndexedSups, sups_count = SupCount},
persistent_term:put(?MODULE, Storage),
persistent_term:put(?STORAGE, Storage),
Ret.

%% @private
-spec init(no_args) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init(no_args) ->
EtsOpts = [ordered_set, public, named_table,
{read_concurrency, true}, {write_concurrency, auto}],
_Table = ets:new(?TABLE, EtsOpts),
Specs = [
#{
id => {amoc_users_worker_sup, N},
Expand All @@ -73,18 +80,20 @@ indexes() ->
%% API
-spec count_no_of_users() -> count().
count_no_of_users() ->
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
atomics:get(Atomics, 1).

-spec incr_no_of_users(non_neg_integer()) -> any().
incr_no_of_users(SupNum) when SupNum > 1 ->
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
-spec handle_up_user(non_neg_integer(), pid(), amoc_scenario:user_id()) -> any().
handle_up_user(SupNum, Pid, Id) when SupNum > 1 ->
ets:insert(?TABLE, {Pid, Id}),
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
atomics:add(Atomics, SupNum, 1),
atomics:add(Atomics, 1, 1).

-spec decr_no_of_users(non_neg_integer()) -> ok.
decr_no_of_users(SupNum) when SupNum > 1 ->
#storage{user_count = Atomics} = persistent_term:get(?MODULE),
-spec handle_down_user(non_neg_integer(), pid()) -> ok.
handle_down_user(SupNum, Pid) when SupNum > 1 ->
ets:delete(?TABLE, Pid),
#storage{user_count = Atomics} = persistent_term:get(?STORAGE),
atomics:sub(Atomics, SupNum, 1),
case atomics:sub_get(Atomics, 1, 1) of
0 ->
Expand All @@ -93,24 +102,22 @@ decr_no_of_users(SupNum) when SupNum > 1 ->
ok
end.

-spec start_child(amoc:scenario(), amoc_scenario:user_id(), any()) -> ok.
start_child(Scenario, Id, ScenarioState) ->
Sup = get_sup_for_user_id(Id),
amoc_users_worker_sup:start_child(Sup, Scenario, Id, ScenarioState).

-spec stop_child(pid(), boolean()) -> ok.
stop_child(Pid, Force) ->
amoc_users_worker_sup:stop_child(Pid, Force).
case ets:lookup(?TABLE, Pid) of
[Object] ->
Sup = gen_sup_from_userid(Object),
amoc_users_worker_sup:stop_children(Sup, [Pid], Force);
_ ->
ok
end.

%% Group all children based on ID to their respective worker supervisor and cast a request with each
%% group at once. This way we reduce the number of casts to each worker to always one, instead of
%% depending on the number of users.
-spec start_children(amoc:scenario(), [amoc_scenario:user_id()], any()) -> ok.
start_children(Scenario, UserIds, ScenarioState) ->
State = persistent_term:get(?MODULE),
#storage{sups = Supervisors, sups_indexed = IndexedSups, sups_count = SupCount} = State,
Acc = maps:from_list([ {Sup, []} || {Sup, _} <- IndexedSups ]),
Assignments = assign_users_to_sups(SupCount, Supervisors, UserIds, Acc),
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, UserIds),
CastFun = fun(Sup, Users) ->
amoc_users_worker_sup:start_children(Sup, Scenario, Users, ScenarioState)
end,
Expand All @@ -120,47 +127,53 @@ start_children(Scenario, UserIds, ScenarioState) ->
%% in order to load-balance the request among all workers.
-spec stop_children(non_neg_integer(), boolean()) -> non_neg_integer().
stop_children(Count, Force) ->
{CountRemove, Assignments} = assign_counts(Count),
[ amoc_users_worker_sup:stop_children(Sup, Int, Force) || {Sup, Int} <- Assignments ],
CountRemove.
Users = case ets:match_object(?TABLE, '$1', Count) of
'$end_of_table' ->
[];
{Objects, _} ->
Objects
end,
stop_children_assignments(Users, Force),
length(Users).

-spec get_all_children() -> [{pid(), amoc_scenario:user_id()}].
get_all_children() ->
#storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE),
All = [ amoc_users_worker_sup:get_all_children(Sup) || {Sup, _} <- IndexedSups ],
lists:flatten(All).
ets:tab2list(?TABLE).

-spec terminate_all_children() -> any().
terminate_all_children() ->
#storage{sups_indexed = IndexedSups} = persistent_term:get(?MODULE),
[ amoc_users_worker_sup:terminate_all_children(Sup) || {Sup, _} <- IndexedSups ].
Match = ets:match_object(?TABLE, '$1', 500),
do_terminate_all_my_children(Match).

-spec stop_children_assignments([{pid(), amoc_scenario:user_id()}], boolean()) -> ok.
stop_children_assignments(Users, Force) ->
Assignments = maps:groups_from_list(fun gen_sup_from_userid/1, fun get_pid/1, Users),
CastFun = fun(Sup, Assignment) ->
amoc_users_worker_sup:stop_children(Sup, Assignment, Force)
end,
maps:foreach(CastFun, Assignments).

%% ets:continuation/0 type is unfortunately not exported from the ets module.
-spec do_terminate_all_my_children({[tuple()], term()} | '$end_of_table') -> ok.
do_terminate_all_my_children({Users, Continuation}) ->
stop_children_assignments(Users, true),
Match = ets:match_object(Continuation),
do_terminate_all_my_children(Match);
do_terminate_all_my_children('$end_of_table') ->
ok.

%% Helpers
-spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid().
get_sup_for_user_id(Id) ->
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
-spec gen_sup_from_userid({pid(), amoc_scenario:user_id()} | amoc_scenario:user_id()) -> pid().
gen_sup_from_userid({_Pid, Id}) ->
gen_sup_from_userid(Id);
gen_sup_from_userid(Id) ->
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?STORAGE),
Index = erlang:phash2(Id, SupCount) + 1,
element(Index, Supervisors).

%% assign which users each worker will be requested to add
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) ->
Index = erlang:phash2(Id, SupCount) + 1,
ChosenSup = element(Index, Supervisors),
Vs = maps:get(ChosenSup, Acc),
NewAcc = Acc#{ChosenSup := [Id | Vs]},
assign_users_to_sups(SupCount, Supervisors, Ids, NewAcc);
assign_users_to_sups(_, _, [], Acc) ->
Acc.

%% assign how many users each worker will be requested to remove,
%% taking care of the fact that worker might not have enough users.
-spec assign_counts(count()) -> {count(), assignment()}.
assign_counts(Total) ->
#storage{user_count = Atomics, sups_indexed = Indexed} = persistent_term:get(?MODULE),
SupervisorsWithCounts = [ {Sup, atomics:get(Atomics, SupPos)} || {Sup, SupPos} <- Indexed ],
distribute(Total, SupervisorsWithCounts).
-spec get_pid({pid(), amoc_scenario:user_id()}) -> pid().
get_pid({Pid, _}) ->
Pid.

-spec distribute(count(), assignment()) -> {count(), assignment()}.
distribute(Total, SupervisorsWithCounts) ->
Expand Down
Loading

0 comments on commit 42d65c5

Please sign in to comment.