diff --git a/big_tests/tests/inbox_SUITE.erl b/big_tests/tests/inbox_SUITE.erl index 21d99f574c..106001c9d4 100644 --- a/big_tests/tests/inbox_SUITE.erl +++ b/big_tests/tests/inbox_SUITE.erl @@ -119,27 +119,29 @@ groups() -> get_with_start_timestamp, get_with_end_timestamp ]}, - {regular, [], + {bin, [], [ - {group, generic}, - {group, one_to_one}, - {group, muclight}, - {group, muclight_config}, - {group, muc}, - {group, timestamps} + timeout_cleaner_flush_all, + rest_api_bin_flush_all, + rest_api_bin_flush_user, + xmpp_bin_flush, + bin_is_not_included_by_default ]}, - {async_pools, [], - [ - {group, generic}, - {group, one_to_one}, - {group, muclight}, - {group, muclight_config}, - {group, muc}, - {group, timestamps} - ]} + {regular, [], test_groups()}, + {async_pools, [], [{group, bin} | test_groups()]} ], inbox_helper:maybe_run_in_parallel(Gs). +test_groups() -> + [ + {group, generic}, + {group, one_to_one}, + {group, muclight}, + {group, muclight_config}, + {group, muc}, + {group, timestamps} + ]. + suite() -> escalus:suite(). @@ -193,6 +195,10 @@ end_per_group(muc, Config) -> end_per_group(_GroupName, Config) -> Config. +init_per_testcase(timeout_cleaner_flush_all, Config) -> + clear_inbox_all(), + inbox_helper:reload_inbox_option(Config, [{bin_ttl, 0}, {bin_clean_after, 10}]), + escalus:init_per_testcase(timeout_cleaner_flush_all, Config); init_per_testcase(TS, Config) when TS =:= create_groupchat_no_affiliation_stored; TS =:= system_message_is_correctly_avoided -> @@ -218,6 +224,9 @@ init_per_testcase(no_stored_and_remain_after_kicked, Config) -> init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config). +end_per_testcase(timeout_cleaner_flush_all, Config) -> + inbox_helper:restore_inbox_option(Config), + escalus:end_per_testcase(timeout_cleaner_flush_all, Config); end_per_testcase(TS, Config) when TS =:= create_groupchat_no_affiliation_stored; TS =:= system_message_is_correctly_avoided -> @@ -279,7 +288,7 @@ returns_valid_form(Config) -> value := <<"desc">>, options := OrderOptions } } = Form, [<<"asc">>, <<"desc">>] = lists:sort(OrderOptions), - [<<"all">>, <<"archive">>, <<"inbox">>, <<"other">>] = lists:sort(BoxOptions), + [<<"all">>, <<"archive">>, <<"bin">>, <<"inbox">>, <<"other">>] = lists:sort(BoxOptions), #{ <<"hidden_read">> := #{ type := <<"text-single">> } } = Form end). @@ -318,8 +327,8 @@ returns_error_when_no_reset_field_jid(Config) -> returns_error_when_first_bad_form_field_encountered(Config) -> escalus:fresh_story(Config, [{alice, 1}], fun(Alice) -> - Stanza = inbox_helper:make_inbox_stanza( #{ <<"start">> => <<"invalid">>, - <<"end">> => <<"invalid">>}, false), + Stanza = inbox_helper:make_inbox_stanza(#{<<"start">> => <<"invalid">>, + <<"end">> => <<"invalid">>}, false), escalus:send(Alice, Stanza), [ResIQ] = escalus:wait_for_stanzas(Alice, 1), escalus_pred:is_iq_error(ResIQ), @@ -329,7 +338,7 @@ returns_error_when_first_bad_form_field_encountered(Config) -> returns_error_when_unknown_field_sent(Config) -> escalus:fresh_story(Config, [{alice, 1}], fun(Alice) -> - Stanza = inbox_helper:make_inbox_stanza( #{ <<"unknown_field">> => <<"unknown_field_value">> }, false), + Stanza = inbox_helper:make_inbox_stanza(#{<<"unknown_field">> => <<"unknown_field_value">>}, false), escalus:send(Alice, Stanza), [ResIQ] = escalus:wait_for_stanzas(Alice, 1), escalus_pred:is_iq_error(ResIQ), @@ -569,8 +578,6 @@ total_unread_count_and_active_convs_are_zero_at_no_activity(Config) -> inbox_helper:get_inbox(Kate, #{count => 0, unread_messages => 0, active_conversations => 0}) end). - - try_to_reset_unread_counter_with_bad_marker(Config) -> escalus:fresh_story(Config, [{kate, 1}, {mike, 1}], fun(Kate, Mike) -> Msg1 = escalus_stanza:set_id(escalus_stanza:chat_to(Mike, <<"okey dockey">>), <<"111">>), @@ -625,7 +632,7 @@ simple_groupchat_stored_in_all_inbox(Config) -> RoomJid = room_bin_jid(Room), AliceRoomJid = <>, Stanza = escalus_stanza:set_id( - escalus_stanza:groupchat_to(RoomJid, Msg), Id), + escalus_stanza:groupchat_to(RoomJid, Msg), Id), %% Alice sends message to a room escalus:send(Alice, Stanza), inbox_helper:wait_for_groupchat_msg(Users), @@ -706,16 +713,12 @@ non_reset_marker_should_not_affect_muclight_inbox(Config) -> AliceJid = inbox_helper:to_bare_lower(Alice), BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), - Room = inbox_helper:create_room(Alice, [Bob, Kate]), - RoomJid = room_bin_jid(Room), - AliceRoomJid = <>, - Msg = <<"marker time!">>, - Users = [Alice, Bob, Kate], % %% WHEN DONE - Stanza1 = escalus_stanza:set_id( - escalus_stanza:groupchat_to(RoomJid, Msg), escalus_stanza:id()), - escalus:send(Alice, Stanza1), - inbox_helper:wait_for_groupchat_msg(Users), + Id = <<"some_ID">>, + Msg = <<"marker time!">>, + RoomName = pubsub_tools:pubsub_node_name(), + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), + AliceRoomJid = <>, % %% AND MARKED WRONG inbox_helper:mark_last_muclight_message(Bob, [Alice, Bob, Kate], <<"received">>), inbox_helper:mark_last_muclight_message(Kate, [Alice, Bob, Kate], <<"acknowledged">>), @@ -734,20 +737,15 @@ non_reset_marker_should_not_affect_muclight_inbox(Config) -> groupchat_reset_stanza_resets_inbox(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> % %% WITH + Id = <<"some_ID">>, + Msg = <<"marker time!">>, + RoomName = pubsub_tools:pubsub_node_name(), AliceJid = inbox_helper:to_bare_lower(Alice), BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), - Room = inbox_helper:create_room(Alice, [Bob, Kate]), - RoomJid = room_bin_jid(Room), - AliceRoomJid = <>, % %% WHEN A MESSAGE IS SENT - MsgStanza = escalus_stanza:set_id( - escalus_stanza:groupchat_to(RoomJid, <<"marker time!">>), <<"some_ID">>), - escalus:send(Alice, MsgStanza), - inbox_helper:wait_for_groupchat_msg([Alice, Bob, Kate]), - % verify that Bob has the message on inbox - check_inbox(Bob, [#conv{unread = 1, from = AliceRoomJid, - to = BobJid, content = <<"marker time!">>}]), + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), + AliceRoomJid = <>, % %% AND WHEN SEND RESET FOR ROOM inbox_helper:reset_inbox(Bob, RoomJid), % %% THEN INBOX IS RESET FOR BOB, WITHOUT FORWARDING @@ -774,7 +772,6 @@ create_groupchat(Config) -> inbox_helper:create_room_and_check_inbox(Bob, [Alice, Kate], RoomName) end). - %% this test combines options: %% ... %%{aff_changes, false}, @@ -810,33 +807,30 @@ create_groupchat_no_affiliation_stored(Config) -> check_inbox(Kate, [#conv{unread = 1, from = AliceRoomJid, to = KateJid, content = Msg}]) end). - %% this test combines options: %% ... %%{aff_changes, true}, %%{remove_on_kicked, true}, leave_and_remove_conversation(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> - Msg = <<"Hi Room!">>, - Id = <<"MyID">>, - Users = [Alice, Bob, Kate], + RoomName = pubsub_tools:pubsub_node_name(), AliceJid = inbox_helper:to_bare_lower(Alice), + BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), - Room = inbox_helper:create_room(Alice, [Bob, Kate]), - RoomJid = room_bin_jid(Room), + %% Alice creates a room and send msg + Msg = <<"Hi all">>, + Id = <<"leave-id">>, + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), AliceRoomJid = <>, - Stanza = escalus_stanza:set_id( - escalus_stanza:groupchat_to(RoomJid, Msg), Id), - %% Alice sends msg to room - escalus:send(Alice, Stanza), - inbox_helper:wait_for_groupchat_msg(Users), %% Bob leaves the room - muc_light_helper:user_leave(Room, Bob, [{Alice, owner}, {Kate, member}]), + muc_light_helper:user_leave(RoomName, Bob, [{Alice, owner}, {Kate, member}]), %% Alice and Kate have one message check_inbox(Alice, [#conv{unread = 0, from = AliceRoomJid, to = AliceJid, content = Msg}]), check_inbox(Kate, [#conv{unread = 1, from = AliceRoomJid, to = KateJid, content = Msg}]), %% Bob doesn't have conversation in his inbox - check_inbox(Bob, []) + check_inbox(Bob, [], #{box => inbox}), + if_async_check_bin(Config, Bob, [#conv{unread = 2, from = RoomJid, to = BobJid, + verify = fun inbox_helper:verify_is_none_aff_change/2}]) end). %% this test combines options: @@ -845,16 +839,15 @@ leave_and_remove_conversation(Config) -> %%{remove_on_kicked, false}, leave_and_store_conversation(Config) -> escalus:story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> - RoomName = <<"kicking-room">>, + RoomName = pubsub_tools:pubsub_node_name(), AliceJid = inbox_helper:to_bare_lower(Alice), BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), - RoomJid = room_bin_jid(RoomName), - AliceRoomJid = <>, - Msg = <<"Hi all">>, %% Alice creates a room and send msg - inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, - Msg, <<"leave-id">>), + Msg = <<"Hi all">>, + Id = <<"leave-id">>, + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), + AliceRoomJid = <>, %% Bob leaves room muc_light_helper:user_leave(RoomName, Bob, [{Alice, owner}, {Kate, member}]), %% Alice and Kate have conversation @@ -873,6 +866,7 @@ leave_and_store_conversation(Config) -> no_aff_stored_and_remove_on_kicked(Config) -> escalus:story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> AliceJid = inbox_helper:to_bare_lower(Alice), + BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), RoomJid = room_bin_jid(?ROOM3), AliceRoomJid = <>, @@ -889,10 +883,10 @@ no_aff_stored_and_remove_on_kicked(Config) -> check_inbox(Alice, [#conv{unread = 0, from = AliceRoomJid, to = AliceJid, content = Msg}]), check_inbox(Kate, [#conv{unread = 1, from = AliceRoomJid, to = KateJid, content = Msg}]), %% Bob doesnt have a conversation in inbox - check_inbox(Bob, []) + check_inbox(Bob, [], #{box => inbox}), + if_async_check_bin(Config, Bob, [#conv{unread = 1, from = AliceRoomJid, to = BobJid, content = Msg}]) end). - %% this test combines options: %% ... %%{aff_changes, true}, @@ -917,21 +911,20 @@ no_stored_and_remain_after_kicked(Config) -> check_inbox(Alice, [#conv{unread = 0, from = AliceRoomJid, to = AliceJid, content = Msg}]), check_inbox(Kate, [#conv{unread = 1, from = AliceRoomJid, to = KateJid, content = Msg}]), %% Bob have a conversation in inbox. First unread is message from Alice, the second the affiliation change - check_inbox(Bob, [#conv{ unread = 2, from = RoomJid, to = BobJid, - verify = fun inbox_helper:verify_is_none_aff_change/2}]) + check_inbox(Bob, [#conv{unread = 2, from = RoomJid, to = BobJid, + verify = fun inbox_helper:verify_is_none_aff_change/2}]) end). - groupchat_markers_one_reset_room_created(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + Id = <<"random-id">>, Msg = <<"Welcome guys">>, - RoomName = <<"markers_room">>, - RoomJid = room_bin_jid(RoomName), + RoomName = pubsub_tools:pubsub_node_name(), AliceJid = inbox_helper:to_bare_lower(Alice), - AliceRoomJid = <>, BobJid = inbox_helper:to_bare_lower(Bob), KateJid = inbox_helper:to_bare_lower(Kate), - inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, <<"1-id">>), + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), + AliceRoomJid = <>, %% Now Bob sends marker inbox_helper:mark_last_muclight_message(Bob, [Alice, Bob, Kate]), %% The crew ask for inbox second time. Only Kate has unread messages @@ -942,12 +935,12 @@ groupchat_markers_one_reset_room_created(Config) -> groupchat_markers_all_reset_room_created(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> - RoomName = <<"markers_room2">>, + Id = <<"random-id">>, + Msg = <<"Mark me!">>, + RoomName = pubsub_tools:pubsub_node_name(), AliceJid = inbox_helper:to_bare_lower(Alice), - RoomJid = room_bin_jid(RoomName), + RoomJid = inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, Id), AliceRoomJid = <>, - Msg = <<"Mark me!">>, - inbox_helper:create_room_send_msg_check_inbox(Alice, [Bob, Kate], RoomName, Msg, <<"2-id">>), [inbox_helper:mark_last_muclight_message(U, [Alice, Bob, Kate]) || U <- [Bob, Kate]], inbox_helper:foreach_check_inbox([Bob, Kate, Alice], 0, AliceRoomJid, Msg) end). @@ -1307,6 +1300,85 @@ get_with_end_timestamp(Config) -> end). +%% Bin flushes tests +bin_is_not_included_by_default(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + RoomName = create_room_and_make_users_leave(Alice, Bob, Kate), + RoomJid = room_bin_jid(RoomName), + BobJid = inbox_helper:to_bare_lower(Bob), + Convs = [#conv{unread = 1, from = RoomJid, to = BobJid, + verify = fun inbox_helper:verify_is_none_aff_change/2}], + %% Fetching all does include it + check_inbox(Bob, Convs, #{box => all}), + %% Fetching without explicit box name skips the bin + check_inbox(Bob, [], #{}) + end). + +rest_api_bin_flush_user(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + create_room_and_make_users_leave(Alice, Bob, Kate), + %% It is not in his bin anymore after triggering a bin flush + BobName = escalus_utils:get_username(Bob), + BobDomain = escalus_utils:get_server(Bob), + Path = <<"/inbox", "/", (BobDomain)/binary, "/", (BobName)/binary, "/0/bin">>, + {{<<"200">>, <<"OK">>}, NumOfRows} = rest_helper:delete(admin, Path), + ?assertEqual(1, NumOfRows), + check_inbox(Bob, [], #{box => bin}) + end). + +rest_api_bin_flush_all(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + create_room_and_make_users_leave(Alice, Bob, Kate), + %% It is not in any bin anymore after triggering a bin flush + HostTypePath = uri_string:normalize(#{path => domain_helper:host_type()}), + Path = <<"/inbox/", HostTypePath/binary, "/0/bin">>, + {{<<"200">>, <<"OK">>}, NumOfRows} = rest_helper:delete(admin, Path), + ?assertEqual(2, NumOfRows), + check_inbox(Bob, [], #{box => bin}), + check_inbox(Kate, [], #{box => bin}) + end). + +timeout_cleaner_flush_all(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + create_room_and_make_users_leave(Alice, Bob, Kate), + %% It is eventually not in any bin thanks to the periodic cleanouts + check_inbox(Bob, [], #{box => bin}), + check_inbox(Kate, [], #{box => bin}) + end). + +xmpp_bin_flush(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + create_room_and_make_users_leave(Alice, Bob, Kate), + %% It is eventually not in any bin thanks to the periodic cleanouts + %% Bob requests flush through xmpp + Iq = escalus_stanza:iq(<<"set">>, + [#xmlel{name = <<"empty-bin">>, + attrs = [{<<"xmlns">>, inbox_helper:inbox_ns()}], + children = []}]), + escalus:send(Bob, Iq), + escalus:assert(is_iq_result, [Iq], escalus:wait_for_stanza(Bob)), + check_inbox(Bob, [], #{box => bin}) + end). + + +%% helpers +create_room_and_make_users_leave(Alice, Bob, Kate) -> + RoomName = pubsub_tools:pubsub_node_name(), + inbox_helper:create_room_and_check_inbox(Alice, [Bob, Kate], RoomName), + %% Bob leaves the room + muc_light_helper:user_leave(RoomName, Bob, [{Alice, owner}, {Kate, member}]), + muc_light_helper:user_leave(RoomName, Kate, [{Alice, owner}]), + %% Bob doesn't have conversation in his inbox, nor Kate + check_inbox(Bob, [], #{box => inbox}), + check_inbox(Kate, [], #{box => inbox}), + RoomName. + +if_async_check_bin(Config, Bob, Convs) -> + case maps:get(backend, ?config(inbox_opts, Config), rdbms) of + rdbms -> ok; + rdbms_async -> + check_inbox(Bob, Convs, #{box => bin}) + end. start_hook_listener() -> TestCasePid = self(), diff --git a/big_tests/tests/inbox_extensions_SUITE.erl b/big_tests/tests/inbox_extensions_SUITE.erl index fa9c19ca38..fb55b005d7 100644 --- a/big_tests/tests/inbox_extensions_SUITE.erl +++ b/big_tests/tests/inbox_extensions_SUITE.erl @@ -78,6 +78,7 @@ groups() -> box_full_archive_can_be_fetched_queryid, box_and_archive_box_has_preference, box_other_does_get_fetched, + box_all_full_fetch, % archive archive_active_entry_gets_archived, archive_archived_entry_gets_active_on_request, @@ -417,6 +418,16 @@ box_other_does_get_fetched(Config) -> inbox_helper:check_inbox(Bob, [], #{box => archive}) end). +box_all_full_fetch(Config) -> + escalus:fresh_story(Config, [{alice, 1}, {bob, 1}, {kate, 1}], fun(Alice, Bob, Kate) -> + % Alice sends a message to Bob and Kate + #{ Alice := AliceConvs } = inbox_helper:given_conversations_between(Alice, [Bob, Kate]), + inbox_helper:check_inbox(Alice, AliceConvs), + set_inbox_properties(Alice, Bob, [{box, archive}]), + set_inbox_properties(Alice, Kate, [{box, other}]), + inbox_helper:check_inbox(Alice, AliceConvs, #{box => all}) + end). + % archive archive_active_entry_gets_archived(Config) -> escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice, Bob) -> diff --git a/big_tests/tests/inbox_helper.erl b/big_tests/tests/inbox_helper.erl index 2111889aa4..32582acedb 100644 --- a/big_tests/tests/inbox_helper.erl +++ b/big_tests/tests/inbox_helper.erl @@ -124,11 +124,13 @@ inbox_opts() -> config_parser_helper:default_mod_config(mod_inbox). inbox_opts(regular) -> - (inbox_opts())#{boxes => [<<"inbox">>, <<"archive">>, <<"other">>]}; + DefOps = #{boxes := Boxes} = inbox_opts(), + DefOps#{boxes := Boxes ++ [<<"other">>]}; inbox_opts(async_pools) -> - (inbox_opts())#{backend => rdbms_async, - async_writer => #{pool_size => 4}, - boxes => [<<"inbox">>, <<"archive">>, <<"other">>]}. + DefOps = #{boxes := Boxes} = inbox_opts(), + DefOps#{backend => rdbms_async, + async_writer => #{pool_size => 1}, + boxes => Boxes ++ [<<"other">>]}. skip_or_run_inbox_tests(TestCases) -> case (not ct_helper:is_ct_running()) @@ -148,6 +150,8 @@ maybe_run_in_parallel(Gs) -> insert_parallels(Gs) -> Fun = fun({muclight_config, Conf, Tests}) -> {muclight_config, Conf, Tests}; + ({bin, Conf, Tests}) -> + {bin, Conf, Tests}; ({regular, Conf, Tests}) -> {regular, Conf, Tests}; ({async_pools, Conf, Tests}) -> @@ -159,7 +163,8 @@ insert_parallels(Gs) -> inbox_modules(Backend) -> [ - {mod_inbox, inbox_opts(Backend)} + {mod_inbox, inbox_opts(Backend)}, + {mod_inbox_commands, #{}} ]. muclight_modules() -> @@ -689,7 +694,8 @@ create_room_send_msg_check_inbox(Owner, MemberList, RoomName, Msg, Id) -> OwnerRoomJid = <>, %% Owner sent the message so he has unread set to 0 check_inbox(Owner, [#conv{unread = 0, from = OwnerRoomJid, to = OwnerJid, content = Msg}]), - foreach_check_inbox(MemberList, 1, OwnerRoomJid, Msg). + foreach_check_inbox(MemberList, 1, OwnerRoomJid, Msg), + RoomJid. verify_is_owner_aff_change(Client, Msg) -> verify_muc_light_aff_msg(Msg, [{Client, owner}]). diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index edc10bd615..14c28116c0 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -170,7 +170,8 @@ init_per_testcase(CaseName, Config) -> end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h; CN =:= gc_repeat_after_never_means_no_cleaning; CN =:= gc_repeat_after_timeout_does_clean -> - rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]), + Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]), + rpc(mim(), ejabberd_sup, stop_child, [Name]), escalus:end_per_testcase(CN, Config); end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> unregister_handler(), diff --git a/doc/configuration/Modules.md b/doc/configuration/Modules.md index d84e64d404..e43136acb7 100644 --- a/doc/configuration/Modules.md +++ b/doc/configuration/Modules.md @@ -106,6 +106,9 @@ Implements [XEP-0363: HTTP File Upload](https://xmpp.org/extensions/xep-0363.htm ### [mod_inbox](../modules/mod_inbox.md) Implements custom inbox XEP +### [mod_inbox_commands](../modules/mod_inbox_commands.md) +Exposes administrative commands for the [inbox](../modules/mod_inbox.md) + ### [mod_global_distrib](../modules/mod_global_distrib.md) Enables sharing a single XMPP domain between distinct datacenters (**experimental**). diff --git a/doc/modules/mod_inbox.md b/doc/modules/mod_inbox.md index a92e319468..45670a7fd1 100644 --- a/doc/modules/mod_inbox.md +++ b/doc/modules/mod_inbox.md @@ -30,7 +30,23 @@ Number of workers in the pool. More than the number of available schedulers is r A list of supported inbox boxes by the server. This can be used by clients to classify their inbox entries in any way that fits the end-user. The strings provided here will be used verbatim in the IQ query as described in [Inbox – Filtering and Ordering](../open-extensions/inbox.md#filtering-and-ordering). !!! note - `inbox` and `archive` are always enabled, and therefore don't need to be specified here. + `inbox`, `archive`, and `bin` are reserved box names and are always enabled, therefore they don't need to –and must not– be specified in this section. `all` has a special meaning in the box query and therefore is also not allowed as a box name. + + If the asynchronous backend is configured, automatic removals become moves to the `bin` box, also called "Trash bin". This is to ensure eventual consistency. Then the bin can be emptied, either on a [user request](../open-extensions/inbox.md#examples-emptying-the-trash-bin), or through an [admin API endpoint](../mod_inbox_commands#admin-endpoint). + +#### `modules.mod_inbox.bin_ttl` +* **Syntax:** non-negative integer, expressed in days. +* **Default:** `30` +* **Example:** `modules.mod_inbox.bin_ttl = 7` + +How old entries in the bin can be before the automatic bin cleaner collects them. A value of `7` would mean that entries that have been in the bin for more than 7 days will be cleaned on the next bin collection. + +#### `modules.mod_inbox.bin_clean_after` +* **Syntax:** non-negative integer, expressed in hours +* **Default:** `1` +* **Example:** `modules.mod_inbox.bin_clean_after = 24` + +How often the automatic garbage collection runs over the bin. ### `modules.mod_inbox.reset_markers` * **Syntax:** array of strings, out of `"displayed"`, `"received"`, `"acknowledged"` @@ -93,6 +109,7 @@ or [affiliation](https://xmpp.org/extensions/xep-0045.html#affil) change. ```toml [modules.mod_inbox] + backend = "rdbms_async" reset_markers = ["displayed"] aff_changes = true remove_on_kicked = true diff --git a/doc/modules/mod_inbox_commands.md b/doc/modules/mod_inbox_commands.md new file mode 100644 index 0000000000..18713594bd --- /dev/null +++ b/doc/modules/mod_inbox_commands.md @@ -0,0 +1,47 @@ +## Configuration +This module contains command definitions which are loaded when the module is activated. +There are no options to be provided, therefore the following entry in the config file is sufficient: + +```toml +[modules.mod_inbox_commands] +``` + +## Admin endpoint + +### Bin flush for a user +To clean the bin for a given user, the following admin API request can be triggered: + +```http +DELETE /api/inbox////bin, +``` +where `` and `` are the domain and name parts of the user's jid, respectively, and `` is the required number of days for an entry to be considered old enough to be removed, zero allowed (which clears all). + +The result would be a `200` with the number of rows that were removed as the body, or a corresponding error. For example, if only one entry was cleaned: +```http +HTTP/1.1 200 OK +server: Cowboy, +date: Wed, 30 Mar 2022 14:06:20 GMT, +content-type: application/json, +content-length: 1 + +1 +``` + +### Global bin flush +If all the bins were desired to be cleared, the following API can be used instead: + +```http +DELETE /api/inbox///bin, +``` +where as before, `` is the required number of days for an entry to be considered old enough to be removed, and `` is the host type where inbox is configured. + +The result would look analogously: +```http +HTTP/1.1 200 OK +server: Cowboy, +date: Wed, 30 Mar 2022 14:06:20 GMT, +content-type: application/json, +content-length: 1 + +42 +``` diff --git a/doc/open-extensions/inbox.md b/doc/open-extensions/inbox.md index 5d13df375d..2160482863 100644 --- a/doc/open-extensions/inbox.md +++ b/doc/open-extensions/inbox.md @@ -59,6 +59,7 @@ The inbox is fetched using regular XMPP [Data Forms]. To request the supported f + @@ -120,7 +121,7 @@ A client may specify the following parameters: * variable `end`: End date for the result set (value: ISO timestamp) * variable `order`: Order by timestamp (values: `asc`, `desc`) * variable `hidden_read`: Show only conversations with unread messages (values: `true`, `false`) -* variable `box`: Indicate which box is desired. Supported are `all`, `inbox`, and `archive`. More boxes can be implemented, see [mod_inbox – Boxes](../modules/mod_inbox.md#modulesmod_inboxboxes) +* variable `box`: Indicate which box is desired. Supported are `all`, `inbox`, `archive` and `bin`. More boxes can be implemented, see [mod_inbox – Boxes](../modules/mod_inbox.md#modulesmod_inboxboxes). If not provided, all except the bin are returned. * variable `archive` [deprecated, prefer `box`]: whether to query the archive inbox. `true` means querying only the archive box, `false` means querying only the active box. If the flag is not set, it is assumed all entries are requested. This is kept for backwards compatibility reasons, use the `box` flag instead. They are encoded inside a standard XMPP [Data Forms] format. @@ -152,11 +153,11 @@ where `Max` is a non-negative integer. Given an entry, certain properties are defined for such an entry: ### Box -Clients usually have two different boxes for the inbox: the regular one, simply called the inbox (or the active inbox), and an archive box, where clients can manually throw conversations they don't want displayed in the default UI. +Clients usually have two different boxes for the inbox: the regular one, simply called the inbox (or the active inbox), and an archive box, where clients can manually throw conversations they don't want displayed in the default UI. A third box is the trash bin, where deleted entries go and are cleaned up in regular intervals. It is expected that entries will reside in the archive until they're either manually moved back to the active box, or they receive a new message: in such case the entry should jump back to the active box automatically. -More boxes can be implemented, see [mod_inbox#boxes](../modules/mod_inbox.md#modulesmod_inboxboxes). Movement between boxes can be achieved through the right XMPP IQ, no automatic movements are developed as in the case of inbox-archive. +More boxes can be implemented, see [mod_inbox#boxes](../modules/mod_inbox.md#modulesmod_inboxboxes). Movement between boxes can be achieved through the right XMPP IQ, no more automatic movements are developed as in the case of inbox-archive. ### Read Entries keep a count of unread messages that is incremented automatically upon receiving a new message, and (in the current implementation) set to zero upon receiving either a message by one-self, or an appropriate chat marker as defined in [XEP-0333](https://xmpp.org/extensions/xep-0333.html) (which markers reset the count is a matter of configuration, see [doc](../modules/mod_inbox.md#modulesmod_inboxreset_markers)). @@ -187,6 +188,12 @@ The server would respond with: + + + + + + @@ -313,6 +320,23 @@ If the client had sent an invalid number (negative, or NaN), the server would an ``` +### Examples: emptying the trash bin +A user can empty his trash bin, through the following request: +```xml + + + +``` +On success, the server would return how many entries where dropped as in: +```xml + + + 2 + + +``` +The server might answer with a corresponding error message, might anything go wrong. + ### Examples: muting an entry To mute an entry for a full day (86400 seconds in a day, 604800 in a week, for example), a client can send: ```xml @@ -388,7 +412,7 @@ And similarly, to set a conversation as unread: ``` -### Deprecated entry stanza: +### Deprecated reset entry stanza: You can reset the inbox with the following stanza: ```xml diff --git a/include/mod_inbox.hrl b/include/mod_inbox.hrl index d236aced6c..710596b1a6 100644 --- a/include/mod_inbox.hrl +++ b/include/mod_inbox.hrl @@ -1,5 +1,3 @@ --type content() :: binary(). - -type id() :: binary(). -type marker() :: binary(). diff --git a/mkdocs.yml b/mkdocs.yml index 796818f28b..491a592391 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -112,6 +112,7 @@ nav: - 'mod_global_distrib': 'modules/mod_global_distrib.md' - 'mod_http_upload': 'modules/mod_http_upload.md' - 'mod_inbox': 'modules/mod_inbox.md' + - 'mod_inbox_commands': 'modules/mod_inbox_commands.md' - 'mod_jingle_sip': 'modules/mod_jingle_sip.md' - 'mod_keystore': 'modules/mod_keystore.md' - 'mod_last': 'modules/mod_last.md' diff --git a/priv/mssql2012.sql b/priv/mssql2012.sql index 70e95ac333..668de8abae 100644 --- a/priv/mssql2012.sql +++ b/priv/mssql2012.sql @@ -518,6 +518,12 @@ GO CREATE INDEX i_inbox_su_ts ON inbox(lserver, luser, timestamp); GO +CREATE INDEX i_inbox_us_box ON inbox(lserver, luser, box); +GO + +CREATE INDEX i_inbox_box ON inbox(box); +GO + CREATE TABLE dbo.pubsub_nodes ( nidx BIGINT IDENTITY(1,1) PRIMARY KEY, p_key NVARCHAR(150) NOT NULL, diff --git a/priv/mysql.sql b/priv/mysql.sql index 811ce9132a..41d8489bda 100644 --- a/priv/mysql.sql +++ b/priv/mysql.sql @@ -398,6 +398,8 @@ CREATE TABLE inbox ( PRIMARY KEY(lserver, luser, remote_bare_jid)); CREATE INDEX i_inbox USING BTREE ON inbox(lserver, luser, timestamp); +CREATE INDEX i_inbox_us_box USING BTREE ON inbox(lserver, luser, box); +CREATE INDEX i_inbox_box USING BTREE ON inbox(box); CREATE TABLE pubsub_nodes ( nidx BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, diff --git a/priv/pg.sql b/priv/pg.sql index 7046a61667..23f8ba5c9d 100644 --- a/priv/pg.sql +++ b/priv/pg.sql @@ -357,9 +357,9 @@ CREATE TABLE inbox ( unread_count INT NOT NULL, PRIMARY KEY(lserver, luser, remote_bare_jid)); -CREATE INDEX i_inbox_timestamp - ON inbox - USING BTREE(lserver, luser, timestamp); +CREATE INDEX i_inbox_timestamp ON inbox USING BTREE(lserver, luser, timestamp); +CREATE INDEX i_inbox_us_box ON inbox USING BTREE(lserver, luser, box); +CREATE INDEX i_inbox_box ON inbox (box) WHERE (box = 'bin'); CREATE TABLE pubsub_nodes ( nidx BIGSERIAL PRIMARY KEY, diff --git a/src/inbox/mod_inbox.erl b/src/inbox/mod_inbox.erl index e75efe095d..038cbad831 100644 --- a/src/inbox/mod_inbox.erl +++ b/src/inbox/mod_inbox.erl @@ -31,7 +31,7 @@ ]). -ignore_xref([ - behaviour_info/1, disco_local_features/1, filter_local_packet/1, get_personal_data/3, + disco_local_features/1, filter_local_packet/1, get_personal_data/3, inbox_unread_count/2, remove_domain/3, remove_user/3, user_send_packet/4 ]). @@ -94,18 +94,20 @@ start(HostType, #{iqdisc := IQDisc, groupchat := MucTypes} = Opts) -> fun ?MODULE:process_iq/5, #{}, IQDisc), gen_iq_handler:add_iq_handler_for_domain(HostType, ?NS_ESL_INBOX_CONVERSATION, ejabberd_sm, fun mod_inbox_entries:process_iq_conversation/5, #{}, IQDisc), + start_cleaner(HostType, Opts), ok. -spec stop(HostType :: mongooseim:host_type()) -> ok. stop(HostType) -> + gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_INBOX, ejabberd_sm), + gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_INBOX_CONVERSATION, ejabberd_sm), + ejabberd_hooks:delete(hooks(HostType)), + stop_cleaner(HostType), mod_inbox_muc:stop(HostType), case mongoose_config:get_opt([{modules, HostType}, ?MODULE, backend]) of rdbms_async -> mod_inbox_rdbms_async:stop(HostType); _ -> ok - end, - ejabberd_hooks:delete(hooks(HostType)), - gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_INBOX, ejabberd_sm), - gen_iq_handler:remove_iq_handler_for_domain(HostType, ?NS_ESL_INBOX_CONVERSATION, ejabberd_sm). + end. -spec supported_features() -> [atom()]. supported_features() -> @@ -123,6 +125,9 @@ config_spec() -> validate = {enum, [muc, muclight]}}}, <<"boxes">> => #list{items = #option{type = binary, validate = non_empty}, validate = unique}, + <<"bin_ttl">> => #option{type = integer, validate = non_negative}, + <<"bin_clean_after">> => #option{type = integer, validate = non_negative, + process = fun timer:hours/1}, <<"aff_changes">> => #option{type = boolean}, <<"remove_on_kicked">> => #option{type = boolean}, <<"iqdisc">> => mongoose_config_spec:iqdisc() @@ -130,6 +135,8 @@ config_spec() -> defaults = #{<<"backend">> => rdbms, <<"groupchat">> => [muclight], <<"boxes">> => [], + <<"bin_ttl">> => 30, % 30 days + <<"bin_clean_after">> => timer:hours(1), <<"aff_changes">> => true, <<"remove_on_kicked">> => true, <<"reset_markers">> => [<<"displayed">>], @@ -148,10 +155,28 @@ async_config_spec() -> }. process_inbox_boxes(Config = #{boxes := Boxes}) -> - false = lists:any(fun(Name) -> Name =:= <<"inbox">> orelse Name =:= <<"archive">> end, Boxes), - AllBoxes = [<<"inbox">>, <<"archive">> | Boxes ], + false = lists:any(fun(<<"all">>) -> true; + (<<"inbox">>) -> true; + (<<"archive">>) -> true; + (<<"bin">>) -> true; + (_) -> false + end, Boxes), + AllBoxes = [<<"inbox">>, <<"archive">>, <<"bin">> | Boxes ], Config#{boxes := AllBoxes}. +%% Cleaner gen_server callbacks +start_cleaner(HostType, #{bin_ttl := TTL, bin_clean_after := Interval}) -> + Name = gen_mod:get_module_proc(HostType, ?MODULE), + WOpts = #{host_type => HostType, action => fun mod_inbox_commands:flush_global_bin/2, + opts => TTL, interval => Interval}, + MFA = {mongoose_collector, start_link, [Name, WOpts]}, + ChildSpec = {Name, MFA, permanent, 5000, worker, [?MODULE]}, + ejabberd_sup:start_child(ChildSpec). + +stop_cleaner(HostType) -> + Name = gen_mod:get_module_proc(HostType, ?MODULE), + ejabberd_sup:stop_child(Name). + %%%%%%%%%%%%%%%%%%% %% Process IQ -spec process_iq(Acc :: mongoose_acc:t(), @@ -163,6 +188,12 @@ process_iq(Acc, _From, _To, #iq{type = get, sub_el = SubEl} = IQ, #{host_type := Form = build_inbox_form(HostType), SubElWithForm = SubEl#xmlel{ children = [Form] }, {Acc, IQ#iq{type = result, sub_el = SubElWithForm}}; +process_iq(Acc, #jid{luser = LUser, lserver = LServer}, + _To, #iq{type = set, sub_el = #xmlel{name = <<"empty-bin">>}} = IQ, + #{host_type := HostType}) -> + TS = mongoose_acc:timestamp(Acc), + NumRemRows = mod_inbox_backend:empty_user_bin(HostType, LServer, LUser, TS), + {Acc, IQ#iq{type = result, sub_el = [build_empty_bin(NumRemRows)]}}; process_iq(Acc, From, _To, #iq{type = set, sub_el = QueryEl} = IQ, _Extra) -> HostType = mongoose_acc:host_type(Acc), LUser = From#jid.luser, @@ -312,6 +343,12 @@ process_message(HostType, From, To, Message, _TS, Dir, Type) -> %%%%%%%%%%%%%%%%%%% %% Stanza builders +build_empty_bin(Num) -> + #xmlel{name = <<"empty-bin">>, + attrs = [{<<"xmlns">>, ?NS_ESL_INBOX}], + children = [#xmlel{name = <<"num">>, + children = [#xmlcdata{content = integer_to_binary(Num)}]}]}. + -spec build_inbox_message(mongoose_acc:t(), inbox_res(), jlib:iq()) -> exml:element(). build_inbox_message(Acc, InboxRes, IQ) -> #xmlel{name = <<"message">>, attrs = [{<<"id">>, mongoose_bin:gen_from_timestamp()}], @@ -345,7 +382,7 @@ build_result_iq(List) -> %% iq-get -spec build_inbox_form(mongooseim:host_type()) -> exml:element(). build_inbox_form(HostType) -> - AllBoxes = all_valid_boxes_for_query(HostType), + AllBoxes = mod_inbox_utils:all_valid_boxes_for_query(HostType), OrderOptions = [ {<<"Ascending by timestamp">>, <<"asc">>}, {<<"Descending by timestamp">>, <<"desc">>} @@ -354,9 +391,9 @@ build_inbox_form(HostType) -> jlib:form_field({<<"FORM_TYPE">>, <<"hidden">>, ?NS_ESL_INBOX}), text_single_form_field(<<"start">>), text_single_form_field(<<"end">>), - list_single_form_field(<<"order">>, <<"desc">>, OrderOptions), text_single_form_field(<<"hidden_read">>, <<"false">>), - list_single_form_field(<<"box">>, <<"all">>, AllBoxes), + mod_inbox_utils:list_single_form_field(<<"order">>, <<"desc">>, OrderOptions), + mod_inbox_utils:list_single_form_field(<<"box">>, <<"all">>, AllBoxes), jlib:form_field({<<"archive">>, <<"boolean">>, <<"false">>}) ], #xmlel{name = <<"x">>, @@ -372,33 +409,6 @@ text_single_form_field(Var, DefaultValue) -> #xmlel{name = <<"field">>, attrs = [{<<"var">>, Var}, {<<"type">>, <<"text-single">>}, {<<"value">>, DefaultValue}]}. --spec list_single_form_field(Var :: binary(), - Default :: binary(), - Options :: [ Option | {Label, Value}]) -> exml:element() when - Option :: binary(), Label :: binary(), Value :: binary(). -list_single_form_field(Var, Default, Options) -> - Value = form_field_value(Default), - #xmlel{ - name = <<"field">>, - attrs = [{<<"var">>, Var}, {<<"type">>, <<"list-single">>}], - children = [Value | [ form_field_option(Option) || Option <- Options ]] - }. - --spec form_field_option(Option | {Label, Value}) -> exml:element() when - Option :: binary(), Label :: binary(), Value :: binary(). -form_field_option({Label, Value}) -> - #xmlel{ - name = <<"option">>, - attrs = [{<<"label">>, Label}], - children = [form_field_value(Value)] - }; -form_field_option(Option) -> - form_field_option({Option, Option}). - --spec form_field_value(Value :: binary()) -> exml:element(). -form_field_value(Value) -> - #xmlel{name = <<"value">>, children = [#xmlcdata{content = Value}]}. - %%%%%%%%%%%%%%%%%%% %% iq-set -spec query_to_params(mongooseim:host_type(), QueryEl :: exml:element()) -> @@ -501,7 +511,7 @@ fields_to_params(HostType, [{<<"box">>, [Value]} | RFields], Acc) -> case validate_box(HostType, Value) of false -> ?LOG_WARNING(#{what => inbox_invalid_form_field, - field => archive, value => Value}), + field => box, value => Value}), {error, bad_request, invalid_field_value(<<"box">>, Value)}; true -> fields_to_params(HostType, RFields, Acc#{ box => Value }) @@ -520,7 +530,7 @@ binary_to_order(<<"asc">>) -> asc; binary_to_order(_) -> error. validate_box(HostType, Box) -> - AllBoxes = all_valid_boxes_for_query(HostType), + AllBoxes = mod_inbox_utils:all_valid_boxes_for_query(HostType), lists:member(Box, AllBoxes). invalid_field_value(Field, Value) -> @@ -582,6 +592,3 @@ inbox_owner_exists(Acc, _, To, incoming, MessageType) -> % filter_local_packet inbox_owner_exists(Acc, From, _, outgoing, _) -> % user_send_packet HostType = mongoose_acc:host_type(Acc), ejabberd_auth:does_user_exist(HostType, From, stored). - -all_valid_boxes_for_query(HostType) -> - [<<"all">> | gen_mod:get_module_opt(HostType, ?MODULE, boxes)]. diff --git a/src/inbox/mod_inbox_backend.erl b/src/inbox/mod_inbox_backend.erl index a5b19c2ede..202d11903e 100644 --- a/src/inbox/mod_inbox_backend.erl +++ b/src/inbox/mod_inbox_backend.erl @@ -10,12 +10,14 @@ remove_domain/2, set_inbox/6, remove_inbox_row/2, + empty_user_bin/4, + empty_global_bin/2, set_inbox_incr_unread/5, get_inbox_unread/2, get_full_entry/2, get_entry_properties/2, set_entry_properties/3, - reset_unread/3]). + reset_unread/4]). -define(MAIN_MODULE, mod_inbox). @@ -42,7 +44,7 @@ mod_inbox:write_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: content(), + Content :: exml:element(), Count :: integer(), MsgId :: binary(), Timestamp :: integer(). @@ -51,18 +53,29 @@ HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(). +-callback empty_user_bin(HostType, LServer, LUser, TS) -> non_neg_integer() when + HostType :: mongooseim:host_type(), + LServer :: jid:lserver(), + LUser :: jid:luser(), + TS :: integer(). + +-callback empty_global_bin(HostType, TS) -> non_neg_integer() when + HostType :: mongooseim:host_type(), + TS :: integer(). + -callback set_inbox_incr_unread(HostType, InboxEntryKey, Content, MsgId, Timestamp) -> mod_inbox:count_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: content(), + Content :: exml:element(), MsgId :: binary(), Timestamp :: integer(). --callback reset_unread(HostType, InboxEntryKey, MsgId) -> mod_inbox:write_res() when +-callback reset_unread(HostType, InboxEntryKey, MsgId, TS) -> mod_inbox:write_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - MsgId :: binary(). + MsgId :: binary() | undefined, + TS :: integer(). -callback get_inbox_unread(HostType, InboxEntryKey) -> {ok, integer()} when HostType :: mongooseim:host_type(), @@ -120,7 +133,7 @@ remove_domain(HostType, LServer) -> mod_inbox:write_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: content(), + Content :: exml:element(), Count :: integer(), MsgId :: binary(), Timestamp :: integer(). @@ -135,23 +148,40 @@ remove_inbox_row(HostType, InboxEntryKey) -> Args = [HostType, InboxEntryKey], mongoose_backend:call_tracked(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). +-spec empty_user_bin(HostType, LServer, LUser, TS) -> non_neg_integer() when + HostType :: mongooseim:host_type(), + LServer :: jid:lserver(), + LUser :: jid:luser(), + TS :: integer(). +empty_user_bin(HostType, LServer, LUser, TS) -> + Args = [HostType, LServer, LUser, TS], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + +-spec empty_global_bin(HostType, TS) -> non_neg_integer() when + HostType :: mongooseim:host_type(), + TS :: integer(). +empty_global_bin(HostType, TS) -> + Args = [HostType, TS], + mongoose_backend:call(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). + -spec set_inbox_incr_unread(HostType, InboxEntryKey, Content, MsgId, Timestamp) -> mod_inbox:count_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: content(), + Content :: exml:element(), MsgId :: binary(), Timestamp :: integer(). set_inbox_incr_unread(HostType, InboxEntryKey, Content, MsgId, Timestamp) -> Args = [HostType, InboxEntryKey, Content, MsgId, Timestamp], mongoose_backend:call_tracked(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). --spec reset_unread(HostType, InboxEntryKey, MsgId) -> mod_inbox:write_res() when +-spec reset_unread(HostType, InboxEntryKey, MsgId, TS) -> mod_inbox:write_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - MsgId :: binary() | undefined. -reset_unread(HostType, InboxEntryKey, MsgId) -> - Args = [HostType, InboxEntryKey, MsgId], + MsgId :: binary() | undefined, + TS :: integer(). +reset_unread(HostType, InboxEntryKey, MsgId, TS) -> + Args = [HostType, InboxEntryKey, MsgId, TS], mongoose_backend:call_tracked(HostType, ?MAIN_MODULE, ?FUNCTION_NAME, Args). -spec get_inbox_unread(HostType, InboxEntryKey) -> {ok, integer()} when diff --git a/src/inbox/mod_inbox_commands.erl b/src/inbox/mod_inbox_commands.erl new file mode 100644 index 0000000000..8621595e05 --- /dev/null +++ b/src/inbox/mod_inbox_commands.erl @@ -0,0 +1,61 @@ +-module(mod_inbox_commands). + +-behaviour(gen_mod). + +%% gen_mod +-export([start/2, stop/1, supported_features/0]). + +-export([flush_user_bin/3, flush_global_bin/2]). +-ignore_xref([flush_user_bin/3, flush_global_bin/2]). + +%% Initialisation +-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. +start(_, _) -> + mongoose_commands:register(commands()). + +stop(_) -> + mongoose_commands:unregister(commands()). + +-spec supported_features() -> [atom()]. +supported_features() -> + [dynamic_domains]. + +%% Clean commands +commands() -> + [ + [{name, inbox_flush_user_bin}, + {category, <<"inbox">>}, + {subcategory, <<"bin">>}, + {desc, <<"Empty the bin for a user">>}, + {module, ?MODULE}, + {function, flush_user_bin}, + {action, delete}, + {identifiers, [domain, name, since]}, + {args, [{domain, binary}, + {name, binary}, + {since, integer}]}, + {result, {num, integer}}], + [{name, inbox_flush_global_bin}, + {category, <<"inbox">>}, + {subcategory, <<"bin">>}, + {desc, <<"Empty the inbox bin globally">>}, + {module, ?MODULE}, + {function, flush_global_bin}, + {action, delete}, + {identifiers, [host_type, since]}, + {args, [{host_type, binary}, + {since, integer}]}, + {result, {num, integer}}] + ]. + +flush_user_bin(Domain, Name, Days) -> + {LU, LS} = jid:to_lus(jid:make_bare(Name, Domain)), + {ok, HostType} = mongoose_domain_api:get_host_type(LS), + Now = erlang:system_time(microsecond), + FromTS = mod_inbox_utils:calculate_ts_from(Now, Days), + mod_inbox_backend:empty_user_bin(HostType, LS, LU, FromTS). + +flush_global_bin(HostType, Days) -> + Now = erlang:system_time(microsecond), + FromTS = mod_inbox_utils:calculate_ts_from(Now, Days), + mod_inbox_backend:empty_global_bin(HostType, FromTS). diff --git a/src/inbox/mod_inbox_entries.erl b/src/inbox/mod_inbox_entries.erl index 801880f43a..08f987468e 100644 --- a/src/inbox/mod_inbox_entries.erl +++ b/src/inbox/mod_inbox_entries.erl @@ -38,7 +38,8 @@ process_iq_conversation(Acc, From, _To, #iq{type = set, process_iq_conversation_get(Acc, IQ, From, SubEl) -> case mod_inbox_utils:extract_attr_jid(SubEl) of {error, _} -> - Form = build_inbox_entry_form(), + HostType = mongoose_acc:host_type(Acc), + Form = build_inbox_entry_form(HostType), SubElWithForm = SubEl#xmlel{children = [Form]}, {Acc, IQ#iq{type = result, sub_el = SubElWithForm}}; EntryJID -> @@ -53,13 +54,14 @@ maybe_get_full_entry(SubEl) -> _ -> only_properties end. --spec build_inbox_entry_form() -> exml:element(). -build_inbox_entry_form() -> +-spec build_inbox_entry_form(mongooseim:host_type()) -> exml:element(). +build_inbox_entry_form(HostType) -> + AllBoxes = mod_inbox_utils:all_valid_boxes_for_query(HostType), #xmlel{name = <<"x">>, attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}], children = [jlib:form_field({<<"FORM_TYPE">>, <<"hidden">>, ?NS_ESL_INBOX_CONVERSATION}), - jlib:form_field({<<"box">>, <<"list-single">>, <<"all">>}), + mod_inbox_utils:list_single_form_field(<<"box">>, <<"all">>, AllBoxes), jlib:form_field({<<"archive">>, <<"boolean">>, <<"false">>}), jlib:form_field({<<"read">>, <<"boolean">>, <<"false">>}), jlib:form_field({<<"mute">>, <<"text-single">>, <<"0">>})]}. @@ -144,8 +146,7 @@ maybe_process_reset_stanza(Acc, From, IQ, ResetStanza) -> end. process_reset_stanza(Acc, From, IQ, _ResetStanza, InterlocutorJID) -> - HostType = mongoose_acc:host_type(Acc), - ok = mod_inbox_utils:reset_unread_count_to_zero(HostType, From, InterlocutorJID), + ok = mod_inbox_utils:reset_unread_count_to_zero(Acc, From, InterlocutorJID), Res = IQ#iq{type = result, sub_el = [#xmlel{name = <<"reset">>, attrs = [{<<"xmlns">>, ?NS_ESL_INBOX_CONVERSATION}], diff --git a/src/inbox/mod_inbox_muc.erl b/src/inbox/mod_inbox_muc.erl index d1aa2bcbde..0af2e8a5f0 100644 --- a/src/inbox/mod_inbox_muc.erl +++ b/src/inbox/mod_inbox_muc.erl @@ -97,8 +97,8 @@ direction(From, To) -> To :: receiver_bare_user_jid(), Packet :: packet(). handle_outgoing_message(HostType, Room, To, Packet) -> - maybe_reset_unread_count(HostType, To, Room, Packet), Acc = mongoose_acc:new(#{location => ?LOCATION, lserver => To#jid.lserver, host_type => HostType}), + maybe_reset_unread_count(HostType, To, Room, Packet, Acc), maybe_write_to_inbox(HostType, To, Room, Packet, Acc, fun write_to_sender_inbox/5). -spec handle_incoming_message(HostType, Room, To, Packet) -> term() when @@ -110,8 +110,8 @@ handle_incoming_message(HostType, Room, To, Packet) -> Acc = mongoose_acc:new(#{location => ?LOCATION, lserver => To#jid.lserver, host_type => HostType}), maybe_write_to_inbox(HostType, Room, To, Packet, Acc, fun write_to_receiver_inbox/5). -maybe_reset_unread_count(HostType, User, Room, Packet) -> - mod_inbox_utils:maybe_reset_unread_count(HostType, User, Room, Packet). +maybe_reset_unread_count(HostType, User, Room, Packet, Acc) -> + mod_inbox_utils:maybe_reset_unread_count(HostType, User, Room, Packet, Acc). maybe_write_to_inbox(HostType, User, Remote, Packet, Acc, WriteF) -> mod_inbox_utils:maybe_write_to_inbox(HostType, User, Remote, Packet, Acc, WriteF). diff --git a/src/inbox/mod_inbox_muclight.erl b/src/inbox/mod_inbox_muclight.erl index 9f905444f0..c7e703d3de 100644 --- a/src/inbox/mod_inbox_muclight.erl +++ b/src/inbox/mod_inbox_muclight.erl @@ -24,8 +24,8 @@ Packet :: exml:element(), Acc :: mongoose_acc:t()) -> mod_inbox:count_res(). -handle_outgoing_message(HostType, User, Room, Packet, _TS) -> - mod_inbox_utils:maybe_reset_unread_count(HostType, User, Room, Packet). +handle_outgoing_message(HostType, User, Room, Packet, Acc) -> + mod_inbox_utils:maybe_reset_unread_count(HostType, User, Room, Packet, Acc). -spec handle_incoming_message(HostType :: mongooseim:host_type(), RoomUser :: jid:jid(), diff --git a/src/inbox/mod_inbox_one2one.erl b/src/inbox/mod_inbox_one2one.erl index b713ad295c..872632964f 100644 --- a/src/inbox/mod_inbox_one2one.erl +++ b/src/inbox/mod_inbox_one2one.erl @@ -16,7 +16,7 @@ Acc :: mongoose_acc:t()) -> mod_inbox:count_res(). handle_outgoing_message(HostType, User, Remote, Packet, Acc) -> - mod_inbox_utils:maybe_reset_unread_count(HostType, User, Remote, Packet), + mod_inbox_utils:maybe_reset_unread_count(HostType, User, Remote, Packet, Acc), mod_inbox_utils:maybe_write_to_inbox( HostType, User, Remote, Packet, Acc, fun mod_inbox_utils:write_to_sender_inbox/5). diff --git a/src/inbox/mod_inbox_rdbms.erl b/src/inbox/mod_inbox_rdbms.erl index b01f193d18..6e7ea11a74 100644 --- a/src/inbox/mod_inbox_rdbms.erl +++ b/src/inbox/mod_inbox_rdbms.erl @@ -16,7 +16,9 @@ init/2, set_inbox/6, set_inbox_incr_unread/5, - reset_unread/3, + reset_unread/4, + empty_user_bin/4, + empty_global_bin/2, remove_inbox_row/2, remove_domain/2, clear_inbox/3, @@ -43,13 +45,19 @@ init(HostType, _Options) -> [luser, lserver, remote_bare_jid], <<"SELECT box, muted_until, unread_count FROM inbox ", RowCond/binary>>), mongoose_rdbms:prepare(inbox_reset_unread, inbox, - [luser, lserver, remote_bare_jid], - <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary>>), + [luser, lserver, remote_bare_jid, timestamp], + <<"UPDATE inbox SET unread_count = 0 ", + RowCond/binary, " AND timestamp <= ?">>), mongoose_rdbms:prepare(inbox_reset_unread_msg, inbox, - [luser, lserver, remote_bare_jid, msg_id], + [luser, lserver, remote_bare_jid, msg_id, timestamp], <<"UPDATE inbox SET unread_count = 0 ", RowCond/binary, - " AND msg_id = ?">>), + " AND msg_id = ? AND timestamp <= ?">>), % removals + mongoose_rdbms:prepare(inbox_clean_global_bin, inbox, [timestamp], + <<"DELETE FROM inbox WHERE box='bin' AND timestamp < ?">>), + mongoose_rdbms:prepare(inbox_clean_user_bin, inbox, [lserver, luser, timestamp], + <<"DELETE FROM inbox WHERE", + " lserver = ? AND luser = ? AND box='bin' AND timestamp < ?">>), mongoose_rdbms:prepare(inbox_delete_row, inbox, [luser, lserver, remote_bare_jid], <<"DELETE FROM inbox ", RowCond/binary>>), @@ -60,11 +68,11 @@ init(HostType, _Options) -> [lserver], <<"DELETE FROM inbox WHERE lserver = ?">>), % upserts UniqueKeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>], - InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"content">>, <<"unread_count">>, <<"timestamp">>], + InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"box">>, <<"content">>, <<"unread_count">>, <<"timestamp">>], rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox, InsertFields, [<<"msg_id">>, - {assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>}, + {expression, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN ? ELSE inbox.box END">>}, <<"content">>, <<"unread_count">>, <<"timestamp">>], @@ -72,7 +80,7 @@ init(HostType, _Options) -> rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox, InsertFields, [<<"msg_id">>, - {assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>}, + {expression, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN ? ELSE inbox.box END">>}, <<"content">>, {expression, <<"unread_count">>, <<"inbox.unread_count + ?">>}, <<"timestamp">>], @@ -100,23 +108,40 @@ get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) -> %% so we have to add +1 {ok, Val + 1}. --spec set_inbox(HostType, InboxEntryKey, Content, Count, MsgId, Timestamp) -> +-spec set_inbox(HostType, InboxEntryKey, Packet, Count, MsgId, Timestamp) -> mod_inbox:write_res() when HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: binary(), + Packet :: exml:element(), Count :: integer(), MsgId :: binary(), Timestamp :: integer(). -set_inbox(HostType, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp) -> +set_inbox(HostType, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp) -> + Content = exml:to_binary(Packet), Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Count, Timestamp], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp], + Update = [MsgId, <<"inbox">>, Content, Count, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, <<"inbox">>, Content, Count, Timestamp], Res = rdbms_queries:execute_upsert(HostType, inbox_upsert, Insert, Update, Unique), %% MySQL returns 1 when an upsert is an insert %% and 2, when an upsert acts as update check_result_is_expected(Res, [1, 2]). +-spec empty_user_bin(HostType :: mongooseim:host_type(), + LServer :: jid:lserver(), + LUser :: jid:luser(), + TS :: integer()) -> non_neg_integer(). +empty_user_bin(HostType, LServer, LUser, TS) -> + {updated, BinN} = mongoose_rdbms:execute_successfully( + HostType, inbox_clean_user_bin, [LServer, LUser, TS]), + mongoose_rdbms:result_to_integer(BinN). + +-spec empty_global_bin(HostType :: mongooseim:host_type(), + TS :: integer()) -> non_neg_integer(). +empty_global_bin(HostType, TS) -> + {updated, BinN} = mongoose_rdbms:execute_successfully( + HostType, inbox_clean_global_bin, [TS]), + mongoose_rdbms:result_to_integer(BinN). + -spec remove_inbox_row(HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key()) -> mod_inbox:write_res(). remove_inbox_row(HostType, {LUser, LServer, LToBareJid}) -> @@ -130,29 +155,31 @@ remove_domain(HostType, LServer) -> ok. -spec set_inbox_incr_unread( - mongooseim:host_type(), mod_inbox:entry_key(), binary(), binary(), integer()) -> + mongooseim:host_type(), mod_inbox:entry_key(), exml:element(), binary(), integer()) -> mod_inbox:count_res(). -set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) -> - set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp, 1). +set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) -> + set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp, 1). -spec set_inbox_incr_unread(HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - Content :: binary(), + Packet :: exml:element(), MsgId :: binary(), Timestamp :: integer(), Incrs :: pos_integer()) -> mod_inbox:count_res(). -set_inbox_incr_unread(HostType, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs) -> +set_inbox_incr_unread(HostType, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs) -> + Content = exml:to_binary(Packet), Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Incrs, Timestamp], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp], + Update = [MsgId, <<"inbox">>, Content, Incrs, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, <<"inbox">>, Content, Incrs, Timestamp], Res = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique), check_result(Res). -spec reset_unread(HostType :: mongooseim:host_type(), InboxEntryKey :: mod_inbox:entry_key(), - MsgId :: binary() | undefined) -> mod_inbox:write_res(). -reset_unread(HostType, {LUser, LServer, LToBareJid}, MsgId) -> - Res = execute_reset_unread(HostType, LUser, LServer, LToBareJid, MsgId), + MsgId :: binary() | undefined, + TS :: integer()) -> mod_inbox:write_res(). +reset_unread(HostType, {LUser, LServer, LToBareJid}, MsgId, TS) -> + Res = execute_reset_unread(HostType, LUser, LServer, LToBareJid, MsgId, TS), check_result(Res). -spec clear_inbox(HostType :: mongooseim:host_type(), @@ -280,7 +307,10 @@ lookup_query_columns(Params) -> -spec lookup_arg_keys(mod_inbox:get_inbox_params()) -> [atom()]. lookup_arg_keys(Params) -> - lists:filter(fun(Key) -> maps:is_key(Key, Params) end, [start, 'end', box]). + lists:filter( + fun(box) -> maps:is_key(box, Params) andalso maps:get(box, Params, undefined) =/= <<"all">>; + (Key) -> maps:is_key(Key, Params) + end, [start, 'end', box]). -spec lookup_query_name(mod_inbox:get_inbox_params()) -> atom(). lookup_query_name(Params) -> @@ -299,6 +329,9 @@ param_to_column('end') -> timestamp; param_to_column(box) -> box. -spec param_id(Key :: atom(), Value :: any()) -> string(). +param_id(box, undefined) -> "_no_bin"; +param_id(box, <<"all">>) -> ""; +param_id(box, _) -> "_box"; param_id(_, undefined) -> ""; param_id(order, desc) -> "_desc"; param_id(order, asc) -> "_asc"; @@ -306,8 +339,7 @@ param_id(limit, _) -> "_lim"; param_id(start, _) -> "_start"; param_id('end', _) -> "_end"; param_id(hidden_read, true) -> "_hr"; -param_id(hidden_read, false) -> ""; -param_id(box, _) -> "_box". +param_id(hidden_read, false) -> "". -spec order_to_sql(Order :: asc | desc) -> binary(). order_to_sql(asc) -> <<"ASC">>; @@ -326,6 +358,10 @@ lookup_sql_condition('end', Timestamp) when is_integer(Timestamp) -> " AND timestamp <= ?"; lookup_sql_condition(hidden_read, true) -> " AND unread_count > 0"; +lookup_sql_condition(box, undefined) -> + " AND box <> 'bin'"; +lookup_sql_condition(box, <<"all">>) -> + ""; lookup_sql_condition(box, Val) when is_binary(Val) -> " AND box = ?"; lookup_sql_condition(_, _) -> @@ -395,14 +431,14 @@ execute_select_properties(HostType, LUser, LServer, RemBareJID) -> mongoose_rdbms:execute_successfully(HostType, inbox_select_properties, [LUser, LServer, RemBareJID]). --spec execute_reset_unread(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid(), binary() | undefined) -> +-spec execute_reset_unread(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid(), binary() | undefined, integer()) -> mongoose_rdbms:query_result(). -execute_reset_unread(HostType, LUser, LServer, RemBareJID, undefined) -> +execute_reset_unread(HostType, LUser, LServer, RemBareJID, undefined, TS) -> mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread, - [LUser, LServer, RemBareJID]); -execute_reset_unread(HostType, LUser, LServer, RemBareJID, MsgId) -> + [LUser, LServer, RemBareJID, TS]); +execute_reset_unread(HostType, LUser, LServer, RemBareJID, MsgId, TS) -> mongoose_rdbms:execute_successfully(HostType, inbox_reset_unread_msg, - [LUser, LServer, RemBareJID, MsgId]). + [LUser, LServer, RemBareJID, MsgId, TS]). -spec execute_delete(mongooseim:host_type(), jid:luser(), jid:lserver(), jid:literal_jid()) -> diff --git a/src/inbox/mod_inbox_rdbms_async.erl b/src/inbox/mod_inbox_rdbms_async.erl index 436f953476..f96c240b10 100644 --- a/src/inbox/mod_inbox_rdbms_async.erl +++ b/src/inbox/mod_inbox_rdbms_async.erl @@ -6,18 +6,21 @@ -behaviour(mod_inbox_backend). -behaviour(mongoose_aggregator_worker). +-type box() :: binary(). -type task() :: - {set_inbox, mod_inbox:entry_key(), content(), pos_integer(), id(), integer()} | - {set_inbox_incr_unread, mod_inbox:entry_key(), content(), id(), integer(), Incrs :: pos_integer()} | + {set_inbox, mod_inbox:entry_key(), exml:element(), pos_integer(), id(), integer(), box()} | + {set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} | {remove_inbox_row, mod_inbox:entry_key()} | - {reset_unread, mod_inbox:entry_key(), id()}. + {reset_unread, mod_inbox:entry_key(), id(), integer()}. %% API -export([init/2, set_inbox/6, set_inbox_incr_unread/5, - reset_unread/3, + reset_unread/4, remove_inbox_row/2, + empty_user_bin/4, + empty_global_bin/2, remove_domain/2, clear_inbox/3, get_inbox/4, @@ -35,6 +38,7 @@ init(HostType, Opts) -> AsyncOpts = prepare_pool_opts(Opts), mod_inbox_rdbms:init(HostType, Opts), + prepare_deletes(HostType, Opts), start_pool(HostType, AsyncOpts), ok. @@ -47,6 +51,13 @@ prepare_pool_opts(#{async_writer := AsyncOpts}) -> aggregate_callback => fun ?MODULE:aggregate/3, verify_callback => fun ?MODULE:verify/3}. +prepare_deletes(_HostType, _Opts) -> + mongoose_rdbms:prepare(inbox_move_conversation_to_bin, inbox, + [luser, lserver, remote_bare_jid], + <<"UPDATE inbox SET box='bin'", + " WHERE luser = ? AND lserver = ? AND remote_bare_jid = ?">>), + ok. + -spec start_pool(mongooseim:host_type(), mongoose_async_pools:pool_opts()) -> term(). start_pool(HostType, Opts) -> mongoose_async_pools:start_pool(HostType, inbox, Opts). @@ -56,25 +67,24 @@ start_pool(HostType, Opts) -> request(Task, _Extra = #{host_type := HostType}) -> request_one(HostType, Task). -request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Content, Count, MsgId, Timestamp}) -> +request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp, Box}) -> + Content = exml:to_binary(Packet), Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Count, Timestamp], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp], + Update = [MsgId, Box, Content, Count, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Count, Timestamp], rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique); - -request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Content, MsgId, Timestamp, Incrs}) -> +request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs, Box}) -> + Content = exml:to_binary(Packet), Unique = [LUser, LServer, LToBareJid], - Update = [MsgId, Content, Incrs, Timestamp], - Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp], + Update = [MsgId, Box, Content, Incrs, Timestamp], + Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Incrs, Timestamp], rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique); - +request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) -> + mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid, TS]); +request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) -> + mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId, TS]); request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) -> - mongoose_rdbms:execute_request(HostType, inbox_delete_row, [LUser, LServer, LToBareJid]); - -request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined}) -> - mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid]); -request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId}) -> - mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId]). + mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, [LUser, LServer, LToBareJid]). -spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}. aggregate(Current, NewTask, _Extra) -> @@ -92,23 +102,23 @@ verify(Answer, InboxTask, _Extra) -> %% async callbacks -spec set_inbox(mongooseim:host_type(), mod_inbox:entry_key(), - content(), Count :: integer(), id(), Timestamp :: integer()) -> + exml:element(), Count :: integer(), id(), Timestamp :: integer()) -> mod_inbox:write_res(). -set_inbox(HostType, Entry, Content, Count, MsgId, Timestamp) -> - Params = {set_inbox, Entry, Content, Count, MsgId, Timestamp}, +set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) -> + Params = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>}, mongoose_async_pools:put_task(HostType, inbox, Entry, Params). -spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(), - Content :: binary(), MsgId :: binary(), Timestamp :: integer()) -> + exml:element(), MsgId :: binary(), Timestamp :: integer()) -> mod_inbox:count_res(). -set_inbox_incr_unread(HostType, Entry, Content, MsgId, Timestamp) -> - Params = {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, 1}, +set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) -> + Params = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1, <<"inbox">>}, mongoose_async_pools:put_task(HostType, inbox, Entry, Params). --spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined) -> +-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) -> mod_inbox:write_res(). -reset_unread(HostType, Entry, MsgId) -> - Params = {reset_unread, Entry, MsgId}, +reset_unread(HostType, Entry, MsgId, TS) -> + Params = {reset_unread, Entry, MsgId, TS}, mongoose_async_pools:put_task(HostType, inbox, Entry, Params). -spec remove_inbox_row(mongooseim:host_type(), mod_inbox:entry_key()) -> mod_inbox:write_res(). @@ -151,68 +161,99 @@ get_entry_properties(HostType, Entry) -> set_entry_properties(HostType, Entry, Properties) -> mod_inbox_rdbms:set_entry_properties(HostType, Entry, Properties). +-spec empty_user_bin(HostType :: mongooseim:host_type(), + LServer :: jid:lserver(), + LUser :: jid:luser(), + TS :: integer()) -> non_neg_integer(). +empty_user_bin(HostType, LServer, LUser, TS) -> + mod_inbox_rdbms:empty_user_bin(HostType, LServer, LUser, TS). + +-spec empty_global_bin(HostType :: mongooseim:host_type(), + TS :: integer()) -> non_neg_integer(). +empty_global_bin(HostType, TS) -> + mod_inbox_rdbms:empty_global_bin(HostType, TS). + -spec aggregate(CurrentlyAccumulatedTask :: task(), NewTask :: task()) -> FinalTask :: task(). %%% First task being processed, just take that one aggregate(undefined, Task) -> Task; -%%% if new task is remove_inbox, ignore all previous requests and just remove -aggregate(_, {remove_inbox_row, Entry}) -> - {remove_inbox_row, Entry}; +%%% if new task is remove_row, do the previous with an updated box + % {reset_unread, mod_inbox:entry_key(), id(), integer()}. +aggregate({reset_unread, _, _, _}, + {remove_inbox_row, _} = OldTask) -> + OldTask; +aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _}, + {remove_inbox_row, _}) -> + {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>}; +aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}, + {remove_inbox_row, _}) -> + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>}; +aggregate(_, {remove_inbox_row, _} = OldTask) -> + OldTask; %%% if the last task was remove_row, this task should now only be an insert -aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _}) -> +aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) -> OldTask; -aggregate({remove_inbox_row, _}, {set_inbox, _, _, _, _, _} = NewTask) -> - NewTask; -aggregate({remove_inbox_row, _}, {set_inbox_incr_unread, _, _, _, _, _} = NewTask) -> - NewTask; +aggregate({remove_inbox_row, _}, + {set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) -> + {set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>}; +aggregate({remove_inbox_row, _}, + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) -> + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>}; %%% If the last task was a reset_unread, % we prefer explicit resets, % then adhoc newer resets, % then we accumulate inserts %% an undefined means an explicit request to reset, it has priority -aggregate({reset_unread, _, _}, {reset_unread, _, undefined} = NewTask) -> +aggregate({reset_unread, _, _, _}, {reset_unread, _, undefined, _} = NewTask) -> NewTask; %% an undefined means an explicit request to reset, it has priority -aggregate({reset_unread, _, undefined} = OldTask, {reset_unread, _, _}) -> +aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) -> OldTask; %% both are adhoc, we prefer the newer -aggregate({reset_unread, _, _}, {reset_unread, _, _} = NewTask) -> +aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) -> NewTask; -aggregate({reset_unread, _, _}, {set_inbox, _, _, _, _, _} = NewTask) -> +aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) -> NewTask; %% Here `Count` becomes an absolute value instead of an increment -aggregate({reset_unread, _, _}, {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) -> - {set_inbox, Entry, Content, Incrs, MsgId, Timestamp}; +aggregate({reset_unread, _, _, _}, + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) -> + {set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box}; %%% If the last task was a set_inbox %% Reset is an explicit reset-to-zero, so do reset the counter -aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, undefined}) -> - {set_inbox, Entry, Content, 0, MsgId, Timestamp}; +aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box}, + {reset_unread, _, undefined, _}) -> + {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; %% Reset refers to that same set_inbox -aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, MsgId}) -> - {set_inbox, Entry, Content, 0, MsgId, Timestamp}; +aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box}, + {reset_unread, _, MsgId, _}) -> + {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; %% Reset refers to some other set_inbox -aggregate({set_inbox, _, _, _, _, _} = OldTask, {reset_unread, _, _}) -> +aggregate({set_inbox, _, _, _, _, _, _} = OldTask, + {reset_unread, _, _, _}) -> OldTask; -aggregate({set_inbox, _, _, Count, _, _, _}, - {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) -> - {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs}; +aggregate({set_inbox, _, _, Count, _, _, _, _}, + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) -> + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs, Box}; %%% If the last task was a set_inbox_incr_unread % we're resetting on this message: -aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _}, {reset_unread, _, MsgId}) -> - {set_inbox, Entry, Content, 0, MsgId, Timestamp}; -aggregate({set_inbox_incr_unread, _, _, _, _, _} = OldTask, {reset_unread, _, _}) -> +aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box}, + {reset_unread, _, MsgId, _}) -> + {set_inbox, Entry, Content, 0, MsgId, Timestamp, Box}; +aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask, + {reset_unread, _, _, _}) -> OldTask; % prefer newest row, but accumulate increment -aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2}, - {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1}) -> - {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2}; +aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _}, + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) -> + {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box}; -aggregate({set_inbox_incr_unread, _, _, MsgId, _, _}, {set_inbox, _, _, _, MsgId, _} = NewTask) -> +aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _}, + {set_inbox, _, _, _, MsgId, _, _} = NewTask) -> NewTask; aggregate(_OldTask, NewTask) -> diff --git a/src/inbox/mod_inbox_utils.erl b/src/inbox/mod_inbox_utils.erl index 400afe8560..c57f97a89d 100644 --- a/src/inbox/mod_inbox_utils.erl +++ b/src/inbox/mod_inbox_utils.erl @@ -18,7 +18,7 @@ %%%%%%%%%%%%%%%%%%% %% DB Operations shared by mod_inbox_one2one and mod_inbox_muclight --export([maybe_reset_unread_count/4, +-export([maybe_reset_unread_count/5, reset_unread_count_to_zero/3, maybe_write_to_inbox/6, write_to_sender_inbox/5, @@ -34,7 +34,10 @@ binary_to_bool/1, bool_to_binary/1, build_inbox_entry_key/2, - build_forward_el/1 + build_forward_el/1, + all_valid_boxes_for_query/1, + list_single_form_field/3, + calculate_ts_from/2 ]). -ignore_xref([get_reset_markers/1, if_chat_marker_get_id/2]). @@ -42,28 +45,33 @@ -spec maybe_reset_unread_count(HostType :: mongooseim:host_type(), User :: jid:jid(), Remote :: jid:jid(), - Packet :: exml:element()) -> ok. -maybe_reset_unread_count(HostType, User, Remote, Packet) -> + Packet :: exml:element(), + Acc :: mongoose_acc:t()) -> ok. +maybe_reset_unread_count(HostType, User, Remote, Packet, Acc) -> ResetMarkers = get_reset_markers(HostType), case if_chat_marker_get_id(Packet, ResetMarkers) of undefined -> ok; Id -> - reset_unread_count(HostType, User, Remote, Id) + TS = mongoose_acc:timestamp(Acc), + reset_unread_count(HostType, User, Remote, Id, TS) end. --spec reset_unread_count_to_zero(mongooseim:host_type(), jid:jid(), jid:jid()) -> ok. -reset_unread_count_to_zero(HostType, From, Remote) -> +-spec reset_unread_count_to_zero(mongoose_acc:t(), jid:jid(), jid:jid()) -> ok. +reset_unread_count_to_zero(Acc, From, Remote) -> + TS = mongoose_acc:timestamp(Acc), + HostType = mongoose_acc:host_type(Acc), InboxEntryKey = build_inbox_entry_key(From, Remote), - ok = mod_inbox_backend:reset_unread(HostType, InboxEntryKey, undefined). + ok = mod_inbox_backend:reset_unread(HostType, InboxEntryKey, undefined, TS). -spec reset_unread_count(HostType ::mongooseim:host_type(), From :: jid:jid(), Remote :: jid:jid(), - MsgId :: id()) -> ok. -reset_unread_count(HostType, From, Remote, MsgId) -> + MsgId :: id(), + TS :: integer()) -> ok. +reset_unread_count(HostType, From, Remote, MsgId, TS) -> InboxEntryKey = build_inbox_entry_key(From, Remote), - ok = mod_inbox_backend:reset_unread(HostType, InboxEntryKey, MsgId). + ok = mod_inbox_backend:reset_unread(HostType, InboxEntryKey, MsgId, TS). -spec write_to_sender_inbox(HostType :: mongooseim:host_type(), Sender :: jid:jid(), @@ -72,12 +80,11 @@ reset_unread_count(HostType, From, Remote, MsgId) -> Acc :: mongoose_acc:t()) -> ok. write_to_sender_inbox(HostType, Sender, Receiver, Packet, Acc) -> MsgId = get_msg_id(Packet), - Content = exml:to_binary(Packet), Timestamp = mongoose_acc:timestamp(Acc), %% no unread for a user because he writes new messages which assumes he read all previous messages. Count = 0, InboxEntryKey = build_inbox_entry_key(Sender, Receiver), - mod_inbox_backend:set_inbox(HostType, InboxEntryKey, Content, Count, MsgId, Timestamp). + mod_inbox_backend:set_inbox(HostType, InboxEntryKey, Packet, Count, MsgId, Timestamp). -spec write_to_receiver_inbox(HostType :: mongooseim:host_type(), Sender :: jid:jid(), @@ -86,11 +93,10 @@ write_to_sender_inbox(HostType, Sender, Receiver, Packet, Acc) -> Acc :: mongoose_acc:t()) -> ok | {ok, integer()}. write_to_receiver_inbox(HostType, Sender, Receiver, Packet, Acc) -> MsgId = get_msg_id(Packet), - Content = exml:to_binary(Packet), Timestamp = mongoose_acc:timestamp(Acc), InboxEntryKey = build_inbox_entry_key(Receiver, Sender), mod_inbox_backend:set_inbox_incr_unread(HostType, InboxEntryKey, - Content, MsgId, Timestamp). + Packet, MsgId, Timestamp). -spec clear_inbox(HostType :: mongooseim:host_type(), User :: jid:user(), @@ -218,3 +224,38 @@ build_forward_el(#{msg := Content, timestamp := Timestamp}) -> build_delay_el(Timestamp) -> TS = calendar:system_time_to_rfc3339(Timestamp, [{offset, "Z"}, {unit, microsecond}]), jlib:timestamp_to_xml(TS, undefined, undefined). + +all_valid_boxes_for_query(HostType) -> + [<<"all">> | gen_mod:get_module_opt(HostType, mod_inbox, boxes)]. + +-spec list_single_form_field(Var :: binary(), + Default :: binary(), + Options :: [ Option | {Label, Value}]) -> exml:element() when + Option :: binary(), Label :: binary(), Value :: binary(). +list_single_form_field(Var, Default, Options) -> + Value = form_field_value(Default), + #xmlel{ + name = <<"field">>, + attrs = [{<<"var">>, Var}, {<<"type">>, <<"list-single">>}], + children = [Value | [ form_field_option(Option) || Option <- Options ]] + }. + +-spec form_field_option(Option | {Label, Value}) -> exml:element() when + Option :: binary(), Label :: binary(), Value :: binary(). +form_field_option({Label, Value}) -> + #xmlel{ + name = <<"option">>, + attrs = [{<<"label">>, Label}], + children = [form_field_value(Value)] + }; +form_field_option(Option) -> + form_field_option({Option, Option}). + +-spec form_field_value(Value :: binary()) -> exml:element(). +form_field_value(Value) -> + #xmlel{name = <<"value">>, children = [#xmlcdata{content = Value}]}. + +-spec calculate_ts_from(integer(), non_neg_integer()) -> integer(). +calculate_ts_from(Now, Days) -> + DaysInMicroSeconds = 86400000000 * Days, % 8.64e+10 microseconds in a day + Now - DaysInMicroSeconds. diff --git a/src/mod_commands.erl b/src/mod_commands.erl index fd15f38233..856ab44b25 100644 --- a/src/mod_commands.erl +++ b/src/mod_commands.erl @@ -98,7 +98,7 @@ commands() -> {function, unregister}, {action, delete}, {args, [{host, binary}, {user, binary}]}, - {result, {msg, binary}} + {result, ok} ], [ {name, list_sessions}, @@ -118,7 +118,7 @@ commands() -> {function, kick_session}, {action, delete}, {args, [{host, binary}, {user, binary}, {res, binary}]}, - {result, {msg, binary}} + {result, ok} ], [ {name, list_contacts}, diff --git a/src/mongoose_api_admin.erl b/src/mongoose_api_admin.erl index 4996aec074..1338f44c57 100644 --- a/src/mongoose_api_admin.erl +++ b/src/mongoose_api_admin.erl @@ -32,8 +32,7 @@ -include("mongoose_api.hrl"). -include("mongoose.hrl"). --import(mongoose_api_common, [error_response/3, - error_response/4, +-import(mongoose_api_common, [error_response/4, action_to_method/1, method_to_action/1, error_code/1, diff --git a/src/mongoose_api_common.erl b/src/mongoose_api_common.erl index 751fab2857..b8e077874b 100644 --- a/src/mongoose_api_common.erl +++ b/src/mongoose_api_common.erl @@ -153,6 +153,13 @@ handle_request(Method, Command, Args, Req, #http_api_state{entity = Entity} = St Req :: cowboy_req:req(), State :: http_api_state(), Return :: {any(), cowboy_req:req(), http_api_state()}. +handle_result(<<"DELETE">>, ok, Req, State) -> + Req2 = cowboy_req:reply(204, Req), + {stop, Req2, State}; +handle_result(<<"DELETE">>, {ok, Res}, Req, State) -> + Req2 = cowboy_req:set_resp_body(jiffy:encode(Res), Req), + Req3 = cowboy_req:reply(200, Req2), + {jiffy:encode(Res), Req3, State}; handle_result(Verb, ok, Req, State) -> handle_result(Verb, {ok, nocontent}, Req, State); handle_result(<<"GET">>, {ok, Result}, Req, State) -> @@ -165,10 +172,6 @@ handle_result(<<"POST">>, {ok, Res}, Req, State) -> Req2 = cowboy_req:set_resp_body(Res, Req), Req3 = maybe_add_location_header(Res, binary_to_list(Path), Req2), {stop, Req3, State}; -%% Ignore the returned value from a command for DELETE methods -handle_result(<<"DELETE">>, {ok, _Res}, Req, State) -> - Req2 = cowboy_req:reply(204, Req), - {stop, Req2, State}; handle_result(<<"PUT">>, {ok, nocontent}, Req, State) -> Req2 = cowboy_req:reply(204, Req), {stop, Req2, State}; diff --git a/src/mongoose_collector.erl b/src/mongoose_collector.erl new file mode 100644 index 0000000000..d0a0fabf75 --- /dev/null +++ b/src/mongoose_collector.erl @@ -0,0 +1,53 @@ +-module(mongoose_collector). + +-include("mongoose_logger.hrl"). + +%% gen_server callbacks +-behaviour(gen_server). +-export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2]). + +-ignore_xref([start_link/2]). + +-record(watchdog, { + host_type :: mongooseim:host_type(), + action :: fun((mongooseim:host_type(), map()) -> term()), + opts :: term(), + interval :: pos_integer(), + timer_ref :: undefined | reference() + }). + +start_link(Name, Opts) -> + gen_server:start_link({local, Name}, ?MODULE, Opts, []). + +init(#{host_type := HostType, + action := Fun, + opts := Opts, + interval := Interval}) when is_function(Fun, 2) -> + State = #watchdog{host_type = HostType, + action = Fun, + opts = Opts, + interval = Interval, + timer_ref = undefined}, + {ok, schedule_check(State)}. + +handle_call(Msg, From, State) -> + ?UNEXPECTED_CALL(Msg, From), + {reply, ok, State}. + +handle_cast(Msg, State) -> + ?UNEXPECTED_CAST(Msg), + {noreply, State}. + +handle_info({timeout, Ref, run_action}, + #watchdog{timer_ref = Ref} = State) -> + run_action(State), + {noreply, schedule_check(State)}; +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info), + {noreply, State}. + +schedule_check(State = #watchdog{interval = Interval}) -> + State#watchdog{timer_ref = erlang:start_timer(Interval, self(), run_action)}. + +run_action(#watchdog{host_type = HostType, action = Fun, opts = Opts}) -> + Fun(HostType, Opts). diff --git a/src/stream_management/mod_stream_management_mnesia.erl b/src/stream_management/mod_stream_management_mnesia.erl index 43fe67065b..419e322141 100644 --- a/src/stream_management/mod_stream_management_mnesia.erl +++ b/src/stream_management/mod_stream_management_mnesia.erl @@ -1,9 +1,7 @@ -module(mod_stream_management_mnesia). -behaviour(mod_stream_management_backend). --behaviour(gen_server). -include("mongoose.hrl"). --include("jlib.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). -export([init/2, @@ -16,16 +14,7 @@ delete_stale_h/2]). %% Internal exports --export([start_link/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2]). - --ignore_xref([start_link/1]). - --record(smgc_state, - {gc_repeat_after :: non_neg_integer(), - gc_geriatric :: non_neg_integer() }). +-export([clear_table/2]). -record(stream_mgmt_stale_h, {smid :: mod_stream_management:smid(), @@ -36,22 +25,22 @@ {smid :: mod_stream_management:smid(), sid :: ejabberd_sm:sid() }). -init(_HostType, #{stale_h := StaleOpts}) -> +init(HostType, #{stale_h := StaleOpts}) -> mnesia:create_table(sm_session, [{ram_copies, [node()]}, {attributes, record_info(fields, sm_session)}]), mnesia:add_table_index(sm_session, sid), mnesia:add_table_copy(sm_session, node(), ram_copies), - maybe_init_stale_h(StaleOpts), + maybe_init_stale_h(HostType, StaleOpts), ok. -maybe_init_stale_h(StaleOpts = #{enabled := true}) -> +maybe_init_stale_h(HostType, StaleOpts = #{enabled := true}) -> ?LOG_INFO(#{what => stream_mgmt_stale_h_start}), mnesia:create_table(stream_mgmt_stale_h, [{ram_copies, [node()]}, {attributes, record_info(fields, stream_mgmt_stale_h)}]), mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies), - start_cleaner(StaleOpts); -maybe_init_stale_h(_) -> ok. + start_cleaner(HostType, StaleOpts); +maybe_init_stale_h(_, _) -> ok. -spec register_smid(HostType, SMID, SID) -> ok | {error, term()} when @@ -126,41 +115,16 @@ delete_stale_h(_HostType, SMID) -> %% stale_h cleaning logic -start_cleaner(Opts) -> - MFA = {?MODULE, start_link, [Opts]}, - ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]}, +start_cleaner(HostType, #{repeat_after := Interval, geriatric := TTL}) -> + Name = gen_mod:get_module_proc(HostType, stream_management_stale_h), + WOpts = #{host_type => HostType, action => fun ?MODULE:clear_table/2, + opts => TTL, interval => Interval}, + MFA = {mongoose_collector, start_link, [Name, WOpts]}, + ChildSpec = {Name, MFA, permanent, 5000, worker, [?MODULE]}, %% TODO cleaner should be a service ejabberd_sup:start_child(ChildSpec). -start_link(Opts) -> - gen_server:start_link({local, stream_management_stale_h}, ?MODULE, Opts, []). - -init(#{repeat_after := RepeatAfter, geriatric := GeriatricAge}) -> - State = #smgc_state{gc_repeat_after = RepeatAfter, - gc_geriatric = GeriatricAge}, - schedule_check(State), - {ok, State}. - -handle_call(Msg, From, State) -> - ?UNEXPECTED_CALL(Msg, From), - {reply, ok, State}. - -handle_cast(Msg, State) -> - ?UNEXPECTED_CAST(Msg), - {noreply, State}. - -handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) -> - clear_table(GeriatricAge), - schedule_check(State), - {noreply, State}; -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info), - {noreply, State}. - -schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) -> - erlang:send_after(timer:seconds(RepeatAfter), self(), check). - -clear_table(GeriatricAge) -> +clear_table(_HostType, GeriatricAge) -> TimeToDie = erlang:monotonic_time(second) - GeriatricAge, MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp = S}) when S < TimeToDie -> true end), ets:select_delete(stream_mgmt_stale_h, MS). diff --git a/test/common/config_parser_helper.erl b/test/common/config_parser_helper.erl index 838585eee4..a633e687c5 100644 --- a/test/common/config_parser_helper.erl +++ b/test/common/config_parser_helper.erl @@ -547,7 +547,9 @@ all_modules() -> mod_inbox => #{backend => rdbms, async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)}, - boxes => [<<"inbox">>, <<"archive">>], + boxes => [<<"inbox">>, <<"archive">>, <<"bin">>], + bin_ttl => 30, + bin_clean_after => timer:hours(1), iqdisc => no_queue, aff_changes => true, groupchat => [muclight], @@ -870,7 +872,9 @@ default_mod_config(mod_http_upload) -> default_mod_config(mod_inbox) -> #{backend => rdbms, async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)}, - boxes => [<<"inbox">>, <<"archive">>], + boxes => [<<"inbox">>, <<"archive">>, <<"bin">>], + bin_ttl => 30, + bin_clean_after => timer:hours(1), groupchat => [muclight], aff_changes => true, remove_on_kicked => true, diff --git a/test/config_parser_SUITE.erl b/test/config_parser_SUITE.erl index 4ddb3d7c93..5bf73e0d95 100644 --- a/test/config_parser_SUITE.erl +++ b/test/config_parser_SUITE.erl @@ -1737,17 +1737,22 @@ mod_inbox(_Config) -> ?cfgh(P ++ [reset_markers], ChatMarkers, T(#{<<"reset_markers">> => ChatMarkers})), ?cfgh(P ++ [groupchat], [muc, muclight], T(#{<<"groupchat">> => [<<"muc">>, <<"muclight">>]})), ?cfgh(P ++ [boxes], - [<<"inbox">>, <<"archive">>, <<"favourites">>, <<"spam">>], + [<<"inbox">>, <<"archive">>, <<"bin">>, <<"favourites">>, <<"spam">>], T(#{<<"boxes">> => [<<"favourites">>, <<"spam">>]})), + ?cfgh(P ++ [bin_ttl], 30, T(#{<<"bin_ttl">> => 30})), + ?cfgh(P ++ [bin_clean_after], 43200000, T(#{<<"bin_clean_after">> => 12})), ?cfgh(P ++ [aff_changes], true, T(#{<<"aff_changes">> => true})), ?cfgh(P ++ [remove_on_kicked], false, T(#{<<"remove_on_kicked">> => false})), ?errh(T(#{<<"backend">> => <<"nodejs">>})), + ?errh(T(#{<<"pool_size">> => -1})), ?errh(T(#{<<"reset_markers">> => 1})), ?errh(T(#{<<"reset_markers">> => [<<"destroyed">>]})), ?errh(T(#{<<"groupchat">> => [<<"test">>]})), ?errh(T(#{<<"boxes">> => [<<"archive">>]})), ?errh(T(#{<<"boxes">> => [<<"duplicate">>, <<"duplicate">>]})), ?errh(T(#{<<"boxes">> => <<"test">>})), + ?errh(T(#{<<"bin_ttl">> => true})), + ?errh(T(#{<<"bin_clean_after">> => -1})), ?errh(T(#{<<"aff_changes">> => 1})), ?errh(T(#{<<"remove_on_kicked">> => 1})).