From f21b4e9c035f09f0c1f2b0bb3d2429fb01bb4e3e Mon Sep 17 00:00:00 2001 From: Harenson Henao Date: Fri, 17 Jan 2020 00:35:10 +0000 Subject: [PATCH] Fixes and improvements to compaction jobs and to tasks app (#6276) - Move auto compaction code from kz_tasks_trigger module to kt_compactor module. - Make kz_tasks_trigger gen_server work as a local gen_server (not using kz_globals). - Fix compaction job's (csv tasks/auto compaction) call_id by prefixing it with `YYYYMM-` so it can be retrieved in later months. - Remove kz_globals usage from tasks app. --- applications/crossbar/priv/api/swagger.json | 10 ++ .../couchdb/schemas/system_config.tasks.json | 10 ++ .../crossbar/priv/oas3/oas3-schemas.yml | 8 + applications/tasks/src/kt_number_crawler.erl | 26 +-- .../tasks/src/kt_port_request_crawler.erl | 2 +- applications/tasks/src/kz_account_crawler.erl | 50 +++--- applications/tasks/src/kz_notify_resend.erl | 29 ++-- applications/tasks/src/kz_tasks_scheduler.erl | 17 +- applications/tasks/src/kz_tasks_trigger.erl | 78 +-------- .../src/modules/kt_compaction_reporter.erl | 16 +- .../tasks/src/modules/kt_compactor.erl | 163 ++++++++++++++---- .../tasks/src/modules/kt_compactor.hrl | 8 + .../tasks/src/modules/kt_compactor_worker.erl | 13 +- .../kt_task_worker.erl} | 2 +- .../kt_task_worker_noinput.erl} | 2 +- applications/tasks/src/tasks.app.src | 10 +- applications/tasks/src/tasks_maintenance.erl | 7 +- .../tasks/test/kt_compactor_tests.erl | 18 ++ core/kazoo_tasks/include/tasks.hrl | 2 + 19 files changed, 275 insertions(+), 196 deletions(-) rename applications/tasks/src/{kz_task_worker.erl => modules/kt_task_worker.erl} (99%) rename applications/tasks/src/{kz_task_worker_noinput.erl => modules/kt_task_worker_noinput.erl} (99%) diff --git a/applications/crossbar/priv/api/swagger.json b/applications/crossbar/priv/api/swagger.json index 57bf80f3695..1754ebca28c 100644 --- a/applications/crossbar/priv/api/swagger.json +++ b/applications/crossbar/priv/api/swagger.json @@ -37501,6 +37501,16 @@ "minimum": 1, "type": "integer" }, + "compaction_list_dbs_chunk_size": { + "default": 20, + "description": "How many dbs to read between pauses", + "type": "integer" + }, + "compaction_list_dbs_pause_ms": { + "default": 200, + "description": "How long to pause before attempting to get the next chunk of dbs", + "type": "integer" + }, "crawler_delay_time_ms": { "default": 60000, "description": "tasks crawler delay time in milliseconds", diff --git a/applications/crossbar/priv/couchdb/schemas/system_config.tasks.json b/applications/crossbar/priv/couchdb/schemas/system_config.tasks.json index 89ea7979e27..94891d5944a 100644 --- a/applications/crossbar/priv/couchdb/schemas/system_config.tasks.json +++ b/applications/crossbar/priv/couchdb/schemas/system_config.tasks.json @@ -21,6 +21,16 @@ "minimum": 1, "type": "integer" }, + "compaction_list_dbs_chunk_size": { + "default": 20, + "description": "How many dbs to read between pauses", + "type": "integer" + }, + "compaction_list_dbs_pause_ms": { + "default": 200, + "description": "How long to pause before attempting to get the next chunk of dbs", + "type": "integer" + }, "crawler_delay_time_ms": { "default": 60000, "description": "tasks crawler delay time in milliseconds", diff --git a/applications/crossbar/priv/oas3/oas3-schemas.yml b/applications/crossbar/priv/oas3/oas3-schemas.yml index 1a5ef22c1f8..adba4d708da 100644 --- a/applications/crossbar/priv/oas3/oas3-schemas.yml +++ b/applications/crossbar/priv/oas3/oas3-schemas.yml @@ -13479,6 +13479,14 @@ 'description': tasks 
browse dbs interval in seconds 'minimum': 1 'type': integer + 'compaction_list_dbs_chunk_size': + 'default': 20 + 'description': How many dbs to read between pauses + 'type': integer + 'compaction_list_dbs_pause_ms': + 'default': 200 + 'description': How long to pause before attempting to get the next chunk of dbs + 'type': integer 'crawler_delay_time_ms': 'default': 60000 'description': tasks crawler delay time in milliseconds diff --git a/applications/tasks/src/kt_number_crawler.erl b/applications/tasks/src/kt_number_crawler.erl index 7a1ff45eee9..b1aa189714d 100644 --- a/applications/tasks/src/kt_number_crawler.erl +++ b/applications/tasks/src/kt_number_crawler.erl @@ -60,7 +60,7 @@ %%------------------------------------------------------------------------------ -spec start_link() -> kz_types:startlink_ret(). start_link() -> - gen_server:start_link(?SERVER, [], []). + gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []). -spec stop() -> 'ok'. stop() -> @@ -68,7 +68,7 @@ stop() -> -spec crawl_numbers() -> 'ok'. crawl_numbers() -> - kz_log:put_callid(?SERVER), + kz_log:put_callid(?MODULE), lager:debug("beginning a number crawl"), lists:foreach(fun(Num) -> crawl_number_db(Num), @@ -89,8 +89,8 @@ crawl_numbers() -> %%------------------------------------------------------------------------------ -spec init([]) -> {'ok', state()}. init([]) -> - kz_log:put_callid(?SERVER), - lager:debug("started ~s", [?SERVER]), + kz_log:put_callid(?MODULE), + lager:debug("started ~s", [?MODULE]), {'ok', #state{cleanup_ref = cleanup_timer()}}. %%------------------------------------------------------------------------------ @@ -134,7 +134,7 @@ handle_info(_Msg, State) -> %%------------------------------------------------------------------------------ -spec terminate(any(), state()) -> 'ok'. terminate(_Reason, _State) -> - lager:debug("~s terminating: ~p", [?SERVER, _Reason]). + lager:debug("~s terminating: ~p", [?MODULE, _Reason]). %%------------------------------------------------------------------------------ %% @doc Convert process state when code is changed. @@ -156,12 +156,12 @@ code_change(_OldVsn, State, _Extra) -> cleanup_timer() -> erlang:start_timer(?TIME_BETWEEN_CRAWLS, self(), 'ok'). --spec crawl_number_db(kz_term:ne_binary()) -> ok. +-spec crawl_number_db(kz_term:ne_binary()) -> 'ok'. crawl_number_db(Db) -> - case kz_datamgr:all_docs(Db, [include_docs]) of - {error, _E} -> + case kz_datamgr:all_docs(Db, ['include_docs']) of + {'error', _E} -> lager:debug("failed to crawl number db ~s: ~p", [Db, _E]); - {ok, JObjs} -> + {'ok', JObjs} -> lager:debug("starting to crawl '~s'", [Db]), _ = knm_pipe:pipe(knm_ops:from_jobjs(JObjs) ,[fun maybe_edit/1 @@ -194,16 +194,16 @@ maybe_edit_fold(PN, {ToRemove, ToSave}=To) -> knm_phone_number:records(). maybe_remove(PN, ToRemove, Expiry) -> case is_old_enough(PN, Expiry) of - false -> ToRemove; - true -> [PN|ToRemove] + 'false' -> ToRemove; + 'true' -> [PN|ToRemove] end. -spec maybe_transition_aging(knm_phone_number:record(), knm_phone_number:records(), integer()) -> knm_phone_number:records(). 
maybe_transition_aging(PN, ToSave, Expiry) -> case is_old_enough(PN, Expiry) of - false -> ToSave; - true -> + 'false' -> ToSave; + 'true' -> lager:debug("transitioning number '~s' from ~s to ~s" ,[knm_phone_number:number(PN) ,?NUMBER_STATE_AGING diff --git a/applications/tasks/src/kt_port_request_crawler.erl b/applications/tasks/src/kt_port_request_crawler.erl index 1fd9bf8fdc3..959286fcaa2 100644 --- a/applications/tasks/src/kt_port_request_crawler.erl +++ b/applications/tasks/src/kt_port_request_crawler.erl @@ -46,7 +46,7 @@ %%------------------------------------------------------------------------------ -spec start_link() -> kz_types:startlink_ret(). start_link() -> - gen_server:start_link(?MODULE, [], []). + gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []). -spec stop() -> 'ok'. stop() -> diff --git a/applications/tasks/src/kz_account_crawler.erl b/applications/tasks/src/kz_account_crawler.erl index c2c8876865d..6c6612aec49 100644 --- a/applications/tasks/src/kz_account_crawler.erl +++ b/applications/tasks/src/kz_account_crawler.erl @@ -48,11 +48,11 @@ %%------------------------------------------------------------------------------ -spec start_link() -> kz_types:startlink_ret(). start_link() -> - gen_server:start_link(?SERVER, [], []). + gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []). --spec stop() -> ok. +-spec stop() -> 'ok'. stop() -> - gen_server:cast(?SERVER, stop). + gen_server:cast(?SERVER, 'stop'). -spec check(kz_term:ne_binary()) -> 'ok'. check(Account) @@ -75,10 +75,10 @@ check(Account) -> %% @doc Initializes the server. %% @end %%------------------------------------------------------------------------------ --spec init([]) -> {ok, state()}. +-spec init([]) -> {'ok', state()}. init([]) -> - kz_log:put_callid(?SERVER), - lager:debug("started ~s", [?SERVER]), + kz_log:put_callid(?MODULE), + lager:debug("started ~s", [?MODULE]), {'ok', #state{}}. %%------------------------------------------------------------------------------ @@ -94,9 +94,9 @@ handle_call(_Request, _From, State) -> %% @end %%------------------------------------------------------------------------------ -spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()). -handle_cast(stop, State) -> +handle_cast('stop', State) -> lager:debug("crawler has been stopped"), - {stop, normal, State}; + {'stop', 'normal', State}; handle_cast(_Msg, State) -> lager:debug("unhandled cast: ~p", [_Msg]), {'noreply', State}. @@ -106,9 +106,9 @@ handle_cast(_Msg, State) -> %% @end %%------------------------------------------------------------------------------ -spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()). 
-handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref - ,account_ids = [] - }=State) -> +handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref + ,account_ids = [] + }=State) -> NewState = case kz_datamgr:all_docs(?KZ_ACCOUNTS_DB) of {'ok', JObjs} -> @@ -123,24 +123,24 @@ handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref lager:warning("unable to list all docs in ~s: ~p", [?KZ_ACCOUNTS_DB, _R]), State#state{cleanup_ref = cleanup_cycle_timer()} end, - {noreply, NewState}; + {'noreply', NewState}; -handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref - ,account_ids = [AccountId] - }=State) -> +handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref + ,account_ids = [AccountId] + }=State) -> _ = crawl_account(AccountId), lager:info("account crawler completed a full crawl"), - {noreply, State#state{cleanup_ref = cleanup_cycle_timer() - ,account_ids = [] - }}; + {'noreply', State#state{cleanup_ref = cleanup_cycle_timer() + ,account_ids = [] + }}; -handle_info({timeout, Ref, _Msg}, #state{cleanup_ref = Ref - ,account_ids = [AccountId | AccountIds] - }=State) -> +handle_info({'timeout', Ref, _Msg}, #state{cleanup_ref = Ref + ,account_ids = [AccountId | AccountIds] + }=State) -> _ = crawl_account(AccountId), - {noreply, State#state{cleanup_ref = cleanup_timer() - ,account_ids = AccountIds - }}; + {'noreply', State#state{cleanup_ref = cleanup_timer() + ,account_ids = AccountIds + }}; handle_info(_Info, State) -> lager:debug("unhandled msg: ~p", [_Info]), @@ -155,7 +155,7 @@ handle_info(_Info, State) -> %%------------------------------------------------------------------------------ -spec terminate(any(), state()) -> 'ok'. terminate(_Reason, _State) -> - lager:debug("~s terminating: ~p", [?SERVER, _Reason]). + lager:debug("~s terminating: ~p", [?MODULE, _Reason]). %%------------------------------------------------------------------------------ %% @doc Convert process state when code is changed. diff --git a/applications/tasks/src/kz_notify_resend.erl b/applications/tasks/src/kz_notify_resend.erl index 81e1b7de3ff..7d2d053acf5 100644 --- a/applications/tasks/src/kz_notify_resend.erl +++ b/applications/tasks/src/kz_notify_resend.erl @@ -31,8 +31,7 @@ }). -type state() :: #state{}. --define(NAME, ?MODULE). --define(SERVER, {'via', 'kz_globals', ?NAME}). +-define(SERVER, ?MODULE). -define(MOD_CONFIG_CAT, <<(?CONFIG_CAT)/binary, ".notify_resend">>). -define(DEFAULT_TIMEOUT, 10 * ?MILLISECONDS_IN_SECOND). @@ -100,13 +99,7 @@ %%------------------------------------------------------------------------------ -spec start_link() -> kz_types:startlink_ret(). start_link() -> - case gen_server:start_link(?SERVER, ?MODULE, [], []) of - {'error', {'already_started', Pid}} - when is_pid(Pid)-> - erlang:link(Pid), - {'ok', Pid}; - Other -> Other - end. + gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []). %%%============================================================================= %%% gen_server callbacks @@ -118,8 +111,8 @@ start_link() -> %%------------------------------------------------------------------------------ -spec init([]) -> {'ok', state()}. init([]) -> - kz_log:put_callid(?NAME), - lager:debug("~s has been started", [?NAME]), + kz_log:put_callid(?MODULE), + lager:debug("~s has been started", [?MODULE]), {'ok', #state{timer_ref = set_timer()}}. -spec stop() -> 'ok'. @@ -128,9 +121,15 @@ stop() -> -spec running() -> kz_json:objects(). 
running() -> - case kz_globals:where_is(?NAME) of - 'undefined' -> []; - _ -> gen_server:call(?SERVER, 'running') + running(whereis(?MODULE)). + +-spec running(kz_term:api_pid()) -> kz_json:objects(). +running('undefined') -> + []; +running(Pid) -> + case is_process_alive(Pid) of + 'true' -> gen_server:call(?SERVER, 'running'); + 'false' -> [] end. -spec trigger_timeout() -> any(). @@ -223,7 +222,7 @@ handle_info(_Info, State) -> %%------------------------------------------------------------------------------ -spec terminate(any(), state()) -> 'ok'. terminate(_Reason, _State) -> - lager:debug("terminating: ~p", [_Reason]). + lager:debug("~s terminating: ~p", [?MODULE, _Reason]). %%------------------------------------------------------------------------------ %% @doc Convert process state when code is changed. diff --git a/applications/tasks/src/kz_tasks_scheduler.erl b/applications/tasks/src/kz_tasks_scheduler.erl index b5843070953..227630a7c89 100644 --- a/applications/tasks/src/kz_tasks_scheduler.erl +++ b/applications/tasks/src/kz_tasks_scheduler.erl @@ -1,6 +1,6 @@ %%%----------------------------------------------------------------------------- %%% @copyright (C) 2016-2020, 2600Hz -%%% @doc Schedule one-off tasks only once per cluster +%%% @doc Schedule one-off tasks %%% @author Pierre Fenoll %%% %%% This Source Code Form is subject to the terms of the Mozilla Public @@ -47,7 +47,7 @@ -include_lib("kazoo_tasks/include/task_fields.hrl"). -include_lib("kazoo_stdlib/include/kazoo_json.hrl"). --define(SERVER, {'via', 'kz_globals', ?MODULE}). +-define(SERVER, ?MODULE). -define(WAIT_AFTER_ROW ,kapps_config:get_non_neg_integer(?CONFIG_CAT, <<"wait_after_row_ms">>, 500) @@ -86,12 +86,7 @@ %%------------------------------------------------------------------------------ -spec start_link() -> kz_types:startlink_ret(). start_link() -> - case gen_server:start_link(?SERVER, ?MODULE, [], []) of - {'error', {'already_started', Pid}} -> - 'true' = link(Pid), - {'ok', Pid}; - Other -> Other - end. + gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% @doc @@ -332,7 +327,7 @@ cleanup_task(API, Data) -> -spec init([]) -> {'ok', state()}. init([]) -> _ = process_flag('trap_exit', 'true'), - lager:info("ensuring db ~s exists", [?KZ_TASKS_DB]), + lager:info("started ~s", [?MODULE]), {'ok', #state{}}. -spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()). @@ -638,8 +633,8 @@ task_api(Category, Action) -> -spec worker_module(kz_json:object()) -> module(). worker_module(API) -> case kz_tasks:input_mime(API) of - <<"none">> -> 'kz_task_worker_noinput'; - _TextCSV -> 'kz_task_worker' + <<"none">> -> 'kt_task_worker_noinput'; + _TextCSV -> 'kt_task_worker' end. %%% End of Module. diff --git a/applications/tasks/src/kz_tasks_trigger.erl b/applications/tasks/src/kz_tasks_trigger.erl index 711d1581c04..1e0a14b7c6b 100644 --- a/applications/tasks/src/kz_tasks_trigger.erl +++ b/applications/tasks/src/kz_tasks_trigger.erl @@ -14,7 +14,6 @@ -export([start_link/0]). -export([status/0]). --export([browse_dbs_for_triggers/1]). %%% gen_server callbacks -export([init/1 @@ -34,7 +33,7 @@ -include("tasks.hrl"). --define(SERVER, {'via', 'kz_globals', ?MODULE}). +-define(SERVER, ?MODULE). 
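+%% Registering locally (rather than via kz_globals) means there is now one
+%% trigger per node, and a duplicate start on that node fails fast; a minimal
+%% sketch of the new semantics, assuming a running node:
+%%
+%%   {'ok', Pid} = kz_tasks_trigger:start_link(),
+%%   {'error', {'already_started', Pid}} = kz_tasks_trigger:start_link().
+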
-record(state, {minute_ref = minute_timer() :: reference()
               ,hour_ref = hour_timer() :: reference()
@@ -66,13 +65,7 @@ status() ->
 %%------------------------------------------------------------------------------
 -spec start_link() -> kz_types:startlink_ret().
 start_link() ->
-    case gen_server:start_link(?SERVER, ?MODULE, [], []) of
-        {'error', {'already_started', Pid}} ->
-            'true' = link(Pid),
-            {'ok', Pid};
-        Other -> Other
-    end.
-
+    gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).
 
 %%%=============================================================================
 %%% gen_server callbacks
@@ -116,7 +109,7 @@ handle_call(_Request, _From, State) ->
 %%------------------------------------------------------------------------------
 -spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
 handle_cast({'cleanup_finished', Ref}, #state{browse_dbs_ref = Ref}=State) ->
-    lager:debug("cleanup finished for ~p, starting timer", [Ref]),
+    lager:info("cleanup finished for ~p, starting timer", [Ref]),
     {'noreply', State#state{browse_dbs_ref = browse_dbs_timer()}, 'hibernate'};
 
 handle_cast(_Msg, State) ->
@@ -148,8 +141,8 @@ handle_info({'timeout', Ref, _Msg}, #state{day_ref = Ref}=State) ->
     {'noreply', State#state{day_ref = day_timer()}};
 
 handle_info({'timeout', Ref, _Msg}, #state{browse_dbs_ref = Ref}=State) ->
-    _Pid = kz_process:spawn(fun browse_dbs_for_triggers/1, [Ref]),
-    lager:debug("cleaning up in ~p(~p)", [_Pid, Ref]),
+    _ = kz_process:spawn(fun tasks_bindings:map/2, [?TRIGGER_AUTO_COMPACTION, Ref]),
+    lager:info("triggering auto compaction job with ref ~p", [Ref]),
     {'noreply', State};
 
 handle_info(_Info, State) ->
@@ -247,65 +240,4 @@ ref_to_id(Ref) ->
     <<"#Ref<", Id:Sz/binary, ">">> = Bin,
     Id.
 
-%% =======================================================================================
-%% Start - Automatic Compaction Section
-%% =======================================================================================
-
-%%------------------------------------------------------------------------------
-%% @doc Entry point for starting the automatic compaction job.
-%%
-%% This functions gets triggered by the `browse_dbs_ref' based on `browse_dbs_timer'
-%% function. By default it triggers the action 1 day after the timer starts.
-%% @end
-%%------------------------------------------------------------------------------
--spec browse_dbs_for_triggers(atom() | reference()) -> 'ok'.
-browse_dbs_for_triggers(Ref) ->
-    CallId = <<"cleanup_pass_", (kz_binary:rand_hex(4))/binary>>,
-    kz_log:put_callid(CallId),
-    lager:debug("starting cleanup pass of databases"),
-    lager:debug("getting databases list and sorting them by disk size"),
-    Sorted = kt_compactor:get_all_dbs_and_sort_by_disk(),
-    TotalSorted = length(Sorted),
-    lager:debug("finished listing and sorting databases (~p found)", [TotalSorted]),
-    'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Sorted),
-    F = fun({Db, _Sizes}, Counter) ->
-                lager:debug("compacting ~p ~p/~p (~p remaining)",
-                            [Db, Counter, TotalSorted, (TotalSorted - Counter)]
-                           ),
-                cleanup_pass(Db),
-                Counter + 1
-        end,
-    _Counter = lists:foldl(F, 1, Sorted),
-    'ok' = kt_compaction_reporter:stop_tracking_job(CallId),
-    kz_log:put_callid('undefined'), % Reset callid
-    lager:debug("pass completed for ~p", [Ref]),
-    gen_server:cast(?SERVER, {'cleanup_finished', Ref}).
-
--spec cleanup_pass(kz_term:ne_binary()) -> boolean().
-cleanup_pass(Db) ->
-    _ = tasks_bindings:map(db_to_trigger(Db), Db),
-    erlang:garbage_collect(self()).
-
--spec db_to_trigger(kz_term:ne_binary()) -> kz_term:ne_binary().
-db_to_trigger(Db) ->
-    Classifiers = [{fun kapps_util:is_account_db/1, ?TRIGGER_ACCOUNT}
-                  ,{fun kapps_util:is_account_mod/1, ?TRIGGER_ACCOUNT_MOD}
-                  ,{fun is_system_db/1, ?TRIGGER_SYSTEM}
-                  ],
-    db_to_trigger(Db, Classifiers).
-
-db_to_trigger(_Db, []) -> ?TRIGGER_OTHER;
-db_to_trigger(Db, [{Classifier, Trigger} | Classifiers]) ->
-    case Classifier(Db) of
-        'true' -> Trigger;
-        'false' -> db_to_trigger(Db, Classifiers)
-    end.
-
--spec is_system_db(kz_term:ne_binary()) -> boolean().
-is_system_db(Db) ->
-    lists:member(Db, ?KZ_SYSTEM_DBS).
-%% =======================================================================================
-%% End - Automatic Compaction Section
-%% =======================================================================================
-
 %%% End of Module.
diff --git a/applications/tasks/src/modules/kt_compaction_reporter.erl b/applications/tasks/src/modules/kt_compaction_reporter.erl
index 95f18740bcc..b9505e4d824 100644
--- a/applications/tasks/src/modules/kt_compaction_reporter.erl
+++ b/applications/tasks/src/modules/kt_compaction_reporter.erl
@@ -1,6 +1,9 @@
 %%%-----------------------------------------------------------------------------
 %%% @copyright (C) 2020-, 2600Hz
 %%% @doc
+%%% Collect and store compaction job information for jobs started via SUP
+%%% commands, the CSV tasks app, or the auto compaction trigger.
+%%%
 %%% This Source Code Form is subject to the terms of the Mozilla Public
 %%% License, v. 2.0. If a copy of the MPL was not distributed with this
 %%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
@@ -74,12 +77,7 @@
 %%------------------------------------------------------------------------------
 -spec start_link() -> kz_types:startlink_ret().
 start_link() ->
-    case gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []) of
-        {'error', {'already_started', Pid}} ->
-            'true' = link(Pid),
-            {'ok', Pid};
-        Other -> Other
-    end.
+    gen_server:start_link({'local', ?MODULE}, ?MODULE, [], []).
 
 %%------------------------------------------------------------------------------
 %% @doc Start tracking a compaction job
@@ -234,6 +232,7 @@ job_info(<<JobId/binary>>) ->
 %%------------------------------------------------------------------------------
 -spec init([]) -> {'ok', state()}.
 init([]) ->
+    lager:info("started ~s", [?MODULE]),
     {'ok', #{}}.
 
 %%------------------------------------------------------------------------------
@@ -255,6 +254,7 @@ handle_call(_Request, _From, State) ->
 %%------------------------------------------------------------------------------
 -spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
 handle_cast({'new_job', Pid, Node, CallId, DbsAndSizes}, State) ->
+    lager:info("start collecting data for compaction job ~p", [CallId]),
     TotalDbs = length(DbsAndSizes),
     Stats = #{'id' => CallId
              ,'found_dbs' => TotalDbs
@@ -411,7 +411,7 @@ handle_info(_Info, State) ->
 -spec terminate(any(), state()) -> 'ok'.
 terminate(_Reason, _State) ->
     lager:debug("~s terminating with reason: ~p~n when state was: ~p"
-               ,[?MODULE, _Reason, _State]
+               ,[?SERVER, _Reason, _State]
               ).
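+%% A compaction job drives this reporter through a small API; a sketch of the
+%% full lifecycle, using the same calls kt_compactor makes later in this patch:
+%%
+%%   'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Sorted),
+%%   'ok' = kt_compaction_reporter:current_db(CallId, Db),
+%%   'ok' = kt_compaction_reporter:finished_db(CallId, Db, Rows),
+%%   'ok' = kt_compaction_reporter:stop_tracking_job(CallId).
+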
 %%------------------------------------------------------------------------------
@@ -499,7 +499,7 @@ save_compaction_stats(#{'id' := Id
     lager:debug("saving stats after compaction job completion: ~p", [Stats]),
     {'ok', AccountId} = kapps_util:get_master_account_id(),
     {'ok', Doc} = kazoo_modb:save_doc(AccountId, kz_json:from_map(Map)),
-    lager:debug("created doc after compaction job completion: ~p", [Doc]),
+    lager:info("created doc after compaction job completion: ~p", [Doc]),
     'ok'.
 
 -spec normalize_db(kz_term:ne_binary()) -> kz_term:ne_binary().
diff --git a/applications/tasks/src/modules/kt_compactor.erl b/applications/tasks/src/modules/kt_compactor.erl
index 38ad9212cf0..55a4c4cb8b6 100644
--- a/applications/tasks/src/modules/kt_compactor.erl
+++ b/applications/tasks/src/modules/kt_compactor.erl
@@ -13,7 +13,7 @@
 
 %% behaviour: tasks_provider
 
--export([init/0, get_all_dbs_and_sort_by_disk/0]).
+-export([init/0]).
 
 -export([compact_all/2
         ,compact_node/3
@@ -26,6 +26,8 @@
         ,do_compact_db/1
         ]).
 
+-export([browse_dbs_for_triggers/1]).
+
 %% Triggerables
 -export([help/1, help/2, help/3
         ,output_header/1
@@ -33,6 +35,7 @@
 
 -ifdef(TEST).
 -export([sort_by_disk_size/1
        ,build_compaction_callid/1
         ]).
 -endif.
 
@@ -78,6 +81,10 @@ init() ->
     _ = case ?COMPACT_AUTOMATICALLY of
             'false' -> lager:info("node ~s not configured to compact automatically", [node()]);
             'true' ->
+                _ = tasks_bindings:bind(?TRIGGER_AUTO_COMPACTION
+                                       ,?MODULE
+                                       ,'browse_dbs_for_triggers'
+                                       ),
                 lager:info("node ~s configured to compact automatically", [node()]),
                 %% Need to use `do_compact_db/1' instead of `compact_db/1' because
                 %% the former uses `?HEUR_RATIO' for the heuristic and the latter ignores
@@ -162,11 +169,7 @@ compact_all(Extra, 'init') ->
     {'ok', is_allowed(Extra)};
 compact_all(_Extra, 'true') ->
     %% Dbs to be compacted will be set at `do_compact_all/0'
-    Rows = track_job(<<"compact_all_", (kz_binary:rand_hex(4))/binary>>
-                    ,fun do_compact_all/0
-                    ,[]
-                    ),
-    {Rows, 'stop'};
+    {track_job(<<"compact_all">>, fun do_compact_all/0, []), 'stop'};
 compact_all(_Extra, 'false') ->
     {<<"compaction is only allowed by system administrators">>, 'stop'}.
 
@@ -177,7 +180,7 @@ compact_node(Extra, 'init', _Args) ->
 compact_node(_Extra, 'false', _Args) ->
     {<<"compaction is only allowed by system administrators">>, 'stop'};
 compact_node(_Extra, 'true', #{<<"node">> := Node}=Row) ->
     %% Dbs to be compacted will be set at `do_compact_node/4'
-    Rows = track_job(<<"compact_node_", (kz_binary:rand_hex(4))/binary>>
+    Rows = track_job(<<"compact_node">>
                     ,fun do_compact_node/2
                     ,[Node, heuristic_from_flag(maps:get(<<"force">>, Row))]
                     ),
@@ -189,10 +192,9 @@ compact_db(Extra, 'init', Args) ->
 compact_db(_Extra, 'false', _Args) ->
     {<<"compaction is only allowed by system administrators">>, 'stop'};
 compact_db(_Extra, 'true', #{<<"database">> := Database}=Row) ->
-    Rows = track_job(<<"compact_db_", (kz_binary:rand_hex(4))/binary>>
+    Rows = track_job(<<"compact_db">>
                     ,fun do_compact_db/2
                     ,[Database, heuristic_from_flag(maps:get(<<"force">>, Row))]
-                    ,get_dbs_sizes([Database])
                     ),
     {Rows, 'true'}.
 
@@ -208,14 +210,13 @@ compact_db(Database) ->
 maybe_track_compact_db(Db, Heur, <<"undefined">>) ->
     %% If no callid is defined yet, then this function was called directly with a db name, so
     %% it creates a new callid to track this db-only compaction job.
-    CallId = <<"compact_db_", (kz_binary:rand_hex(4))/binary>>,
-    track_job(CallId, fun do_compact_db/2, [Db, Heur], get_dbs_sizes([Db]));
+    track_job(<<"compact_db">>, fun do_compact_db/2, [Db, Heur], get_dbs_sizes([Db]));
 maybe_track_compact_db(Db, Heur, <<"sup_", _/binary>> = SupId) ->
     %% If the callid starts with `sup_', then this function was called via a SUP command,
     %% so it derives the job type (dropping the `@' sign and anything to the right of it)
     %% used to track this db-only compaction job.
-    CallId = supid_to_callid(SupId),
-    track_job(CallId, fun do_compact_db/2, [Db, Heur], get_dbs_sizes([Db]));
+    JobType = supid_to_jobtype(SupId),
+    track_job(JobType, fun do_compact_db/2, [Db, Heur], get_dbs_sizes([Db]));
 maybe_track_compact_db(Db, Heur, CallId) ->
     %% If there is already a callid defined, then do_compact_db/2 will use it for updating
     %% the corresponding compaction job's stats.
@@ -256,10 +257,9 @@ do_compact_all() ->
     case get_all_dbs_and_sort_by_disk() of
         [] -> lager:info("failed to find any dbs");
         Sorted ->
-            lager:info("sorted: ~p", [Sorted]),
+            lager:info("starting do_compact_all execution, ~p dbs found", [length(Sorted)]),
             'ok' = kt_compaction_reporter:set_job_dbs(CallId, Sorted),
-            SortedWithoutSizes = [Db || {Db, _Sizes} <- Sorted],
-            lists:foldl(fun do_compact_db_fold/2, [], SortedWithoutSizes)
+            lists:foldl(fun do_compact_db_fold/2, [], Sorted)
     end.
 
 -spec compact_node(kz_term:ne_binary()) -> 'ok'.
@@ -271,11 +271,10 @@ compact_node(Node) ->
 -spec maybe_track_compact_node(kz_term:ne_binary(), heuristic(), kz_term:ne_binary()) -> rows().
 maybe_track_compact_node(Node, Heur, <<"undefined">>) ->
     %% Dbs to be compacted will be set at `do_compact_node/4'
-    CallId = <<"compact_node_", (kz_binary:rand_hex(4))/binary>>,
-    track_job(CallId, fun do_compact_node/2, [Node, Heur]);
+    track_job(<<"compact_node">>, fun do_compact_node/2, [Node, Heur]);
 maybe_track_compact_node(Node, Heur, <<"sup_", _/binary>> = SupId) ->
     %% Triggered via SUP command
-    track_job(supid_to_callid(SupId), fun do_compact_node/2, [Node, Heur]);
+    track_job(supid_to_jobtype(SupId), fun do_compact_node/2, [Node, Heur]);
 maybe_track_compact_node(Node, Heur, _CallId) ->
     do_compact_node(Node, Heur).
 
@@ -316,8 +315,10 @@ do_compact_node(Node, Heuristic, APIConn, AdminConn) ->
 do_compact_node(Node, Heuristic, APIConn, AdminConn, Databases) ->
     CallId = kz_log:get_callid(),
     lists:foldl(fun(Database, Acc) ->
+                        lager:debug("setting current_db to ~p on compaction reporter", [Database]),
                         'ok' = kt_compaction_reporter:current_db(CallId, Database),
                         NewAcc = do_compact_node_db(Node, Heuristic, APIConn, AdminConn, Database, Acc),
+                        lager:debug("finished compacting ~p db on node ~p", [Database, Node]),
                         'ok' = kt_compaction_reporter:finished_db(CallId, Database, NewAcc),
                         NewAcc
                 end
@@ -329,6 +330,7 @@ do_compact_node(Node, Heuristic, APIConn, AdminConn, Databases) ->
 do_compact_node_db(Node, Heuristic, APIConn, AdminConn, Database, Acc) ->
     Compactor = node_compactor(Node, Heuristic, APIConn, AdminConn, Database),
     Shards = kt_compactor_worker:compactor_shards(Compactor),
+    lager:info("adding ~p found shards to compaction reporter", [length(Shards)]),
     'ok' = kt_compaction_reporter:add_found_shards(kz_log:get_callid(), length(Shards)),
     do_compact_node_db(Compactor, Acc).
 
@@ -351,13 +353,16 @@ do_compact_node_db(Compactor) ->
 
 -spec do_compact_db(kz_term:ne_binary()) -> rows().
 do_compact_db(Database) ->
+    lager:debug("about to start compacting ~p db", [Database]),
     do_compact_db(Database, ?HEUR_RATIO).
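+%% The "force" flag on CSV task rows picks the heuristic passed into the
+%% do_compact_* helpers; a sketch of heuristic_from_flag/1 (its body is not
+%% part of this diff, and ?HEUR_NONE is assumed from kt_compactor.hrl):
+%%
+%%   heuristic_from_flag(Force) ->
+%%       case kz_term:is_true(Force) of
+%%           'true' -> ?HEUR_NONE;    %% forced: compact regardless of sizes
+%%           'false' -> ?HEUR_RATIO   %% default: compact only when the ratio warrants it
+%%       end.
+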
 -spec do_compact_db(kz_term:ne_binary(), heuristic()) -> rows().
 do_compact_db(Database, Heuristic) ->
     do_compact_db_by_nodes(Database, Heuristic).
 
--spec do_compact_db_fold(kz_term:ne_binary(), rows()) -> rows().
+-spec do_compact_db_fold(db_and_sizes() | kz_term:ne_binary(), rows()) -> rows().
+do_compact_db_fold({Db, _Sizes}, Rows) ->
+    do_compact_db_fold(Db, Rows);
 do_compact_db_fold(Database, Rows) ->
     Rows ++ do_compact_db(Database).
 
@@ -384,6 +389,7 @@ do_compact_db_by_nodes(Database, Heuristic) ->
 
 -spec do_compact_db_by_node(kz_term:ne_binary(), heuristic(), kz_term:ne_binary(), rows()) -> rows().
 do_compact_db_by_node(Node, Heuristic, Database, Acc) ->
+    lager:debug("about to start compacting ~p db on node ~p", [Database, Node]),
     #{'server' := {_App, #server{}=Conn}} = kzs_plan:plan(),
     case get_node_connections(Node, Conn) of
         {'error', _E} ->
@@ -395,6 +401,7 @@ do_compact_db_by_node(Node, Heuristic, Database, Acc) ->
 
 -spec do_compact_db_by_node(kz_term:ne_binary(), heuristic(), kz_data:connection(), kz_data:connection(), kz_term:ne_binary(), rows()) -> rows().
 do_compact_db_by_node(Node, Heuristic, APIConn, AdminConn, Database, Acc) ->
+    lager:debug("compacting ~p db on node ~p", [Database, Node]),
     case do_compact_node(Node, Heuristic, APIConn, AdminConn, [Database]) of
         [] -> Acc;
         [Row] -> [Row | Acc]
@@ -467,19 +474,22 @@ get_dbs_and_sizes() ->
 
 -spec get_dbs_sizes(kz_term:ne_binaries()) -> [db_and_sizes()].
 get_dbs_sizes(Dbs) ->
     #{'server' := {_App, #server{}=Conn}} = kzs_plan:plan(),
-    F = fun(Db, State) -> get_db_disk_and_data_fold(Conn, Db, State, 20) end,
+    F = fun(Db, State) ->
+                get_db_disk_and_data_fold(Conn, Db, State, ?COMPACTION_LIST_DBS_CHUNK_SIZE)
+        end,
     {DbsAndSizes, _} = lists:foldl(F, {[], 0}, Dbs),
     DbsAndSizes.
 
 -spec get_db_disk_and_data_fold(#server{}
                                ,kz_term:ne_binary()
                                ,{[db_and_sizes()], non_neg_integer()}
-                               , pos_integer()
+                               ,pos_integer()
                                ) -> {[db_and_sizes()], pos_integer()}.
 get_db_disk_and_data_fold(Conn, UnencDb, {_, Counter} = State, ChunkSize)
   when Counter rem ChunkSize =:= 0 ->
-    %% Every `ChunkSize' handled requests, sleep 100ms (give the db a rest).
-    timer:sleep(100),
+    %% After every `ChunkSize' dbs read, sleep `?COMPACTION_LIST_DBS_PAUSE' ms (give the db a rest).
+    lager:debug("~p dbs read, resting for ~p ms", [Counter, ?COMPACTION_LIST_DBS_PAUSE]),
+    timer:sleep(?COMPACTION_LIST_DBS_PAUSE),
     do_get_db_disk_and_data_fold(Conn, UnencDb, State);
 get_db_disk_and_data_fold(Conn, UnencDb, State, _ChunkSize) ->
     do_get_db_disk_and_data_fold(Conn, UnencDb, State).
@@ -514,16 +524,14 @@ sort_by_disk_size({_UnencDb1, _Else}, {_UnencDb2, {_DiskSize2, _}}) ->
 %% Else = 'false'.
 
 -spec track_job(kz_term:ne_binary(), function(), [term()]) -> rows().
-track_job(CallId, Fun, Args) ->
-    track_job(CallId, Fun, Args, []).
+track_job(JobType, Fun, Args) ->
+    track_job(JobType, Fun, Args, []).
 
 -spec track_job(kz_term:ne_binary(), function(), [term()], dbs_and_sizes()) -> rows().
-track_job(_CallId, _Fun, _Args, []) ->
-    lager:info("no databases found to compact"),
-    [];
-track_job(CallId, Fun, Args, Dbs) when is_function(Fun)
-                                       andalso is_list(Args) ->
+track_job(JobType, Fun, Args, Dbs) when is_function(Fun)
+                                        andalso is_list(Args) ->
     try
+        CallId = build_compaction_callid(JobType),
         kz_log:put_callid(CallId),
         'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Dbs),
         Rows = erlang:apply(Fun, Args),
@@ -534,7 +542,94 @@ track_job(CallId, Fun, Args, Dbs) when is_function(Fun)
         'error':{'badmatch', {'error','not_found'}} -> []
     end.
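+%% track_job/4 now takes a job type and derives the callid itself, so every
+%% tracked job carries the YYYYMM- prefix; e.g. a "compact_db" job started in
+%% January 2020 yields a callid like <<"202001-compact_db_a1b2c3d4">> (hex
+%% suffix illustrative), letting its stats doc be found in that month's MODB:
+%%
+%%   CallId = build_compaction_callid(<<"compact_db">>)
+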
-%% SupId = <<"sup_0351@fqdn.hostname.com">>, CallId = <<"sup_0351">>.
--spec supid_to_callid(kz_term:ne_binary()) -> kz_term:ne_binary().
-supid_to_callid(SupId) ->
+%% SupId = <<"sup_0351@fqdn.hostname.com">>, JobType = <<"sup_0351">>.
+-spec supid_to_jobtype(kz_term:ne_binary()) -> kz_term:ne_binary().
+supid_to_jobtype(SupId) ->
     hd(binary:split(SupId, <<"@">>)).
+
+%% =======================================================================================
+%% Start - Automatic Compaction Section
+%% =======================================================================================
+
+%%------------------------------------------------------------------------------
+%% @doc Entry point for starting the automatic compaction job.
+%%
+%% This function gets triggered by the `browse_dbs_ref' timer, which is set by
+%% the `browse_dbs_timer' function. By default it fires 1 day after the timer
+%% starts.
+%% @end
+%%------------------------------------------------------------------------------
+-spec browse_dbs_for_triggers(atom() | reference()) -> 'ok'.
+browse_dbs_for_triggers(Ref) ->
+    CallId = build_compaction_callid(<<"cleanup_pass">>),
+    kz_log:put_callid(CallId),
+    lager:info("starting cleanup pass of databases"),
+    Dbs = maybe_list_and_sort_dbs_for_compaction(?COMPACT_AUTOMATICALLY, CallId),
+    _Counter = lists:foldl(fun trigger_db_cleanup/2, {length(Dbs), 1}, Dbs),
+    'ok' = kt_compaction_reporter:stop_tracking_job(CallId),
+    kz_log:put_callid('undefined'), % Reset callid
+    lager:info("pass completed for ~p", [Ref]),
+    gen_server:cast('kz_tasks_trigger', {'cleanup_finished', Ref}).
+
+-spec build_compaction_callid(kz_term:ne_binary()) -> kz_term:ne_binary().
+build_compaction_callid(JobTypeBin) ->
+    {Year, Month, _} = erlang:date(),
+    %% <<"YYYYMM-jobtype_xxxxxxxx">> = CallId
+    <<(integer_to_binary(Year))/binary %% YYYY
+     ,(kz_binary:pad_left(integer_to_binary(Month), 2, <<"0">>))/binary %% MM
+     ,"-"
+     ,JobTypeBin/binary %% jobtype
+     ,"_"
+     ,(kz_binary:rand_hex(4))/binary %% xxxxxxxx
+    >>.
+
+-spec trigger_db_cleanup(db_and_sizes() | kz_term:ne_binary()
+                        ,{pos_integer(), pos_integer()}
+                        ) -> {pos_integer(), pos_integer()}.
+trigger_db_cleanup({Db, _Sizes}, Acc) ->
+    trigger_db_cleanup(Db, Acc);
+trigger_db_cleanup(Db, {TotalDbs, Counter}) ->
+    lager:debug("triggering ~p db compaction ~p/~p (~p remaining)",
+                [Db, Counter, TotalDbs, (TotalDbs - Counter)]),
+    cleanup_pass(Db),
+    {TotalDbs, Counter + 1}.
+
+-spec maybe_list_and_sort_dbs_for_compaction(boolean(), kz_term:ne_binary()) ->
+          dbs_and_sizes() | kz_term:ne_binaries().
+maybe_list_and_sort_dbs_for_compaction('true', CallId) ->
+    lager:debug("auto compaction enabled, getting databases list and sorting them by disk size"),
+    Sorted = get_all_dbs_and_sort_by_disk(),
+    lager:debug("finished listing and sorting databases (~p found)", [length(Sorted)]),
+    'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Sorted),
+    Sorted;
+maybe_list_and_sort_dbs_for_compaction('false', _CallId) ->
+    lager:debug("auto compaction disabled, skip sorting dbs by size"),
+    {'ok', Dbs} = kz_datamgr:db_info(),
+    lager:debug("finished listing databases (~p found)", [length(Dbs)]),
+    Dbs.
+
+-spec cleanup_pass(kz_term:ne_binary()) -> boolean().
+cleanup_pass(Db) ->
+    _ = tasks_bindings:map(db_to_trigger(Db), Db),
+    erlang:garbage_collect(self()).
+
+-spec db_to_trigger(kz_term:ne_binary()) -> kz_term:ne_binary().
+db_to_trigger(Db) -> + Classifiers = [{fun kapps_util:is_account_db/1, ?TRIGGER_ACCOUNT} + ,{fun kapps_util:is_account_mod/1, ?TRIGGER_ACCOUNT_MOD} + ,{fun is_system_db/1, ?TRIGGER_SYSTEM} + ], + db_to_trigger(Db, Classifiers). + +db_to_trigger(_Db, []) -> ?TRIGGER_OTHER; +db_to_trigger(Db, [{Classifier, Trigger} | Classifiers]) -> + case Classifier(Db) of + 'true' -> Trigger; + 'false' -> db_to_trigger(Db, Classifiers) + end. + +-spec is_system_db(kz_term:ne_binary()) -> boolean(). +is_system_db(Db) -> + lists:member(Db, ?KZ_SYSTEM_DBS). +%% ======================================================================================= +%% End - Automatic Compaction Section +%% ======================================================================================= diff --git a/applications/tasks/src/modules/kt_compactor.hrl b/applications/tasks/src/modules/kt_compactor.hrl index e2d556df087..4b787e79e88 100644 --- a/applications/tasks/src/modules/kt_compactor.hrl +++ b/applications/tasks/src/modules/kt_compactor.hrl @@ -37,6 +37,14 @@ -define(COMPACT_AUTOMATICALLY ,kapps_config:get_is_true(?SYSCONFIG_COUCH, <<"compact_automatically">>, 'false') ). +%% How many dbs to read between pauses. +-define(COMPACTION_LIST_DBS_CHUNK_SIZE + ,kapps_config:get_integer(?CONFIG_CAT, <<"compaction_list_dbs_chunk_size">>, 20) + ). +%% How long to pause before attempting to get the next chunk of dbs. +-define(COMPACTION_LIST_DBS_PAUSE + ,kapps_config:get_integer(?CONFIG_CAT, <<"compaction_list_dbs_pause_ms">>, 200) + ). -define(ADMIN_PORT ,kapps_config:get_integer(?SYSCONFIG_COUCH, <<"admin_port">>, 5986) diff --git a/applications/tasks/src/modules/kt_compactor_worker.erl b/applications/tasks/src/modules/kt_compactor_worker.erl index 357550d6b84..772a3852460 100644 --- a/applications/tasks/src/modules/kt_compactor_worker.erl +++ b/applications/tasks/src/modules/kt_compactor_worker.erl @@ -122,13 +122,13 @@ compact_shard(#compactor{shards=[Shard]}=Compactor) -> case get_db_disk_and_data(compactor_admin(Compactor), Shard) of 'undefined' -> - lager:info("beginning shard compaction"), + lager:debug("beginning shard compaction"), start_compacting_shard(Compactor); 'not_found' -> - lager:info("disk and data size not found, skip and return ok"), + lager:debug("disk and data size not found, skip and return ok"), 'ok'; {BeforeDisk, BeforeData} -> - lager:info("beginning shard compaction: ~p disk/~p data", [BeforeDisk, BeforeData]), + lager:debug("beginning shard compaction: ~p disk/~p data", [BeforeDisk, BeforeData]), start_compacting_shard(Compactor) end. @@ -201,7 +201,7 @@ wait_for_design_compaction(AdminConn, Shard, DDs, DD, {'error', {'conn_failed', 'ok' = timer:sleep(?SLEEP_BETWEEN_POLL), wait_for_design_compaction(AdminConn, Shard, DDs, DD, kz_couch_view:design_info(AdminConn, Shard, DD)); wait_for_design_compaction(AdminConn, Shard, DDs, _DD, {'error', 'not_found'}) -> - lager:info("compacting shard design docs not found"), + lager:debug("compacting shard design docs not found"), wait_for_design_compaction(AdminConn, Shard, DDs); wait_for_design_compaction(AdminConn, Shard, DDs, _DD, {'error', _E}) -> lager:warning("failed design status for '~s/~s': ~p", [Shard, _DD, _E]), @@ -222,7 +222,7 @@ wait_for_compaction(AdminConn, Shard) -> -spec wait_for_compaction(kz_data:connection(), kz_term:ne_binary(), db_info_resp()) -> 'ok'. 
 wait_for_compaction(_AdminConn, _Shard, {'error', 'db_not_found'}) ->
-    lager:info("shard '~s' wasn't found on this connection: ~p", [_Shard, _AdminConn]);
+    lager:debug("shard '~s' wasn't found on this connection: ~p", [_Shard, _AdminConn]);
 wait_for_compaction(AdminConn, Shard, {'error', 'timeout'}) ->
     lager:warning("timed out querying db status; that seems irregular!"),
     'ok' = timer:sleep(?SLEEP_BETWEEN_POLL * 2),
@@ -306,7 +306,8 @@ min_data_met(_Data, _Min) ->
 min_ratio_met(Disk, Data, MinRatio) ->
     case (Disk - Data) / Disk * 100 of
         R when R > MinRatio ->
-            lager:debug("ratio ~.2f% is greater than min ratio: ~p%", [R, MinRatio]),
+            lager:debug("ratio ~.2f% (~p disk/~p data) is greater than min ratio: ~p%",
+                        [R, Disk, Data, MinRatio]),
             'true';
         _R ->
             lager:debug("ratio ~.2f% ((~p-~p) / ~p * 100) is under min threshold ~p%",
diff --git a/applications/tasks/src/kz_task_worker.erl b/applications/tasks/src/modules/kt_task_worker.erl
similarity index 99%
rename from applications/tasks/src/kz_task_worker.erl
rename to applications/tasks/src/modules/kt_task_worker.erl
index 14b1d11198b..f189cdda035 100644
--- a/applications/tasks/src/kz_task_worker.erl
+++ b/applications/tasks/src/modules/kt_task_worker.erl
@@ -9,7 +9,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
--module(kz_task_worker).
+-module(kt_task_worker).
 
 %% API
 -export([start/3]).
diff --git a/applications/tasks/src/kz_task_worker_noinput.erl b/applications/tasks/src/modules/kt_task_worker_noinput.erl
similarity index 99%
rename from applications/tasks/src/kz_task_worker_noinput.erl
rename to applications/tasks/src/modules/kt_task_worker_noinput.erl
index 0c54ccdf052..76d5dca47bf 100644
--- a/applications/tasks/src/kz_task_worker_noinput.erl
+++ b/applications/tasks/src/modules/kt_task_worker_noinput.erl
@@ -9,7 +9,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
--module(kz_task_worker_noinput).
+-module(kt_task_worker_noinput).
 
 %% API
 -export([start/3]).
diff --git a/applications/tasks/src/tasks.app.src b/applications/tasks/src/tasks.app.src
index a37dae8dd54..eb126f28869 100644
--- a/applications/tasks/src/tasks.app.src
+++ b/applications/tasks/src/tasks.app.src
@@ -1,11 +1,11 @@
 {application,tasks,
     [{applications,[couchbeam,kazoo,kazoo_amqp,kazoo_apps,
                     kazoo_bindings,kazoo_couch,kazoo_csv,kazoo_data,
-                    kazoo_documents,kazoo_globals,kazoo_ledgers,
-                    kazoo_modb,kazoo_numbers,kazoo_services,
-                    kazoo_stdlib,kazoo_tasks,kazoo_voicemail,
-                    kazoo_web,kernel,lager,stdlib]},
-     {description,"Tasks - Schedule one-offs cluster-wide tasks"},
+                    kazoo_documents,kazoo_ledgers,kazoo_modb,
+                    kazoo_numbers,kazoo_services,kazoo_stdlib,
+                    kazoo_tasks,kazoo_voicemail,kazoo_web,
+                    kernel,lager,stdlib]},
+     {description,"Tasks - Schedule one-off tasks"},
      {env,[{is_kazoo_app,true}]},
      {mod,{tasks_app,[]}},
      {modules,[]},
diff --git a/applications/tasks/src/tasks_maintenance.erl b/applications/tasks/src/tasks_maintenance.erl
index 8299b00f54b..1aca5eb2351 100644
--- a/applications/tasks/src/tasks_maintenance.erl
+++ b/applications/tasks/src/tasks_maintenance.erl
@@ -134,7 +134,8 @@ remove(TaskId) ->
 
 -spec start_cleanup_pass() -> no_return.
 start_cleanup_pass() ->
-    _ = kz_tasks_trigger:browse_dbs_for_triggers(?MODULE),
+    _ = kz_process:spawn(fun kt_compactor:browse_dbs_for_triggers/1, [?MODULE]),
+    io:format("cleanup pass started~n"),
     no_return.
 
 -spec cleanup_soft_deletes(kz_term:text()) -> no_return.
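+%% The cleanup pass is now spawned, so the maintenance call returns immediately
+%% instead of blocking until the pass finishes; it can still be kicked off by
+%% hand (illustrative invocation):
+%%
+%%   sup tasks_maintenance start_cleanup_pass
+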
@@ -267,9 +268,9 @@ maybe_print_compaction_history({'ok', JObjs}) ->
              ,"finished_at"
              ,"exec_time"
              ],
-    HLine = "+-----------------------+--------+-----------+---------+------------+---------------------+---------------------+--------------+",
+    HLine = "+------------------------------+--------+-----------+---------+------------+---------------------+---------------------+--------------+",
     %% Format string for printing header and values of the table including "columns".
-    FStr = "| ~.21s | ~6.6s | ~9.9s | ~7.7s | ~10.10s | ~.19s | ~.19s | ~12.12s |~n",
+    FStr = "| ~.28s | ~6.6s | ~9.9s | ~7.7s | ~10.10s | ~.19s | ~.19s | ~12.12s |~n",
     %% Print the top line of the table, then the header, then another line below.
     io:format("~s~n" ++ FStr ++ "~s~n", [HLine] ++ Header ++ [HLine]),
     lists:foreach(fun(Obj) -> print_compaction_history_row(Obj, FStr) end, JObjs),
diff --git a/applications/tasks/test/kt_compactor_tests.erl b/applications/tasks/test/kt_compactor_tests.erl
index fd5bd2840db..aff81c9e5d5 100644
--- a/applications/tasks/test/kt_compactor_tests.erl
+++ b/applications/tasks/test/kt_compactor_tests.erl
@@ -41,6 +41,24 @@ sort_by_disk_size_test_() ->
      }
     ].
 
+build_compaction_callid_test_() ->
+    TestF = fun() -> kt_compactor:build_compaction_callid(<<"testing">>) end,
+
+    {Year, Month, _} = erlang:date(),
+    YearBin = integer_to_binary(Year),
+    MonthBin = kz_binary:pad_left(integer_to_binary(Month), 2, <<"0">>),
+
+    [{"Prefix with YYYYMM-"
+     ,?_assertMatch(<<YearBin:4/binary, MonthBin:2/binary, "-", _/binary>>, TestF())
+     }
+    ,{"Suffix with random hex"
+     ,?_assertMatch(<<_Prefix:7/binary, "testing_", _Suffix:8/binary>>, TestF())
+     }
+    ,{"Return unique callid every time"
+     ,?_assertNotEqual(TestF(), TestF())
+     }
+    ].
+
 %% =======================================================================================
 %% Helpers
 %% =======================================================================================
diff --git a/core/kazoo_tasks/include/tasks.hrl b/core/kazoo_tasks/include/tasks.hrl
index 65c47dd629c..e9b42cd1d8f 100644
--- a/core/kazoo_tasks/include/tasks.hrl
+++ b/core/kazoo_tasks/include/tasks.hrl
@@ -29,6 +29,8 @@
 -define(TRIGGER_OTHER, <<"tasks.triggers.other">>).
 -define(TRIGGER_SYSTEM, <<"tasks.triggers.system">>).
 
+-define(TRIGGER_AUTO_COMPACTION, <<"tasks.triggers.cleanup_pass">>).
+
 -define(TRIGGER_ALL_DBS, [?TRIGGER_ACCOUNT
                          ,?TRIGGER_ACCOUNT_MOD
                          ,?TRIGGER_SYSTEM