Skip to content

Commit

Permalink
Merge pull request #32 from esl/cets_status
Browse files Browse the repository at this point in the history
Add cets_status module
  • Loading branch information
NelsonVides authored Sep 14, 2023
2 parents ada5e7e + 7281810 commit def7da2
Show file tree
Hide file tree
Showing 5 changed files with 561 additions and 9 deletions.
35 changes: 29 additions & 6 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
send_dump/4,
table_name/1,
other_nodes/1,
get_nodes_request/1,
other_pids/1,
pause/1,
unpause/2,
Expand All @@ -47,6 +48,7 @@
delete_object_request/2,
delete_objects_request/2,
wait_response/2,
wait_responses/2,
init/1,
handle_call/3,
handle_cast/2,
Expand All @@ -73,13 +75,15 @@
ping/1,
info/1,
other_nodes/1,
get_nodes_request/1,
insert_request/2,
insert_many_request/2,
delete_request/2,
delete_many_request/2,
delete_object_request/2,
delete_objects_request/2,
wait_response/2
wait_response/2,
wait_responses/2
]).

-include_lib("kernel/include/logger.hrl").
Expand Down Expand Up @@ -131,6 +135,7 @@
| table_name
| get_info
| other_servers
| get_nodes
| {unpause, reference()}
| get_leader
| {set_leader, boolean()}
Expand All @@ -156,7 +161,9 @@
handle_conflict => handle_conflict_fun(),
handle_wrong_leader => handle_wrong_leader()
}.
-type response_return() :: {reply, ok} | {error, term()} | timeout.
%% Reply is usually ok
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
-type response_timeout() :: timeout() | {abs, integer()}.

-export_type([
request_id/0,
Expand All @@ -166,7 +173,9 @@
long_msg/0,
info/0,
table_name/0,
servers/0
servers/0,
response_return/0,
response_timeout/0
]).

%% API functions
Expand Down Expand Up @@ -284,8 +293,16 @@ delete_objects_request(Server, Objects) ->
cets_call:async_operation(Server, {delete_objects, Objects}).

-spec wait_response(request_id(), timeout()) -> response_return().
wait_response(Mon, Timeout) ->
gen_server:wait_response(Mon, Timeout).
wait_response(ReqId, Timeout) ->
gen_server:wait_response(ReqId, Timeout).

%% @doc Waits for multiple responses
%% Returns results in the same order as ReqIds
%% Blocks for maximum Timeout milliseconds
-spec wait_responses([request_id()], response_timeout()) ->
[response_return()].
wait_responses(ReqIds, Timeout) ->
cets_call:wait_responses(ReqIds, Timeout).

-spec get_leader(server_ref()) -> server_pid().
get_leader(Tab) when is_atom(Tab) ->
Expand All @@ -304,10 +321,14 @@ get_leader(Server) ->
gen_server:call(Server, get_leader).

%% Get a list of other nodes in the cluster that are connected together.
-spec other_nodes(server_ref()) -> [node()].
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
other_nodes(Server) ->
lists:usort(pids_to_nodes(other_pids(Server))).

-spec get_nodes_request(server_ref()) -> request_id().
get_nodes_request(Server) ->
gen_server:send_request(Server, get_nodes).

%% Get a list of other CETS processes that are handling this table.
-spec other_pids(server_ref()) -> servers().
other_pids(Server) ->
Expand Down Expand Up @@ -377,6 +398,8 @@ handle_call({op, Op}, From, State = #{pause_monitors := [_ | _], backlog := Back
{noreply, State#{backlog := [{Op, From} | Backlog]}};
handle_call(other_servers, _From, State = #{other_servers := Servers}) ->
{reply, Servers, State};
handle_call(get_nodes, _From, State = #{other_servers := Servers}) ->
{reply, lists:usort([node() | pids_to_nodes(Servers)]), State};
handle_call(get_leader, _From, State = #{leader := Leader}) ->
{reply, Leader, State};
handle_call(sync, From, State = #{other_servers := Servers}) ->
Expand Down
14 changes: 14 additions & 0 deletions src/cets_call.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-export([async_operation/2]).
-export([sync_operation/2]).
-export([send_leader_op/2]).
-export([wait_responses/2]).

-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -91,3 +92,16 @@ send_leader_op(Server, Op, Backoff) ->
_ ->
Res
end.

%% Waits for multiple responces at once
-spec wait_responses([gen_server:request_id()], cets:response_timeout()) ->
[cets:response_return()].
wait_responses([ReqId], Timeout) ->
[gen_server:receive_response(ReqId, Timeout)];
wait_responses(ReqIds, Timeout) when is_integer(Timeout) ->
Start = erlang:monotonic_time(millisecond),
wait_responses(ReqIds, {abs, Start + Timeout});
wait_responses(ReqIds, {abs, _} = Timeout) ->
[gen_server:receive_response(ReqId, Timeout) || ReqId <- ReqIds];
wait_responses(ReqIds, infinity) ->
[gen_server:receive_response(ReqId, infinity) || ReqId <- ReqIds].
35 changes: 32 additions & 3 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
-module(cets_discovery).
-behaviour(gen_server).

-export([start/1, start_link/1, add_table/2, info/1, system_info/1, wait_for_ready/2]).
-export([
start/1,
start_link/1,
add_table/2,
delete_table/2,
get_tables/1,
info/1,
system_info/1,
wait_for_ready/2
]).
-export([
init/1,
handle_call/3,
Expand All @@ -38,7 +47,15 @@
]).

-ignore_xref([
start/1, start_link/1, add_table/2, info/1, system_info/1, wait_for_ready/2, behaviour_info/1
start/1,
start_link/1,
add_table/2,
delete_table/2,
get_tables/1,
info/1,
system_info/1,
wait_for_ready/2,
behaviour_info/1
]).

-include_lib("kernel/include/logger.hrl").
Expand All @@ -47,7 +64,7 @@
-type get_nodes_result() :: {ok, [node()]} | {error, term()}.
-type retry_type() :: initial | after_error | regular.

-export_type([get_nodes_result/0]).
-export_type([get_nodes_result/0, system_info/0]).

-type from() :: {pid(), reference()}.
-type join_result() :: #{
Expand Down Expand Up @@ -107,6 +124,10 @@ start_common(F, Opts) ->
add_table(Server, Table) ->
gen_server:cast(Server, {add_table, Table}).

-spec delete_table(server(), cets:table_name()) -> ok.
delete_table(Server, Table) ->
gen_server:cast(Server, {delete_table, Table}).

-spec get_tables(server()) -> {ok, [cets:table_name()]}.
get_tables(Server) ->
gen_server:call(Server, get_tables).
Expand Down Expand Up @@ -177,6 +198,14 @@ handle_cast({add_table, Table}, State = #{tables := Tables}) ->
State2 = State#{tables := ordsets:add_element(Table, Tables)},
{noreply, State2}
end;
handle_cast({delete_table, Table}, State = #{tables := Tables}) ->
case lists:member(Table, Tables) of
true ->
State2 = State#{tables := ordsets:del_element(Table, Tables)},
{noreply, State2};
false ->
{noreply, State}
end;
handle_cast(Msg, State) ->
?LOG_ERROR(#{what => unexpected_cast, msg => Msg}),
{noreply, State}.
Expand Down
Loading

0 comments on commit def7da2

Please sign in to comment.