Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use stored worker names #181

Merged
merged 4 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions src/wpool_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@

-export_type([wpool/0]).

-define(WPOOL_TABLE, ?MODULE).
-define(WPOOL_WORKERS, wpool_worker_names).

%% ===================================================================
%% API functions
%% ===================================================================
%% @doc Creates the ets table that will hold the information about active pools
-spec create_table() -> ok.
create_table() ->
_ = ets:new(?MODULE,
_ = ets:new(?WPOOL_TABLE,
[public, named_table, set, {read_concurrency, true}, {keypos, #wpool.name}]),
_ = ets:new(?WPOOL_WORKERS, [public, named_table, set, {read_concurrency, true}]),
ok.

%% @doc Starts a supervisor with several {@link wpool_process}es as its children
Expand Down Expand Up @@ -152,7 +156,8 @@ broadcast(Sup, Cast) ->

-spec all() -> [wpool:name()].
all() ->
[Name || #wpool{name = Name} <- ets:tab2list(?MODULE), find_wpool(Name) /= undefined].
[Name
|| #wpool{name = Name} <- ets:tab2list(?WPOOL_TABLE), find_wpool(Name) /= undefined].

%% @doc Retrieves the pool stats for all pools
-spec stats() -> [wpool:stats()].
Expand Down Expand Up @@ -231,11 +236,11 @@ task({_TaskId, Started, Task}) ->
%% @doc the number of workers in the pool
-spec wpool_size(atom()) -> non_neg_integer() | undefined.
wpool_size(Name) ->
try ets:update_counter(?MODULE, Name, {#wpool.size, 0}) of
try ets:update_counter(?WPOOL_TABLE, Name, {#wpool.size, 0}) of
WpoolSize ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
ets:delete(?WPOOL_TABLE, Name),
undefined;
_ ->
WpoolSize
Expand Down Expand Up @@ -361,6 +366,11 @@ init({Name, Options}) ->
%% @private
-spec worker_name(wpool:name(), pos_integer()) -> atom().
worker_name(Sup, I) ->
[{_, Worker}] = ets:lookup(?WPOOL_WORKERS, {Sup, I}),
Worker.

-spec build_worker_name(wpool:name(), pos_integer()) -> atom().
build_worker_name(Sup, I) ->
list_to_atom(?MODULE_STRING ++ [$- | atom_to_list(Sup)] ++ [$- | integer_to_list(I)]).

%% ===================================================================
Expand Down Expand Up @@ -435,20 +445,22 @@ all_workers(Wpool) ->
undefined ->
exit(no_workers);
_ ->
[wpool_pool:worker_name(Wpool, N) || N <- lists:seq(1, WPoolSize)]
[worker_name(Wpool, N) || N <- lists:seq(1, WPoolSize)]
end.

%% ===================================================================
%% ETS functions
%% ===================================================================
store_wpool(Wpool) ->
true = ets:insert(?MODULE, Wpool),
store_wpool(Wpool = #wpool{name = Name, size = Size}) ->
true = ets:insert(?WPOOL_TABLE, Wpool),
[ets:insert(?WPOOL_WORKERS, {{Name, I}, build_worker_name(Name, I)})
|| I <- lists:seq(1, Size)],
Wpool.

move_wpool(Name) ->
try
WpoolSize = ets:update_counter(?MODULE, Name, {#wpool.size, 0}),
ets:update_counter(?MODULE, Name, {#wpool.next, 1, WpoolSize, 1})
WpoolSize = ets:update_counter(?WPOOL_TABLE, Name, {#wpool.size, 0}),
ets:update_counter(?WPOOL_TABLE, Name, {#wpool.next, 1, WpoolSize, 1})
catch
_:badarg ->
case build_wpool(Name) of
Expand All @@ -462,11 +474,11 @@ move_wpool(Name) ->
%% @doc Use this function to get the Worker pool record in a custom worker.
-spec find_wpool(atom()) -> undefined | wpool().
find_wpool(Name) ->
try ets:lookup(?MODULE, Name) of
try ets:lookup(?WPOOL_TABLE, Name) of
[Wpool | _] ->
case erlang:whereis(Name) of
undefined ->
ets:delete(?MODULE, Name),
ets:delete(?WPOOL_TABLE, Name),
undefined;
_ ->
Wpool
Expand Down
13 changes: 9 additions & 4 deletions test/wpool_pool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -470,24 +470,24 @@ ets_mess_up(_Config) ->
Pool = ets_mess_up,

ct:comment("Mess up with ets table..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),

ct:comment("Rebuild stats"),
1 = proplists:get_value(next_worker, wpool:stats(Pool)),

ct:comment("Mess up with ets table again..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
{ok, ok} = wpool:call(Pool, {io, format, ["1!~n"]}, random_worker),

ct:comment("Mess up with ets table once more..."),
{ok, ok} = wpool:call(Pool, {io, format, ["2!~n"]}, next_worker),
2 = proplists:get_value(next_worker, wpool:stats(Pool)),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
{ok, ok} = wpool:call(Pool, {io, format, ["3!~n"]}, next_worker),
1 = proplists:get_value(next_worker, wpool:stats(Pool)),

ct:comment("Mess up with ets table one final time..."),
true = ets:delete(wpool_pool, Pool),
ets_deletes(Pool),
_ = wpool_pool:find_wpool(Pool),

ct:comment("Now, delete the pool"),
Expand All @@ -509,6 +509,7 @@ ets_mess_up(_Config) ->

ct:comment("And now delete the ets table altogether"),
true = ets:delete(wpool_pool),
true = ets:delete(wpool_worker_names),
_ = wpool_pool:find_wpool(Pool),

wpool:stop(),
Expand Down Expand Up @@ -545,3 +546,7 @@ send_io_format(Pool) ->
worker_msg_queue_lengths(Pool) ->
lists:usort([proplists:get_value(message_queue_len, WS)
|| {_, WS} <- proplists:get_value(workers, wpool:stats(Pool))]).

ets_deletes(Pool) ->
true = ets:delete(wpool_pool, Pool),
[ets:delete(wpool_worker_names, {Pool, I}) || I <- lists:seq(1, ?WORKERS)].