Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fixes and improvements to compaction jobs and to tasks app #6276

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -37501,6 +37501,16 @@
"minimum": 1,
"type": "integer"
},
"compaction_list_dbs_chunk_size": {
"default": 20,
"description": "How many dbs to read between pauses",
"type": "integer"
},
"compaction_list_dbs_pause_ms": {
"default": 200,
"description": "How long to pause before attempting to get the next chunk of dbs",
"type": "integer"
},
"crawler_delay_time_ms": {
"default": 60000,
"description": "tasks crawler delay time in milliseconds",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
"minimum": 1,
"type": "integer"
},
"compaction_list_dbs_chunk_size": {
"default": 20,
"description": "How many dbs to read between pauses",
"type": "integer"
},
"compaction_list_dbs_pause_ms": {
"default": 200,
"description": "How long to pause before attempting to get the next chunk of dbs",
"type": "integer"
},
"crawler_delay_time_ms": {
"default": 60000,
"description": "tasks crawler delay time in milliseconds",
Expand Down
8 changes: 8 additions & 0 deletions applications/crossbar/priv/oas3/oas3-schemas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13479,6 +13479,14 @@
'description': tasks browse dbs interval in seconds
'minimum': 1
'type': integer
'compaction_list_dbs_chunk_size':
'default': 20
'description': How many dbs to read between pauses
'type': integer
'compaction_list_dbs_pause_ms':
'default': 200
'description': How long to pause before attempting to get the next chunk of dbs
'type': integer
'crawler_delay_time_ms':
'default': 60000
'description': tasks crawler delay time in milliseconds
Expand Down
26 changes: 13 additions & 13 deletions applications/tasks/src/kt_number_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?SERVER, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> 'ok'.
stop() ->
gen_server:cast(?SERVER, 'stop').

-spec crawl_numbers() -> 'ok'.
crawl_numbers() ->
kz_log:put_callid(?SERVER),
kz_log:put_callid(?MODULE),
lager:debug("beginning a number crawl"),
lists:foreach(fun(Num) ->
crawl_number_db(Num),
Expand All @@ -89,8 +89,8 @@ crawl_numbers() ->
%%------------------------------------------------------------------------------
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?SERVER),
lager:debug("started ~s", [?SERVER]),
kz_log:put_callid(?MODULE),
lager:debug("started ~s", [?MODULE]),
{'ok', #state{cleanup_ref = cleanup_timer()}}.

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -134,7 +134,7 @@ handle_info(_Msg, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("~s terminating: ~p", [?SERVER, _Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand All @@ -156,12 +156,12 @@ code_change(_OldVsn, State, _Extra) ->
cleanup_timer() ->
erlang:start_timer(?TIME_BETWEEN_CRAWLS, self(), 'ok').

-spec crawl_number_db(kz_term:ne_binary()) -> ok.
-spec crawl_number_db(kz_term:ne_binary()) -> 'ok'.
crawl_number_db(Db) ->
case kz_datamgr:all_docs(Db, [include_docs]) of
{error, _E} ->
case kz_datamgr:all_docs(Db, ['include_docs']) of
{'error', _E} ->
lager:debug("failed to crawl number db ~s: ~p", [Db, _E]);
{ok, JObjs} ->
{'ok', JObjs} ->
lager:debug("starting to crawl '~s'", [Db]),
_ = knm_pipe:pipe(knm_ops:from_jobjs(JObjs)
,[fun maybe_edit/1
Expand Down Expand Up @@ -194,16 +194,16 @@ maybe_edit_fold(PN, {ToRemove, ToSave}=To) ->
knm_phone_number:records().
maybe_remove(PN, ToRemove, Expiry) ->
case is_old_enough(PN, Expiry) of
false -> ToRemove;
true -> [PN|ToRemove]
'false' -> ToRemove;
'true' -> [PN|ToRemove]
end.

-spec maybe_transition_aging(knm_phone_number:record(), knm_phone_number:records(), integer()) ->
knm_phone_number:records().
maybe_transition_aging(PN, ToSave, Expiry) ->
case is_old_enough(PN, Expiry) of
false -> ToSave;
true ->
'false' -> ToSave;
'true' ->
lager:debug("transitioning number '~s' from ~s to ~s"
,[knm_phone_number:number(PN)
,?NUMBER_STATE_AGING
Expand Down
2 changes: 1 addition & 1 deletion applications/tasks/src/kt_port_request_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?MODULE, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> 'ok'.
stop() ->
Expand Down
50 changes: 25 additions & 25 deletions applications/tasks/src/kz_account_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?SERVER, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> ok.
-spec stop() -> 'ok'.
stop() ->
gen_server:cast(?SERVER, stop).
gen_server:cast(?SERVER, 'stop').

-spec check(kz_term:ne_binary()) -> 'ok'.
check(Account)
Expand All @@ -75,10 +75,10 @@ check(Account) ->
%% @doc Initializes the server.
%% @end
%%------------------------------------------------------------------------------
-spec init([]) -> {ok, state()}.
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?SERVER),
lager:debug("started ~s", [?SERVER]),
kz_log:put_callid(?MODULE),
lager:debug("started ~s", [?MODULE]),
{'ok', #state{}}.

%%------------------------------------------------------------------------------
Expand All @@ -94,9 +94,9 @@ handle_call(_Request, _From, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast(stop, State) ->
handle_cast('stop', State) ->
lager:debug("crawler has been stopped"),
{stop, normal, State};
{'stop', 'normal', State};
handle_cast(_Msg, State) ->
lager:debug("unhandled cast: ~p", [_Msg]),
{'noreply', State}.
Expand All @@ -106,9 +106,9 @@ handle_cast(_Msg, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = []
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = []
}=State) ->
NewState =
case kz_datamgr:all_docs(?KZ_ACCOUNTS_DB) of
{'ok', JObjs} ->
Expand All @@ -123,24 +123,24 @@ handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
lager:warning("unable to list all docs in ~s: ~p", [?KZ_ACCOUNTS_DB, _R]),
State#state{cleanup_ref = cleanup_cycle_timer()}
end,
{noreply, NewState};
{'noreply', NewState};

handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId]
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId]
}=State) ->
_ = crawl_account(AccountId),
lager:info("account crawler completed a full crawl"),
{noreply, State#state{cleanup_ref = cleanup_cycle_timer()
,account_ids = []
}};
{'noreply', State#state{cleanup_ref = cleanup_cycle_timer()
,account_ids = []
}};

handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId | AccountIds]
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId | AccountIds]
}=State) ->
_ = crawl_account(AccountId),
{noreply, State#state{cleanup_ref = cleanup_timer()
,account_ids = AccountIds
}};
{'noreply', State#state{cleanup_ref = cleanup_timer()
,account_ids = AccountIds
}};

handle_info(_Info, State) ->
lager:debug("unhandled msg: ~p", [_Info]),
Expand All @@ -155,7 +155,7 @@ handle_info(_Info, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("~s terminating: ~p", [?SERVER, _Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand Down
29 changes: 14 additions & 15 deletions applications/tasks/src/kz_notify_resend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
}).
-type state() :: #state{}.

-define(NAME, ?MODULE).
-define(SERVER, {'via', 'kz_globals', ?NAME}).
-define(SERVER, ?MODULE).

-define(MOD_CONFIG_CAT, <<(?CONFIG_CAT)/binary, ".notify_resend">>).
-define(DEFAULT_TIMEOUT, 10 * ?MILLISECONDS_IN_SECOND).
Expand Down Expand Up @@ -100,13 +99,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
case gen_server:start_link(?SERVER, ?MODULE, [], []) of
{'error', {'already_started', Pid}}
when is_pid(Pid)->
erlang:link(Pid),
{'ok', Pid};
Other -> Other
end.
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

%%%=============================================================================
%%% gen_server callbacks
Expand All @@ -118,8 +111,8 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?NAME),
lager:debug("~s has been started", [?NAME]),
kz_log:put_callid(?MODULE),
lager:debug("~s has been started", [?MODULE]),
{'ok', #state{timer_ref = set_timer()}}.

-spec stop() -> 'ok'.
Expand All @@ -128,9 +121,15 @@ stop() ->

-spec running() -> kz_json:objects().
running() ->
case kz_globals:where_is(?NAME) of
'undefined' -> [];
_ -> gen_server:call(?SERVER, 'running')
running(whereis(?MODULE)).

-spec running(kz_term:api_pid()) -> kz_json:objects().
running('undefined') ->
[];
running(Pid) ->
case is_process_alive(Pid) of
'true' -> gen_server:call(?SERVER, 'running');
'false' -> []
end.

-spec trigger_timeout() -> any().
Expand Down Expand Up @@ -223,7 +222,7 @@ handle_info(_Info, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("terminating: ~p", [_Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand Down
17 changes: 6 additions & 11 deletions applications/tasks/src/kz_tasks_scheduler.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%%%-----------------------------------------------------------------------------
%%% @copyright (C) 2016-2020, 2600Hz
%%% @doc Schedule one-off tasks only once per cluster
%%% @doc Schedule one-off tasks
%%% @author Pierre Fenoll
%%%
%%% This Source Code Form is subject to the terms of the Mozilla Public
Expand Down Expand Up @@ -47,7 +47,7 @@
-include_lib("kazoo_tasks/include/task_fields.hrl").
-include_lib("kazoo_stdlib/include/kazoo_json.hrl").

-define(SERVER, {'via', 'kz_globals', ?MODULE}).
-define(SERVER, ?MODULE).

-define(WAIT_AFTER_ROW
,kapps_config:get_non_neg_integer(?CONFIG_CAT, <<"wait_after_row_ms">>, 500)
Expand Down Expand Up @@ -86,12 +86,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
case gen_server:start_link(?SERVER, ?MODULE, [], []) of
{'error', {'already_started', Pid}} ->
'true' = link(Pid),
{'ok', Pid};
Other -> Other
end.
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

%%------------------------------------------------------------------------------
%% @doc
Expand Down Expand Up @@ -332,7 +327,7 @@ cleanup_task(API, Data) ->
-spec init([]) -> {'ok', state()}.
init([]) ->
_ = process_flag('trap_exit', 'true'),
lager:info("ensuring db ~s exists", [?KZ_TASKS_DB]),
lager:info("started ~s", [?MODULE]),
{'ok', #state{}}.

-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
Expand Down Expand Up @@ -638,8 +633,8 @@ task_api(Category, Action) ->
-spec worker_module(kz_json:object()) -> module().
worker_module(API) ->
case kz_tasks:input_mime(API) of
<<"none">> -> 'kz_task_worker_noinput';
_TextCSV -> 'kz_task_worker'
<<"none">> -> 'kt_task_worker_noinput';
_TextCSV -> 'kt_task_worker'
end.

%%% End of Module.
Loading