Skip to content

Commit

Permalink
Introduce the bin box and turn async deletes to bin moves
Browse files Browse the repository at this point in the history
This fixes the fact that the asynchronous inbox is currently dangerously
inconsistent. If on two nodes one writes new messages to a user's
conversation with a room, and in another node that user is removed from
the room, the order in which those events are flushed into the DB is now
not ensured, and the risk of the removal being flushed before the
insert, and therefore the insert recovering a conversation forever, is
much higher than in the synchronous inbox.

With the concept of boxes, the async backend can instead have a deleted
box, called bin, so that removals aren't really removals, but just moves
to another box, which are atomic and can interleave with modifications
to that entry's other columns. Then asynchronously the deleted boxes can
be collected, much like email clears the Bin after so many days.
  • Loading branch information
NelsonVides committed Mar 29, 2022
1 parent 5a54f59 commit 2bb7f5f
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 56 deletions.
10 changes: 6 additions & 4 deletions big_tests/tests/inbox_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,13 @@ inbox_opts() ->
config_parser_helper:default_mod_config(mod_inbox).

inbox_opts(regular) ->
(inbox_opts())#{boxes => [<<"inbox">>, <<"archive">>, <<"other">>]};
DefOps = #{boxes := Boxes} = inbox_opts(),
DefOps#{boxes := Boxes ++ [<<"other">>]};
inbox_opts(async_pools) ->
(inbox_opts())#{backend => rdbms_async,
async_writer => #{pool_size => 2},
boxes => [<<"inbox">>, <<"archive">>, <<"other">>]}.
DefOps = #{boxes := Boxes} = inbox_opts(),
DefOps#{backend => rdbms_async,
async_writer => #{pool_size => 1},
boxes => Boxes ++ [<<"other">>]}.

skip_or_run_inbox_tests(TestCases) ->
case (not ct_helper:is_ct_running())
Expand Down
2 changes: 1 addition & 1 deletion doc/modules/mod_inbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Number of workers in the pool. More than the number of available schedulers is r
A list of supported inbox boxes by the server. This can be used by clients to classify their inbox entries in any way that fits the end-user. The strings provided here will be used verbatim in the IQ query as described in [Inbox – Filtering and Ordering](../open-extensions/inbox.md#filtering-and-ordering).

!!! note
`inbox` and `archive` are always enabled, and therefore don't need to be specified here.
`inbox`, `archive`, and `bin` are reserved box names and are always enabled, therefore they don't need –and must not– to be specified in this section.

### `modules.mod_inbox.reset_markers`
* **Syntax:** array of strings, out of `"displayed"`, `"received"`, `"acknowledged"`
Expand Down
8 changes: 6 additions & 2 deletions src/inbox/mod_inbox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,12 @@ async_config_spec() ->
}.

process_inbox_boxes(Config = #{boxes := Boxes}) ->
false = lists:any(fun(Name) -> Name =:= <<"inbox">> orelse Name =:= <<"archive">> end, Boxes),
AllBoxes = [<<"inbox">>, <<"archive">> | Boxes ],
false = lists:any(fun(<<"inbox">>) -> true;
(<<"archive">>) -> true;
(<<"bin">>) -> true;
(_) -> false
end, Boxes),
AllBoxes = [<<"inbox">>, <<"archive">>, <<"bin">> | Boxes ],
Config#{boxes := AllBoxes}.

%%%%%%%%%%%%%%%%%%%
Expand Down
16 changes: 8 additions & 8 deletions src/inbox/mod_inbox_rdbms.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,19 @@ init(HostType, _Options) ->
[lserver], <<"DELETE FROM inbox WHERE lserver = ?">>),
% upserts
UniqueKeyFields = [<<"luser">>, <<"lserver">>, <<"remote_bare_jid">>],
InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"content">>, <<"unread_count">>, <<"timestamp">>],
InsertFields = UniqueKeyFields ++ [<<"msg_id">>, <<"box">>, <<"content">>, <<"unread_count">>, <<"timestamp">>],
rdbms_queries:prepare_upsert(HostType, inbox_upsert, inbox,
InsertFields,
[<<"msg_id">>,
{assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>},
{expression, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN ? ELSE inbox.box END">>},
<<"content">>,
<<"unread_count">>,
<<"timestamp">>],
UniqueKeyFields, <<"timestamp">>),
rdbms_queries:prepare_upsert(HostType, inbox_upsert_incr_unread, inbox,
InsertFields,
[<<"msg_id">>,
{assignment, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN 'inbox' ELSE inbox.box END">>},
{expression, <<"box">>, <<"CASE WHEN inbox.box='archive' THEN ? ELSE inbox.box END">>},
<<"content">>,
{expression, <<"unread_count">>, <<"inbox.unread_count + ?">>},
<<"timestamp">>],
Expand Down Expand Up @@ -112,8 +112,8 @@ get_inbox_unread(HostType, {LUser, LServer, RemBareJID}) ->
set_inbox(HostType, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp) ->
Content = exml:to_binary(Packet),
Unique = [LUser, LServer, LToBareJid],
Update = [MsgId, Content, Count, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp],
Update = [MsgId, <<"inbox">>, Content, Count, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, <<"inbox">>, Content, Count, Timestamp],
Res = rdbms_queries:execute_upsert(HostType, inbox_upsert, Insert, Update, Unique),
%% MySQL returns 1 when an upsert is an insert
%% and 2, when an upsert acts as update
Expand Down Expand Up @@ -146,15 +146,15 @@ set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
set_inbox_incr_unread(HostType, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs) ->
Content = exml:to_binary(Packet),
Unique = [LUser, LServer, LToBareJid],
Update = [MsgId, Content, Incrs, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp],
Update = [MsgId, <<"inbox">>, Content, Incrs, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, <<"inbox">>, Content, Incrs, Timestamp],
Res = rdbms_queries:execute_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique),
check_result(Res).

-spec reset_unread(HostType :: mongooseim:host_type(),
InboxEntryKey :: mod_inbox:entry_key(),
MsgId :: binary() | undefined,
TS :: integer()) -> mod_inbox:write_res().
TS :: integer()) -> mod_inbox:write_res().
reset_unread(HostType, {LUser, LServer, LToBareJid}, MsgId, TS) ->
Res = execute_reset_unread(HostType, LUser, LServer, LToBareJid, MsgId, TS),
check_result(Res).
Expand Down
96 changes: 58 additions & 38 deletions src/inbox/mod_inbox_rdbms_async.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
-behaviour(mod_inbox_backend).
-behaviour(mongoose_aggregator_worker).

-type box() :: binary().
-type task() ::
{set_inbox, mod_inbox:entry_key(), exml:element(), pos_integer(), id(), integer()} |
{set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), Incrs :: pos_integer()} |
{set_inbox, mod_inbox:entry_key(), exml:element(), pos_integer(), id(), integer(), box()} |
{set_inbox_incr_unread, mod_inbox:entry_key(), exml:element(), id(), integer(), pos_integer(), box()} |
{remove_inbox_row, mod_inbox:entry_key()} |
{reset_unread, mod_inbox:entry_key(), id(), integer()}.

Expand Down Expand Up @@ -56,24 +57,24 @@ start_pool(HostType, Opts) ->
request(Task, _Extra = #{host_type := HostType}) ->
request_one(HostType, Task).

request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp}) ->
request_one(HostType, {set_inbox, {LUser, LServer, LToBareJid}, Packet, Count, MsgId, Timestamp, Box}) ->
Content = exml:to_binary(Packet),
Unique = [LUser, LServer, LToBareJid],
Update = [MsgId, Content, Count, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Content, Count, Timestamp],
Update = [MsgId, Box, Content, Count, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Count, Timestamp],
rdbms_queries:request_upsert(HostType, inbox_upsert, Insert, Update, Unique);
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs}) ->
request_one(HostType, {set_inbox_incr_unread, {LUser, LServer, LToBareJid}, Packet, MsgId, Timestamp, Incrs, Box}) ->
Content = exml:to_binary(Packet),
Unique = [LUser, LServer, LToBareJid],
Update = [MsgId, Content, Incrs, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Content, Incrs, Timestamp],
Update = [MsgId, Box, Content, Incrs, Timestamp],
Insert = [LUser, LServer, LToBareJid, MsgId, Box, Content, Incrs, Timestamp],
rdbms_queries:request_upsert(HostType, inbox_upsert_incr_unread, Insert, Update, Unique);
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
mongoose_rdbms:execute_request(HostType, inbox_delete_row, [LUser, LServer, LToBareJid]);
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, undefined, TS}) ->
mongoose_rdbms:execute_request(HostType, inbox_reset_unread, [LUser, LServer, LToBareJid, TS]);
request_one(HostType, {reset_unread, {LUser, LServer, LToBareJid}, MsgId, TS}) ->
mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId, TS]).
mongoose_rdbms:execute_request(HostType, inbox_reset_unread_msg, [LUser, LServer, LToBareJid, MsgId, TS]);
request_one(HostType, {remove_inbox_row, {LUser, LServer, LToBareJid}}) ->
mongoose_rdbms:execute_request(HostType, inbox_move_conversation_to_bin, [LUser, LServer, LToBareJid]).

-spec aggregate(task(), task(), mongoose_async_pools:pool_extra()) -> {ok, task()}.
aggregate(Current, NewTask, _Extra) ->
Expand All @@ -94,14 +95,14 @@ verify(Answer, InboxTask, _Extra) ->
exml:element(), Count :: integer(), id(), Timestamp :: integer()) ->
mod_inbox:write_res().
set_inbox(HostType, Entry, Packet, Count, MsgId, Timestamp) ->
Params = {set_inbox, Entry, Packet, Count, MsgId, Timestamp},
Params = {set_inbox, Entry, Packet, Count, MsgId, Timestamp, <<"inbox">>},
mongoose_async_pools:put_task(HostType, inbox, Entry, Params).

-spec set_inbox_incr_unread(mongooseim:host_type(), mod_inbox:entry_key(),
exml:element(), MsgId :: binary(), Timestamp :: integer()) ->
mod_inbox:count_res().
set_inbox_incr_unread(HostType, Entry, Packet, MsgId, Timestamp) ->
Params = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1},
Params = {set_inbox_incr_unread, Entry, Packet, MsgId, Timestamp, 1, <<"inbox">>},
mongoose_async_pools:put_task(HostType, inbox, Entry, Params).

-spec reset_unread(mongooseim:host_type(), mod_inbox:entry_key(), binary() | undefined, integer()) ->
Expand Down Expand Up @@ -155,17 +156,29 @@ set_entry_properties(HostType, Entry, Properties) ->
aggregate(undefined, Task) ->
Task;

%%% if new task is remove_inbox, ignore all previous requests and just remove
aggregate(_, {remove_inbox_row, Entry}) ->
{remove_inbox_row, Entry};
%%% if new task is remove_row, do the previous with an updated box
% {reset_unread, mod_inbox:entry_key(), id(), integer()}.
aggregate({reset_unread, _, _, _},
{remove_inbox_row, _} = OldTask) ->
OldTask;
aggregate({set_inbox, Entry, Content, Count, MsgId, Timestamp, _},
{remove_inbox_row, _}) ->
{set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _},
{remove_inbox_row, _}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};
aggregate(_, {remove_inbox_row, _} = OldTask) ->
OldTask;

%%% if the last task was remove_row, this task should now only be an insert
aggregate({remove_inbox_row, _} = OldTask, {reset_unread, _, _, _}) ->
OldTask;
aggregate({remove_inbox_row, _}, {set_inbox, _, _, _, _, _} = NewTask) ->
NewTask;
aggregate({remove_inbox_row, _}, {set_inbox_incr_unread, _, _, _, _, _} = NewTask) ->
NewTask;
aggregate({remove_inbox_row, _},
{set_inbox, Entry, Content, Count, MsgId, Timestamp, _}) ->
{set_inbox, Entry, Content, Count, MsgId, Timestamp, <<"bin">>};
aggregate({remove_inbox_row, _},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, _}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, <<"bin">>};

%%% If the last task was a reset_unread,
% we prefer explicit resets,
Expand All @@ -180,38 +193,45 @@ aggregate({reset_unread, _, undefined, _} = OldTask, {reset_unread, _, _, _}) ->
%% both are adhoc, we prefer the newer
aggregate({reset_unread, _, _, _}, {reset_unread, _, _, _} = NewTask) ->
NewTask;
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _} = NewTask) ->
aggregate({reset_unread, _, _, _}, {set_inbox, _, _, _, _, _, _} = NewTask) ->
NewTask;
%% Here `Count` becomes an absolute value instead of an increment
aggregate({reset_unread, _, _, _}, {set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) ->
{set_inbox, Entry, Content, Incrs, MsgId, Timestamp};
aggregate({reset_unread, _, _, _},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
{set_inbox, Entry, Content, Incrs, MsgId, Timestamp, Box};

%%% If the last task was a set_inbox
%% Reset is an explicit reset-to-zero, so do reset the counter
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, undefined, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp};
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
{reset_unread, _, undefined, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to that same set_inbox
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp}, {reset_unread, _, MsgId, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp};
aggregate({set_inbox, Entry, Content, _, MsgId, Timestamp, Box},
{reset_unread, _, MsgId, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
%% Reset refers to some other set_inbox
aggregate({set_inbox, _, _, _, _, _} = OldTask, {reset_unread, _, _, _}) ->
aggregate({set_inbox, _, _, _, _, _, _} = OldTask,
{reset_unread, _, _, _}) ->
OldTask;
aggregate({set_inbox, _, _, Count, _, _, _},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs};
aggregate({set_inbox, _, _, Count, _, _, _, _},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs, Box}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Count + Incrs, Box};

%%% If the last task was a set_inbox_incr_unread
% we're resetting on this message:
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _}, {reset_unread, _, MsgId, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp};
aggregate({set_inbox_incr_unread, _, _, _, _, _} = OldTask, {reset_unread, _, _, _}) ->
aggregate({set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, _, Box},
{reset_unread, _, MsgId, _}) ->
{set_inbox, Entry, Content, 0, MsgId, Timestamp, Box};
aggregate({set_inbox_incr_unread, _, _, _, _, _, _} = OldTask,
{reset_unread, _, _, _}) ->
OldTask;
% prefer newest row, but accumulate increment
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2};
aggregate({set_inbox_incr_unread, _, _, _, _, Incrs2, _},
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1, Box}) ->
{set_inbox_incr_unread, Entry, Content, MsgId, Timestamp, Incrs1 + Incrs2, Box};

aggregate({set_inbox_incr_unread, _, _, MsgId, _, _}, {set_inbox, _, _, _, MsgId, _} = NewTask) ->
aggregate({set_inbox_incr_unread, _, _, MsgId, _, _, _},
{set_inbox, _, _, _, MsgId, _, _} = NewTask) ->
NewTask;

aggregate(_OldTask, NewTask) ->
Expand Down
4 changes: 2 additions & 2 deletions test/common/config_parser_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ all_modules() ->
mod_inbox =>
#{backend => rdbms,
async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)},
boxes => [<<"inbox">>, <<"archive">>],
boxes => [<<"inbox">>, <<"archive">>, <<"bin">>],
iqdisc => no_queue,
aff_changes => true,
groupchat => [muclight],
Expand Down Expand Up @@ -857,7 +857,7 @@ default_mod_config(mod_global_distrib) ->
default_mod_config(mod_inbox) ->
#{backend => rdbms,
async_writer => #{pool_size => 2 * erlang:system_info(schedulers_online)},
boxes => [<<"inbox">>, <<"archive">>],
boxes => [<<"inbox">>, <<"archive">>, <<"bin">>],
groupchat => [muclight],
aff_changes => true,
remove_on_kicked => true,
Expand Down
2 changes: 1 addition & 1 deletion test/config_parser_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1736,7 +1736,7 @@ mod_inbox(_Config) ->
?cfgh(P ++ [reset_markers], ChatMarkers, T(#{<<"reset_markers">> => ChatMarkers})),
?cfgh(P ++ [groupchat], [muc, muclight], T(#{<<"groupchat">> => [<<"muc">>, <<"muclight">>]})),
?cfgh(P ++ [boxes],
[<<"inbox">>, <<"archive">>, <<"favourites">>, <<"spam">>],
[<<"inbox">>, <<"archive">>, <<"bin">>, <<"favourites">>, <<"spam">>],
T(#{<<"boxes">> => [<<"favourites">>, <<"spam">>]})),
?cfgh(P ++ [aff_changes], true, T(#{<<"aff_changes">> => true})),
?cfgh(P ++ [remove_on_kicked], false, T(#{<<"remove_on_kicked">> => false})),
Expand Down

0 comments on commit 2bb7f5f

Please sign in to comment.