diff --git a/big_tests/tests/service_domain_db_SUITE.erl b/big_tests/tests/service_domain_db_SUITE.erl index b8f34a588c..45e5c1cda6 100644 --- a/big_tests/tests/service_domain_db_SUITE.erl +++ b/big_tests/tests/service_domain_db_SUITE.erl @@ -63,6 +63,7 @@ db_cases() -> [ db_out_of_sync_restarts_service, db_crash_on_initial_load_restarts_service, db_restarts_properly, + db_loads_domains_after_node_joins_cluster, cli_can_insert_domain, cli_can_disable_domain, cli_can_enable_domain, @@ -242,7 +243,9 @@ setup_service(Opts, Config) -> Pairs1 = [{<<"example.cfg">>, <<"type1">>}, {<<"erlang-solutions.com">>, <<"type2">>}, {<<"erlang-solutions.local">>, <<"type2">>}], - CommonTypes = [<<"type1">>, <<"type2">>, <<"dbgroup">>, <<"dbgroup2">>, <<"cfggroup">>], + %% <<"test type">> is the default type + CommonTypes = [<<"type1">>, <<"type2">>, <<"dbgroup">>, <<"dbgroup2">>, <<"cfggroup">>, + <<"test type">>], Types2 = [<<"mim2only">>|CommonTypes], init_with(mim(), Pairs1, CommonTypes), init_with(mim2(), [], Types2), @@ -543,6 +546,37 @@ db_restarts_properly(_) -> end, mongoose_helper:wait_until(F, true, #{time_left => timer:seconds(15)}). 
+db_loads_domains_after_node_joins_cluster(Config) -> + HostType = <<"test type">>, + ok = insert_domain(mim(), <<"example1.com">>, HostType), + ok = insert_domain(mim2(), <<"example2.com">>, HostType), + sync(), + SupPid1 = whereis_sup(), + CorePid1 = whereis_core(), + ServicePid1 = whereis_service(), + ct:log("Pids sup=~p core=~p service=~p", [SupPid1, CorePid1, ServicePid1]), + %% Check that DB is ok + Rows = rpc(mim(), mongoose_domain_sql, select_from, [0, 9999]), + [{_, <<"example1.com">>, HostType}, + {_, <<"example2.com">>, HostType}] = Rows, + %% WHEN Adding node into a cluster (and mim node restarting) + add_mim_to_cluster(Config), + service_enabled(mim()), + %% Core and service get restarted + SupPid2 = whereis_sup(), + CorePid2 = whereis_core(), + ServicePid2 = whereis_service(), + ct:log("Pids sup=~p core=~p service=~p", [SupPid2, CorePid2, ServicePid2]), + true = SupPid1 =/= SupPid2, + true = CorePid1 =/= CorePid2, + true = ServicePid1 =/= ServicePid2, + %% THEN Sync is successful + Rows = rpc(mim(), mongoose_domain_sql, select_from, [0, 9999]), + ok = insert_domain(mim(), <<"example3.com">>, HostType), + ok = insert_domain(mim2(), <<"example4.com">>, HostType), + sync(), + assert_domains_are_equal(HostType). + cli_can_insert_domain(Config) -> {"Added\n", 0} = ejabberdctl("insert_domain", [<<"example.db">>, <<"type1">>], Config), @@ -911,7 +945,9 @@ select_domain(Node, Domain) -> erase_database(Node) -> case mongoose_helper:is_rdbms_enabled(domain()) of - true -> rpc(Node, mongoose_domain_sql, erase_database, []); + true -> + prepare_test_queries(Node), + rpc(Node, mongoose_domain_sql, erase_database, []); false -> ok end. @@ -1101,3 +1137,37 @@ handler_opts(#{skip_auth := true}) -> []; handler_opts(_Params) -> [{password, <<"secret">>}, {username, <<"admin">>}]. 
+ +assert_domains_are_equal(HostType) -> + Domains1 = lists:sort(get_domains_by_host_type(mim(), HostType)), + Domains2 = lists:sort(get_domains_by_host_type(mim2(), HostType)), + case Domains1 == Domains2 of + true -> ok; + false -> ct:fail({Domains1, Domains2}) + end. + +whereis_service() -> + rpc(mim(), erlang, whereis, [service_domain_db]). + +whereis_core() -> + rpc(mim(), erlang, whereis, [mongoose_domain_core]). + +whereis_sup() -> + rpc(mim(), erlang, whereis, [mongoose_domain_sup]). + +add_mim_to_cluster(Config) -> + leave_cluster(Config), + join_cluster(Config). + +leave_cluster(Config) -> + Cmd = "leave_cluster", + #{node := Node} = distributed_helper:mim(), + Args = ["--force"], + ejabberdctl_helper:ejabberdctl(Node, Cmd, Args, Config). + +join_cluster(Config) -> + Cmd = "join_cluster", + #{node := Node} = distributed_helper:mim(), + #{node := Node2} = distributed_helper:mim2(), + Args = ["--force", atom_to_list(Node2)], + ejabberdctl_helper:ejabberdctl(Node, Cmd, Args, Config). diff --git a/src/domain/mongoose_domain_core.erl b/src/domain/mongoose_domain_core.erl index 3c5cce8533..c358f9f9ee 100644 --- a/src/domain/mongoose_domain_core.erl +++ b/src/domain/mongoose_domain_core.erl @@ -56,12 +56,12 @@ start(Pairs, AllowedHostTypes) -> {?MODULE, {?MODULE, start_link, [Pairs, AllowedHostTypes]}, permanent, infinity, worker, [?MODULE]}, - just_ok(supervisor:start_child(ejabberd_sup, ChildSpec)). + just_ok(supervisor:start_child(mongoose_domain_sup, ChildSpec)). %% required for integration tests stop() -> - supervisor:terminate_child(ejabberd_sup, ?MODULE), - supervisor:delete_child(ejabberd_sup, ?MODULE), + supervisor:terminate_child(mongoose_domain_sup, ?MODULE), + supervisor:delete_child(mongoose_domain_sup, ?MODULE), ok. -endif. 
@@ -130,6 +130,7 @@ get_start_args() -> %% gen_server callbacks %%-------------------------------------------------------------------- init([Pairs, AllowedHostTypes]) -> + service_domain_db:reset_last_event_id(), ets:new(?TABLE, [set, named_table, protected, {read_concurrency, true}]), ets:new(?HOST_TYPE_TABLE, [set, named_table, protected, {read_concurrency, true}]), insert_host_types(?HOST_TYPE_TABLE, AllowedHostTypes), diff --git a/src/domain/mongoose_domain_db_cleaner.erl b/src/domain/mongoose_domain_db_cleaner.erl index 8057829312..266b0bc7f0 100644 --- a/src/domain/mongoose_domain_db_cleaner.erl +++ b/src/domain/mongoose_domain_db_cleaner.erl @@ -30,12 +30,12 @@ start(Opts) -> {?MODULE, {?MODULE, start_link, [Opts]}, permanent, infinity, worker, [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec), + supervisor:start_child(mongoose_domain_sup, ChildSpec), ok. stop() -> - supervisor:terminate_child(ejabberd_sup, ?MODULE), - supervisor:delete_child(ejabberd_sup, ?MODULE), + supervisor:terminate_child(mongoose_domain_sup, ?MODULE), + supervisor:delete_child(mongoose_domain_sup, ?MODULE), ok. start_link(Opts) -> diff --git a/src/domain/mongoose_domain_loader.erl b/src/domain/mongoose_domain_loader.erl index 686b7832f1..02f5c50072 100644 --- a/src/domain/mongoose_domain_loader.erl +++ b/src/domain/mongoose_domain_loader.erl @@ -7,7 +7,7 @@ load_data_from_base(FromId, PageSize) -> try - load_data_from_base_loop(FromId, PageSize) + load_data_from_base_loop(FromId, PageSize, 0) catch Class:Reason:Stacktrace -> Text = <<"Loading initial domains from RDBMS failed">>, ?LOG_CRITICAL(#{what => load_domains_from_base_failed, @@ -15,23 +15,25 @@ load_data_from_base(FromId, PageSize) -> from_id => FromId, class => Class, reason => Reason, stacktrace => Stacktrace}), - service_domain_db:restart() + service_domain_db:restart(), + {ok, #{count => 0}} end. 
-load_data_from_base_loop(FromId, PageSize) ->
+load_data_from_base_loop(FromId, PageSize, Count) ->
     %% Crash on init if select fails.
     case mongoose_domain_sql:select_from(FromId, PageSize) of
-        [] -> ok;
+        [] -> {ok, #{count => Count}};
         Rows ->
             PageMaxId = row_to_id(lists:last(Rows)),
             insert_rows_to_core(Rows),
-            load_data_from_base_loop(PageMaxId, PageSize)
+            load_data_from_base_loop(PageMaxId, PageSize, Count + length(Rows))
     end.
 
 remove_outdated_domains_from_core() ->
     CurrentSource = self(),
     OutdatedDomains = mongoose_domain_core:get_all_outdated(CurrentSource),
-    remove_domains(OutdatedDomains).
+    remove_domains(OutdatedDomains),
+    {ok, #{count => length(OutdatedDomains)}}.
 
 check_for_updates(FromId, PageSize) ->
     %% Ordered by the earliest events first
diff --git a/src/domain/mongoose_domain_sql.erl b/src/domain/mongoose_domain_sql.erl
index b96aa5b820..b047c19a4e 100644
--- a/src/domain/mongoose_domain_sql.erl
+++ b/src/domain/mongoose_domain_sql.erl
@@ -148,6 +148,8 @@ select_from(FromId, Limit) ->
     Pool = get_db_pool(),
     Args = rdbms_queries:add_limit_arg(Limit, [FromId]),
     {selected, Rows} = execute_successfully(Pool, domain_select_from, Args),
+    %% Debug level only: this is a successful read and the report carries every row.
+    ?LOG_DEBUG(#{what => select_from, rows => Rows, from_id => FromId, limit => Limit}),
     Rows.
 
 -spec select_updates_from(event_id(), limit()) -> [row()].
diff --git a/src/domain/mongoose_domain_sup.erl b/src/domain/mongoose_domain_sup.erl
new file mode 100644
index 0000000000..72a739084d
--- /dev/null
+++ b/src/domain/mongoose_domain_sup.erl
@@ -0,0 +1,20 @@
+%% @doc Supervisor for the domain-related processes. It starts with no
+%% children; they are attached at runtime with supervisor:start_child/2.
+-module(mongoose_domain_sup).
+-export([start_link/0]).
+-export([init/1]).
+
+-ignore_xref([start_link/0, init/1]).
+
+-spec(start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    %% Only strategy, intensity and period are set here;
+    %% shutdown is a child spec key and belongs in the child specs.
+    SupFlags = #{strategy => rest_for_one,
+                 intensity => 100, period => 5},
+    ChildSpecs = [],
+    {ok, { SupFlags, ChildSpecs }}.
diff --git a/src/domain/mongoose_lazy_routing.erl b/src/domain/mongoose_lazy_routing.erl index 53460b206a..792651e475 100644 --- a/src/domain/mongoose_lazy_routing.erl +++ b/src/domain/mongoose_lazy_routing.erl @@ -80,12 +80,12 @@ stop() -> start() -> ChildSpec = {?MODULE, {?MODULE, start_link, []}, permanent, infinity, worker, [?MODULE]}, - just_ok(supervisor:start_child(ejabberd_sup, ChildSpec)). + just_ok(supervisor:start_child(mongoose_domain_sup, ChildSpec)). %% required for integration tests stop() -> - supervisor:terminate_child(ejabberd_sup, ?MODULE), - supervisor:delete_child(ejabberd_sup, ?MODULE), + supervisor:terminate_child(mongoose_domain_sup, ?MODULE), + supervisor:delete_child(mongoose_domain_sup, ?MODULE), ok. -endif. diff --git a/src/domain/mongoose_subdomain_core.erl b/src/domain/mongoose_subdomain_core.erl index d649b2994a..ee7b1ac8d7 100644 --- a/src/domain/mongoose_subdomain_core.erl +++ b/src/domain/mongoose_subdomain_core.erl @@ -85,12 +85,12 @@ log_error(_Function, _Error) -> ok. start() -> ChildSpec = {?MODULE, {?MODULE, start_link, []}, permanent, infinity, worker, [?MODULE]}, - just_ok(supervisor:start_child(ejabberd_sup, ChildSpec)). + just_ok(supervisor:start_child(mongoose_domain_sup, ChildSpec)). %% required for integration tests stop() -> - supervisor:terminate_child(ejabberd_sup, ?MODULE), - supervisor:delete_child(ejabberd_sup, ?MODULE), + supervisor:terminate_child(mongoose_domain_sup, ?MODULE), + supervisor:delete_child(mongoose_domain_sup, ?MODULE), ok. -endif. diff --git a/src/domain/service_domain_db.erl b/src/domain/service_domain_db.erl index 2b22efda22..222c792e4f 100644 --- a/src/domain/service_domain_db.erl +++ b/src/domain/service_domain_db.erl @@ -10,16 +10,14 @@ -define(LAST_EVENT_ID_KEY, {?MODULE, last_event_id}). -export([start/1, stop/0, restart/0, config_spec/0]). --export([start_link/0]). +-export([start_link/1]). -export([enabled/0]). -export([force_check_for_updates/0]). -export([sync/0, sync_local/0]). 
- -%% exported for integration tests only! -export([reset_last_event_id/0]). -ignore_xref([code_change/3, handle_call/3, handle_cast/2, handle_info/2, - init/1, start_link/0, sync/0, sync_local/0, terminate/2, reset_last_event_id/0]). + init/1, start_link/1, sync/0, sync_local/0, terminate/2, reset_last_event_id/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -29,19 +27,18 @@ %% Client code start(Opts) -> - mongoose_domain_sql:start(Opts), ChildSpec = {?MODULE, - {?MODULE, start_link, []}, + {?MODULE, start_link, [Opts]}, permanent, infinity, worker, [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec), + supervisor:start_child(mongoose_domain_sup, ChildSpec), mongoose_domain_db_cleaner:start(Opts), ok. stop() -> mongoose_domain_db_cleaner:stop(), - supervisor:terminate_child(ejabberd_sup, ?MODULE), - supervisor:delete_child(ejabberd_sup, ?MODULE), + supervisor:terminate_child(mongoose_domain_sup, ?MODULE), + supervisor:delete_child(mongoose_domain_sup, ?MODULE), ok. restart() -> @@ -62,8 +59,8 @@ config_spec() -> validate = pool_name} }}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). enabled() -> mongoose_service:is_loaded(?MODULE). @@ -94,11 +91,11 @@ sync_local() -> %% --------------------------------------------------------------------------- %% Server callbacks -init([]) -> +init([Opts]) -> + mongoose_domain_sql:start(Opts), mongoose_domain_gaps:init(), ?PG_JOIN(?GROUP, self()), gen_server:cast(self(), initial_loading), - %% initial state will be set on initial_loading processing {ok, #{}}. 
handle_call(ping, _From, State) ->
@@ -112,7 +109,7 @@ handle_cast(initial_loading, State) ->
     ?LOG_INFO(#{what => domains_loaded, last_event_id => LastEventId}),
     NewState = State#{last_event_id => LastEventId,
                       check_for_updates_interval => 30000},
-    {noreply, handle_check_for_updates(NewState)};
+    {noreply, handle_check_for_updates(NewState, true)};
 handle_cast(reset_and_shutdown, State) ->
     %% to ensure that domains table is re-read from
     %% scratch, we must reset the last event id.
@@ -123,7 +120,7 @@ handle_cast(Msg, State) ->
     {noreply, State}.
 
 handle_info(check_for_updates, State) ->
-    {noreply, handle_check_for_updates(State)};
+    {noreply, handle_check_for_updates(State, false)};
 handle_info(Info, State) ->
     ?UNEXPECTED_INFO(Info),
     {noreply, State}.
@@ -140,27 +137,34 @@ code_change(_OldVsn, State, _Extra) ->
 initial_load(undefined) ->
     LastEventId = mongoose_domain_sql:get_max_event_id_or_set_dummy(),
     PageSize = 10000,
-    mongoose_domain_loader:load_data_from_base(0, PageSize),
-    mongoose_domain_loader:remove_outdated_domains_from_core(),
+    {ok, #{count := Count}} = mongoose_domain_loader:load_data_from_base(0, PageSize),
+    {ok, #{count := Outdated}} = mongoose_domain_loader:remove_outdated_domains_from_core(),
     set_last_event_id(LastEventId),
+    %% Routine startup report, so info level (same as the domains_loaded event).
+    ?LOG_INFO(#{what => initial_load, last_event_id => LastEventId,
+                domains_count => Count, outdated_count => Outdated}),
     LastEventId;
 initial_load(LastEventId) when is_integer(LastEventId) ->
+    ?LOG_INFO(#{what => skip_initial_load, last_event_id => LastEventId}),
     LastEventId.
%% Skip initial init handle_check_for_updates(State = #{last_event_id := LastEventId, - check_for_updates_interval := Interval}) -> - maybe_cancel_timer(State), + check_for_updates_interval := Interval}, + IsInitial) -> + maybe_cancel_timer(IsInitial, State), receive_all_check_for_updates(), PageSize = 1000, LastEventId2 = mongoose_domain_loader:check_for_updates(LastEventId, PageSize), maybe_set_last_event_id(LastEventId, LastEventId2), TRef = erlang:send_after(Interval, self(), check_for_updates), - State#{last_event_id => LastEventId2, check_for_updates => TRef}. + State#{last_event_id => LastEventId2, check_for_updates_tref => TRef}. -maybe_cancel_timer(#{check_for_updates_tref := TRef}) -> - erlang:cancel_timer(TRef); -maybe_cancel_timer(_) -> - ok. +maybe_cancel_timer(IsInitial, State) -> + TRef = maps:get(check_for_updates_tref, State, undefined), + case {IsInitial, TRef} of + {true, undefined} -> ok; %% TRef is not set the first time + {false, _} -> erlang:cancel_timer(TRef) + end. receive_all_check_for_updates() -> receive check_for_updates -> receive_all_check_for_updates() after 0 -> ok end. diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 97670ad0e3..7bb6115359 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -153,6 +153,10 @@ init([]) -> {ejabberd_shaper_sup, {ejabberd_shaper_sup, start_link, []}, permanent, infinity, supervisor, [ejabberd_shaper_sup]}, + DomainSup = + {mongoose_domain_sup, + {mongoose_domain_sup, start_link, []}, + permanent, infinity, supervisor, [mongoose_domain_sup]}, {ok, {{one_for_one, 10, 1}, [Hooks, Cleaner, @@ -170,7 +174,8 @@ init([]) -> Listener, MucIQ, MAM, - ShaperSup]}}. + ShaperSup, + DomainSup]}}. start_child(ChildSpec) -> case supervisor:start_child(ejabberd_sup, ChildSpec) of