Skip to content

Commit

Permalink
Fixes and improvements to compaction jobs and to tasks app (#6276)
Browse files Browse the repository at this point in the history
- Move auto compaction code from kz_tasks_trigger module to
kt_compactor module.

- Make kz_tasks_trigger gen_server work as a local gen_server (not
using kz_globals).

- Fix compaction job's (csv tasks/auto compaction) call_id by
prefixing it with `YYYYMM-` so it can be retrieved in later months.

- Remove kz_globals usage from tasks app.
  • Loading branch information
harenson authored and jamesaimonetti committed Feb 29, 2020
1 parent bc4ad54 commit f21b4e9
Show file tree
Hide file tree
Showing 19 changed files with 275 additions and 196 deletions.
10 changes: 10 additions & 0 deletions applications/crossbar/priv/api/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -37501,6 +37501,16 @@
"minimum": 1,
"type": "integer"
},
"compaction_list_dbs_chunk_size": {
"default": 20,
"description": "How many dbs to read between pauses",
"type": "integer"
},
"compaction_list_dbs_pause_ms": {
"default": 200,
"description": "How long to pause before attempting to get the next chunk of dbs",
"type": "integer"
},
"crawler_delay_time_ms": {
"default": 60000,
"description": "tasks crawler delay time in milliseconds",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@
"minimum": 1,
"type": "integer"
},
"compaction_list_dbs_chunk_size": {
"default": 20,
"description": "How many dbs to read between pauses",
"type": "integer"
},
"compaction_list_dbs_pause_ms": {
"default": 200,
"description": "How long to pause before attempting to get the next chunk of dbs",
"type": "integer"
},
"crawler_delay_time_ms": {
"default": 60000,
"description": "tasks crawler delay time in milliseconds",
Expand Down
8 changes: 8 additions & 0 deletions applications/crossbar/priv/oas3/oas3-schemas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13479,6 +13479,14 @@
'description': tasks browse dbs interval in seconds
'minimum': 1
'type': integer
'compaction_list_dbs_chunk_size':
'default': 20
'description': How many dbs to read between pauses
'type': integer
'compaction_list_dbs_pause_ms':
'default': 200
'description': How long to pause before attempting to get the next chunk of dbs
'type': integer
'crawler_delay_time_ms':
'default': 60000
'description': tasks crawler delay time in milliseconds
Expand Down
26 changes: 13 additions & 13 deletions applications/tasks/src/kt_number_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?SERVER, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> 'ok'.
stop() ->
gen_server:cast(?SERVER, 'stop').

-spec crawl_numbers() -> 'ok'.
crawl_numbers() ->
kz_log:put_callid(?SERVER),
kz_log:put_callid(?MODULE),
lager:debug("beginning a number crawl"),
lists:foreach(fun(Num) ->
crawl_number_db(Num),
Expand All @@ -89,8 +89,8 @@ crawl_numbers() ->
%%------------------------------------------------------------------------------
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?SERVER),
lager:debug("started ~s", [?SERVER]),
kz_log:put_callid(?MODULE),
lager:debug("started ~s", [?MODULE]),
{'ok', #state{cleanup_ref = cleanup_timer()}}.

%%------------------------------------------------------------------------------
Expand Down Expand Up @@ -134,7 +134,7 @@ handle_info(_Msg, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("~s terminating: ~p", [?SERVER, _Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand All @@ -156,12 +156,12 @@ code_change(_OldVsn, State, _Extra) ->
cleanup_timer() ->
erlang:start_timer(?TIME_BETWEEN_CRAWLS, self(), 'ok').

-spec crawl_number_db(kz_term:ne_binary()) -> ok.
-spec crawl_number_db(kz_term:ne_binary()) -> 'ok'.
crawl_number_db(Db) ->
case kz_datamgr:all_docs(Db, [include_docs]) of
{error, _E} ->
case kz_datamgr:all_docs(Db, ['include_docs']) of
{'error', _E} ->
lager:debug("failed to crawl number db ~s: ~p", [Db, _E]);
{ok, JObjs} ->
{'ok', JObjs} ->
lager:debug("starting to crawl '~s'", [Db]),
_ = knm_pipe:pipe(knm_ops:from_jobjs(JObjs)
,[fun maybe_edit/1
Expand Down Expand Up @@ -194,16 +194,16 @@ maybe_edit_fold(PN, {ToRemove, ToSave}=To) ->
knm_phone_number:records().
maybe_remove(PN, ToRemove, Expiry) ->
case is_old_enough(PN, Expiry) of
false -> ToRemove;
true -> [PN|ToRemove]
'false' -> ToRemove;
'true' -> [PN|ToRemove]
end.

-spec maybe_transition_aging(knm_phone_number:record(), knm_phone_number:records(), integer()) ->
knm_phone_number:records().
maybe_transition_aging(PN, ToSave, Expiry) ->
case is_old_enough(PN, Expiry) of
false -> ToSave;
true ->
'false' -> ToSave;
'true' ->
lager:debug("transitioning number '~s' from ~s to ~s"
,[knm_phone_number:number(PN)
,?NUMBER_STATE_AGING
Expand Down
2 changes: 1 addition & 1 deletion applications/tasks/src/kt_port_request_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?MODULE, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> 'ok'.
stop() ->
Expand Down
50 changes: 25 additions & 25 deletions applications/tasks/src/kz_account_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link(?SERVER, [], []).
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

-spec stop() -> ok.
-spec stop() -> 'ok'.
stop() ->
gen_server:cast(?SERVER, stop).
gen_server:cast(?SERVER, 'stop').

-spec check(kz_term:ne_binary()) -> 'ok'.
check(Account)
Expand All @@ -75,10 +75,10 @@ check(Account) ->
%% @doc Initializes the server.
%% @end
%%------------------------------------------------------------------------------
-spec init([]) -> {ok, state()}.
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?SERVER),
lager:debug("started ~s", [?SERVER]),
kz_log:put_callid(?MODULE),
lager:debug("started ~s", [?MODULE]),
{'ok', #state{}}.

%%------------------------------------------------------------------------------
Expand All @@ -94,9 +94,9 @@ handle_call(_Request, _From, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast(stop, State) ->
handle_cast('stop', State) ->
lager:debug("crawler has been stopped"),
{stop, normal, State};
{'stop', 'normal', State};
handle_cast(_Msg, State) ->
lager:debug("unhandled cast: ~p", [_Msg]),
{'noreply', State}.
Expand All @@ -106,9 +106,9 @@ handle_cast(_Msg, State) ->
%% @end
%%------------------------------------------------------------------------------
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = []
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = []
}=State) ->
NewState =
case kz_datamgr:all_docs(?KZ_ACCOUNTS_DB) of
{'ok', JObjs} ->
Expand All @@ -123,24 +123,24 @@ handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
lager:warning("unable to list all docs in ~s: ~p", [?KZ_ACCOUNTS_DB, _R]),
State#state{cleanup_ref = cleanup_cycle_timer()}
end,
{noreply, NewState};
{'noreply', NewState};

handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId]
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId]
}=State) ->
_ = crawl_account(AccountId),
lager:info("account crawler completed a full crawl"),
{noreply, State#state{cleanup_ref = cleanup_cycle_timer()
,account_ids = []
}};
{'noreply', State#state{cleanup_ref = cleanup_cycle_timer()
,account_ids = []
}};

handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId | AccountIds]
}=State) ->
handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref
,account_ids = [AccountId | AccountIds]
}=State) ->
_ = crawl_account(AccountId),
{noreply, State#state{cleanup_ref = cleanup_timer()
,account_ids = AccountIds
}};
{'noreply', State#state{cleanup_ref = cleanup_timer()
,account_ids = AccountIds
}};

handle_info(_Info, State) ->
lager:debug("unhandled msg: ~p", [_Info]),
Expand All @@ -155,7 +155,7 @@ handle_info(_Info, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("~s terminating: ~p", [?SERVER, _Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand Down
29 changes: 14 additions & 15 deletions applications/tasks/src/kz_notify_resend.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
}).
-type state() :: #state{}.

-define(NAME, ?MODULE).
-define(SERVER, {'via', 'kz_globals', ?NAME}).
-define(SERVER, ?MODULE).

-define(MOD_CONFIG_CAT, <<(?CONFIG_CAT)/binary, ".notify_resend">>).
-define(DEFAULT_TIMEOUT, 10 * ?MILLISECONDS_IN_SECOND).
Expand Down Expand Up @@ -100,13 +99,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
case gen_server:start_link(?SERVER, ?MODULE, [], []) of
{'error', {'already_started', Pid}}
when is_pid(Pid)->
erlang:link(Pid),
{'ok', Pid};
Other -> Other
end.
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

%%%=============================================================================
%%% gen_server callbacks
Expand All @@ -118,8 +111,8 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec init([]) -> {'ok', state()}.
init([]) ->
kz_log:put_callid(?NAME),
lager:debug("~s has been started", [?NAME]),
kz_log:put_callid(?MODULE),
lager:debug("~s has been started", [?MODULE]),
{'ok', #state{timer_ref = set_timer()}}.

-spec stop() -> 'ok'.
Expand All @@ -128,9 +121,15 @@ stop() ->

-spec running() -> kz_json:objects().
running() ->
case kz_globals:where_is(?NAME) of
'undefined' -> [];
_ -> gen_server:call(?SERVER, 'running')
running(whereis(?MODULE)).

-spec running(kz_term:api_pid()) -> kz_json:objects().
running('undefined') ->
[];
running(Pid) ->
case is_process_alive(Pid) of
'true' -> gen_server:call(?SERVER, 'running');
'false' -> []
end.

-spec trigger_timeout() -> any().
Expand Down Expand Up @@ -223,7 +222,7 @@ handle_info(_Info, State) ->
%%------------------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->
lager:debug("terminating: ~p", [_Reason]).
lager:debug("~s terminating: ~p", [?MODULE, _Reason]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand Down
17 changes: 6 additions & 11 deletions applications/tasks/src/kz_tasks_scheduler.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%%%-----------------------------------------------------------------------------
%%% @copyright (C) 2016-2020, 2600Hz
%%% @doc Schedule one-off tasks only once per cluster
%%% @doc Schedule one-off tasks
%%% @author Pierre Fenoll
%%%
%%% This Source Code Form is subject to the terms of the Mozilla Public
Expand Down Expand Up @@ -47,7 +47,7 @@
-include_lib("kazoo_tasks/include/task_fields.hrl").
-include_lib("kazoo_stdlib/include/kazoo_json.hrl").

-define(SERVER, {'via', 'kz_globals', ?MODULE}).
-define(SERVER, ?MODULE).

-define(WAIT_AFTER_ROW
,kapps_config:get_non_neg_integer(?CONFIG_CAT, <<"wait_after_row_ms">>, 500)
Expand Down Expand Up @@ -86,12 +86,7 @@
%%------------------------------------------------------------------------------
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
case gen_server:start_link(?SERVER, ?MODULE, [], []) of
{'error', {'already_started', Pid}} ->
'true' = link(Pid),
{'ok', Pid};
Other -> Other
end.
gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).

%%------------------------------------------------------------------------------
%% @doc
Expand Down Expand Up @@ -332,7 +327,7 @@ cleanup_task(API, Data) ->
-spec init([]) -> {'ok', state()}.
init([]) ->
_ = process_flag('trap_exit', 'true'),
lager:info("ensuring db ~s exists", [?KZ_TASKS_DB]),
lager:info("started ~s", [?MODULE]),
{'ok', #state{}}.

-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
Expand Down Expand Up @@ -638,8 +633,8 @@ task_api(Category, Action) ->
-spec worker_module(kz_json:object()) -> module().
worker_module(API) ->
case kz_tasks:input_mime(API) of
<<"none">> -> 'kz_task_worker_noinput';
_TextCSV -> 'kz_task_worker'
<<"none">> -> 'kt_task_worker_noinput';
_TextCSV -> 'kt_task_worker'
end.

%%% End of Module.
Loading

0 comments on commit f21b4e9

Please sign in to comment.