Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cets_status module #32

Merged
merged 9 commits into from
Sep 14, 2023
Merged
35 changes: 29 additions & 6 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
send_dump/4,
table_name/1,
other_nodes/1,
get_nodes_request/1,
other_pids/1,
pause/1,
unpause/2,
Expand All @@ -47,6 +48,7 @@
delete_object_request/2,
delete_objects_request/2,
wait_response/2,
wait_responses/2,
init/1,
handle_call/3,
handle_cast/2,
Expand All @@ -73,13 +75,15 @@
ping/1,
info/1,
other_nodes/1,
get_nodes_request/1,
insert_request/2,
insert_many_request/2,
delete_request/2,
delete_many_request/2,
delete_object_request/2,
delete_objects_request/2,
wait_response/2
wait_response/2,
wait_responses/2
]).

-include_lib("kernel/include/logger.hrl").
Expand Down Expand Up @@ -131,6 +135,7 @@
| table_name
| get_info
| other_servers
| get_nodes
| {unpause, reference()}
| get_leader
| {set_leader, boolean()}
Expand All @@ -156,7 +161,9 @@
handle_conflict => handle_conflict_fun(),
handle_wrong_leader => handle_wrong_leader()
}.
-type response_return() :: {reply, ok} | {error, term()} | timeout.
%% Reply is usually ok
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
-type response_timeout() :: timeout() | {abs, integer()}.

-export_type([
request_id/0,
Expand All @@ -166,7 +173,9 @@
long_msg/0,
info/0,
table_name/0,
servers/0
servers/0,
response_return/0,
response_timeout/0
]).

%% API functions
Expand Down Expand Up @@ -284,8 +293,16 @@ delete_objects_request(Server, Objects) ->
cets_call:async_operation(Server, {delete_objects, Objects}).

-spec wait_response(request_id(), timeout()) -> response_return().
wait_response(Mon, Timeout) ->
gen_server:wait_response(Mon, Timeout).
wait_response(ReqId, Timeout) ->
gen_server:wait_response(ReqId, Timeout).

%% @doc Waits for multiple responses
%% Returns results in the same order as ReqIds
%% Blocks for maximum Timeout milliseconds
-spec wait_responses([request_id()], response_timeout()) ->
[response_return()].
wait_responses(ReqIds, Timeout) ->
cets_call:wait_responses(ReqIds, Timeout).

-spec get_leader(server_ref()) -> server_pid().
get_leader(Tab) when is_atom(Tab) ->
Expand All @@ -304,10 +321,14 @@ get_leader(Server) ->
gen_server:call(Server, get_leader).

%% Get a list of other nodes in the cluster that are connected together.
-spec other_nodes(server_ref()) -> [node()].
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
other_nodes(Server) ->
lists:usort(pids_to_nodes(other_pids(Server))).

-spec get_nodes_request(server_ref()) -> request_id().
get_nodes_request(Server) ->
gen_server:send_request(Server, get_nodes).

%% Get a list of other CETS processes that are handling this table.
-spec other_pids(server_ref()) -> servers().
other_pids(Server) ->
Expand Down Expand Up @@ -377,6 +398,8 @@ handle_call({op, Op}, From, State = #{pause_monitors := [_ | _], backlog := Back
{noreply, State#{backlog := [{Op, From} | Backlog]}};
handle_call(other_servers, _From, State = #{other_servers := Servers}) ->
{reply, Servers, State};
handle_call(get_nodes, _From, State = #{other_servers := Servers}) ->
{reply, lists:usort([node() | pids_to_nodes(Servers)]), State};
handle_call(get_leader, _From, State = #{leader := Leader}) ->
{reply, Leader, State};
handle_call(sync, From, State = #{other_servers := Servers}) ->
Expand Down
14 changes: 14 additions & 0 deletions src/cets_call.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-export([async_operation/2]).
-export([sync_operation/2]).
-export([send_leader_op/2]).
-export([wait_responses/2]).

-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -91,3 +92,16 @@ send_leader_op(Server, Op, Backoff) ->
_ ->
Res
end.

%% Waits for multiple responces at once
-spec wait_responses([gen_server:request_id()], cets:response_timeout()) ->
[cets:response_return()].
wait_responses([ReqId], Timeout) ->
[gen_server:receive_response(ReqId, Timeout)];
wait_responses(ReqIds, Timeout) when is_integer(Timeout) ->
Start = erlang:monotonic_time(millisecond),
wait_responses(ReqIds, {abs, Start + Timeout});
wait_responses(ReqIds, {abs, _} = Timeout) ->
[gen_server:receive_response(ReqId, Timeout) || ReqId <- ReqIds];
wait_responses(ReqIds, infinity) ->
[gen_server:receive_response(ReqId, infinity) || ReqId <- ReqIds].
35 changes: 32 additions & 3 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
-module(cets_discovery).
-behaviour(gen_server).

-export([start/1, start_link/1, add_table/2, info/1, system_info/1, wait_for_ready/2]).
-export([
start/1,
start_link/1,
add_table/2,
delete_table/2,
get_tables/1,
info/1,
system_info/1,
wait_for_ready/2
]).
-export([
init/1,
handle_call/3,
Expand All @@ -38,7 +47,15 @@
]).

-ignore_xref([
start/1, start_link/1, add_table/2, info/1, system_info/1, wait_for_ready/2, behaviour_info/1
start/1,
start_link/1,
add_table/2,
delete_table/2,
get_tables/1,
info/1,
system_info/1,
wait_for_ready/2,
behaviour_info/1
]).

-include_lib("kernel/include/logger.hrl").
Expand All @@ -47,7 +64,7 @@
-type get_nodes_result() :: {ok, [node()]} | {error, term()}.
-type retry_type() :: initial | after_error | regular.

-export_type([get_nodes_result/0]).
-export_type([get_nodes_result/0, system_info/0]).

-type from() :: {pid(), reference()}.
-type join_result() :: #{
Expand Down Expand Up @@ -107,6 +124,10 @@ start_common(F, Opts) ->
add_table(Server, Table) ->
gen_server:cast(Server, {add_table, Table}).

-spec delete_table(server(), cets:table_name()) -> ok.
delete_table(Server, Table) ->
gen_server:cast(Server, {delete_table, Table}).

-spec get_tables(server()) -> {ok, [cets:table_name()]}.
get_tables(Server) ->
gen_server:call(Server, get_tables).
Expand Down Expand Up @@ -177,6 +198,14 @@ handle_cast({add_table, Table}, State = #{tables := Tables}) ->
State2 = State#{tables := ordsets:add_element(Table, Tables)},
{noreply, State2}
end;
handle_cast({delete_table, Table}, State = #{tables := Tables}) ->
case lists:member(Table, Tables) of
true ->
State2 = State#{tables := ordsets:del_element(Table, Tables)},
{noreply, State2};
false ->
{noreply, State}
end;
handle_cast(Msg, State) ->
?LOG_ERROR(#{what => unexpected_cast, msg => Msg}),
{noreply, State}.
Expand Down
Loading