Skip to content

Commit

Permalink
Merge pull request #3734 from esl/mu-mam-simple-with-complete-result-…
Browse files Browse the repository at this point in the history
…flag

New shared function mod_mam_utils:lookup/3
There are two strategies to determine complete flag:
- is_complete_result_page_using_offset
- set_complete_result_page_using_extra_message for simple queries.
set_complete_result_page_using_extra_message requests one extra message per page from the DB. It then hides that extra message from the user, but uses its presence to set the complete flag correctly.
  • Loading branch information
NelsonVides authored Aug 17, 2022
2 parents 1b9802b + f6ae3c3 commit e65aace
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 41 deletions.
72 changes: 59 additions & 13 deletions big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
pagination_before10/1,
pagination_after10/1,
pagination_simple_before10/1,
pagination_simple_before3/1,
pagination_simple_before6/1,
pagination_simple_before1_pagesize0/1,
pagination_simple_before2_pagesize0/1,
pagination_simple_after5/1,
pagination_simple_after10/1,
pagination_simple_after12/1,
pagination_last_after_id5/1,
pagination_last_after_id5_before_id11/1,
pagination_empty_rset/1,
Expand Down Expand Up @@ -178,9 +185,10 @@
rsm_send/3,
stanza_page_archive_request/3,
wait_empty_rset/2,
wait_message_range/2,
wait_message_range/3,
message_id/2,
wait_message_range/5,
message_id/2,
stanza_prefs_set_request/4,
stanza_prefs_get_request/1,
stanza_query_get_request/1,
Expand Down Expand Up @@ -498,6 +506,13 @@ rsm_cases() ->
pagination_last_page_after_id4,
%% Simple cases
pagination_simple_before10,
pagination_simple_before3,
pagination_simple_before6,
pagination_simple_before1_pagesize0,
pagination_simple_before2_pagesize0,
pagination_simple_after5,
pagination_simple_after10,
pagination_simple_after12,
%% item_not_found response for nonexistent message ID in before/after filters
server_returns_item_not_found_for_before_filter_with_nonexistent_id,
server_returns_item_not_found_for_after_filter_with_nonexistent_id,
Expand Down Expand Up @@ -2477,7 +2492,6 @@ pagination_offset5_max0(Config) ->
pagination_before10(Config) ->
P = ?config(props, Config),
F = fun(Alice) ->
%% Get the last page of size 5.
RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config)},
rsm_send(Config, Alice,
stanza_page_archive_request(P, <<"before10">>, RSM)),
Expand All @@ -2487,17 +2501,37 @@ pagination_before10(Config) ->
parallel_story(Config, [{alice, 1}], F).

pagination_simple_before10(Config) ->
P = ?config(props, Config),
F = fun(Alice) ->
%% Get the last page of size 5.
RSM = #rsm_in{max=5, direction=before, id=message_id(10, Config), simple=true},
rsm_send(Config, Alice,
stanza_page_archive_request(P, <<"before10">>, RSM)),
%% wait_message_range(Client, TotalCount, Offset, FromN, ToN),
wait_message_range(Alice, undefined, undefined, 5, 9),
ok
end,
parallel_story(Config, [{alice, 1}], F).
RSM = #rsm_in{max = 5, direction = before, id = message_id(10, Config), simple = true},
pagination_test(before10, RSM, simple_range(5, 9, false), Config).

%% Simple query for up to 5 messages before message 3;
%% expects messages 1..2 and the complete flag set to true.
pagination_simple_before3(Config) ->
    Anchor = message_id(3, Config),
    Query = #rsm_in{max = 5, direction = before, id = Anchor, simple = true},
    Expected = simple_range(1, 2, true),
    pagination_test(before3, Query, Expected, Config).

%% Simple query for up to 5 messages before message 6;
%% expects messages 1..5 and the complete flag set to true.
pagination_simple_before6(Config) ->
    Anchor = message_id(6, Config),
    Query = #rsm_in{max = 5, direction = before, id = Anchor, simple = true},
    Expected = simple_range(1, 5, true),
    pagination_test(before6, Query, Expected, Config).

%% Simple query with page size 0 before message 1.
%% No messages forwarded, but is_complete is set
pagination_simple_before1_pagesize0(Config) ->
    Anchor = message_id(1, Config),
    Query = #rsm_in{max = 0, direction = before, id = Anchor, simple = true},
    Expected = simple_range(undefined, undefined, true),
    pagination_test(before1, Query, Expected, Config).

%% Simple query with page size 0 before message 2;
%% expects no forwarded messages and the complete flag set to false.
pagination_simple_before2_pagesize0(Config) ->
    Anchor = message_id(2, Config),
    Query = #rsm_in{max = 0, direction = before, id = Anchor, simple = true},
    Expected = simple_range(undefined, undefined, false),
    pagination_test(before2, Query, Expected, Config).

%% Simple query for up to 3 messages after message 5;
%% expects messages 6..8 and the complete flag set to false.
pagination_simple_after5(Config) ->
    Anchor = message_id(5, Config),
    Query = #rsm_in{max = 3, direction = 'after', id = Anchor, simple = true},
    Expected = simple_range(6, 8, false),
    pagination_test(after5, Query, Expected, Config).

%% Simple query for up to 5 messages after message 10;
%% expects messages 11..15 and the complete flag set to true.
pagination_simple_after10(Config) ->
    Anchor = message_id(10, Config),
    Query = #rsm_in{max = 5, direction = 'after', id = Anchor, simple = true},
    Expected = simple_range(11, 15, true),
    pagination_test(after10, Query, Expected, Config).

%% Simple query for up to 5 messages after message 12;
%% expects messages 13..15 and the complete flag set to true.
pagination_simple_after12(Config) ->
    Anchor = message_id(12, Config),
    Query = #rsm_in{max = 5, direction = 'after', id = Anchor, simple = true},
    Expected = simple_range(13, 15, true),
    pagination_test(after12, Query, Expected, Config).

pagination_after10(Config) ->
P = ?config(props, Config),
Expand Down Expand Up @@ -3134,3 +3168,15 @@ retract_element(stanza_id) ->

%% Returns the fixed binary <<"orig-id-1">>.
origin_id() -> <<"orig-id-1">>.

%% Builds the expectation map for a simple query:
%% total_count and offset are always undefined, while the message range
%% (From..To) and the expected complete flag come from the arguments.
simple_range(From, To, IsComplete) ->
    #{from => From,
      to => To,
      is_complete => IsComplete,
      offset => undefined,
      total_count => undefined}.

%% Shared driver for the pagination cases.
%% As alice, sends a paged archive request whose query id is the binary form
%% of Name and which carries RSM, then waits for a response matching Range.
pagination_test(Name, RSM, Range, Config) ->
    Props = ?config(props, Config),
    Story = fun(Alice) ->
                    QueryId = atom_to_binary(Name),
                    Request = stanza_page_archive_request(Props, QueryId, RSM),
                    rsm_send(Config, Alice, Request),
                    wait_message_range(Alice, Range)
            end,
    parallel_story(Config, [{alice, 1}], Story).
18 changes: 16 additions & 2 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@
rsm_send/3,
stanza_page_archive_request/3,
wait_empty_rset/2,
wait_message_range/2,
wait_message_range/3,
message_id/2,
wait_message_range/5,
message_id/2,
stanza_prefs_set_request/4,
stanza_prefs_get_request/1,
stanza_query_get_request/1,
Expand Down Expand Up @@ -823,6 +824,12 @@ wait_message_range(Client, FromN, ToN) ->
wait_message_range(Client, 15, FromN-1, FromN, ToN).

%% Positional variant: packs the expectations into a map and delegates to
%% wait_message_range/2. No is_complete key is put into the map.
wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
    Expected = #{total_count => TotalCount,
                 offset => Offset,
                 from => FromN,
                 to => ToN},
    wait_message_range(Client, Expected).

wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offset,
from := FromN, to := ToN}) ->
IsComplete = maps:get(is_complete, Params, undefined),
Result = wait_archive_respond(Client),
Messages = respond_messages(Result),
IQ = respond_iq(Result),
Expand All @@ -833,8 +840,13 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
?assert_equal(TotalCount, ParsedIQ#result_iq.count),
?assert_equal(Offset, ParsedIQ#result_iq.first_index),
%% Compare body of the messages.
?assert_equal([generate_message_text(N) || N <- lists:seq(FromN, ToN)],
?assert_equal([generate_message_text(N) || N <- maybe_seq(FromN, ToN)],
[B || #forwarded_message{message_body=B} <- ParsedMessages]),
case IsComplete of
true -> ?assert_equal(<<"true">>, ParsedIQ#result_iq.complete);
false -> ?assert_equal(<<"false">>, ParsedIQ#result_iq.complete);
undefined -> ok
end,
ok
catch Class:Reason:StackTrace ->
ct:pal("IQ: ~p~n"
Expand All @@ -845,6 +857,8 @@ wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
erlang:raise(Class, Reason, StackTrace)
end.

%% Like lists:seq/2, but an undefined..undefined range yields an empty list.
maybe_seq(undefined, undefined) ->
    [];
maybe_seq(From, To) ->
    lists:seq(From, To).

wait_empty_rset(Alice, TotalCount) ->
Result = wait_archive_respond(Alice),
Expand Down
6 changes: 6 additions & 0 deletions src/mam/mod_mam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
Offset :: non_neg_integer() | undefined,
MessageRows :: [message_row()]}.

%% Map form of a lookup result.
%% total_count, offset and messages are mandatory keys (`:=`);
%% is_complete is an optional key (`=>`).
-type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined,
offset := Offset :: non_neg_integer() | undefined,
messages := MessageRows :: [message_row()],
is_complete => boolean()}.

%% Internal types
-type iterator_fun() :: fun(() -> {'ok', {_, _}}).
-type rewriter_fun() :: fun((JID :: jid:literal_jid())
Expand Down Expand Up @@ -73,6 +78,7 @@
unix_timestamp/0,
archive_id/0,
lookup_result/0,
lookup_result_map/0,
message_row/0,
message_id/0,
restore_option/0,
Expand Down
11 changes: 5 additions & 6 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@

%% Other
-import(mod_mam_utils,
[mess_id_to_external_binary/1,
is_complete_result_page/4]).
[mess_id_to_external_binary/1]).

-include_lib("mongoose.hrl").
-include_lib("jlib.hrl").
Expand Down Expand Up @@ -406,7 +405,7 @@ handle_set_message_form(HostType, #jid{} = From, #jid{} = ArcJID, IQ) ->
jlib:iq() | ignore | {error, term(), jlib:iq()}.
do_handle_set_message_form(HostType, From, ArcID, ArcJID, IQ, Params0) ->
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From),
Result = lookup_messages(HostType, Params),
Result = mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2),
handle_lookup_result(Result, HostType, From, IQ, Params).

-spec handle_lookup_result({ok, mod_mam:lookup_result()} | {error, term()},
Expand All @@ -421,16 +420,16 @@ handle_lookup_result(Result, HostType, From, IQ, #{owner_jid := ArcJID} = Params
send_messages_and_iq_result(Res, HostType, From, IQ, Params)
end.

send_messages_and_iq_result({TotalCount, Offset, MessageRows}, HostType, From,
send_messages_and_iq_result(#{total_count := TotalCount, offset := Offset,
messages := MessageRows, is_complete := IsComplete},
HostType, From,
#iq{xmlns = MamNs, sub_el = QueryEl} = IQ,
#{owner_jid := ArcJID} = Params) ->
%% Forward messages
QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>),
{FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs,
QueryID, MessageRows, true),

%% Make fin iq
IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params),
IsStable = true,
ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount),
ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType),
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_muc_cassandra_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -757,5 +757,5 @@ db_message_format(HostType) ->
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).

-spec pool_name(HostType :: host_type()) -> mongoose_wpool:pool_name().
pool_name(HostType) ->
pool_name(_HostType) ->
default.
9 changes: 4 additions & 5 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@

%% Other
-import(mod_mam_utils,
[mess_id_to_external_binary/1,
is_complete_result_page/4]).
[mess_id_to_external_binary/1]).

%% ejabberd
-import(mod_mam_utils,
Expand Down Expand Up @@ -398,16 +397,16 @@ do_handle_set_message_form(Params0, From, ArcID, ArcJID,
HostType) ->
QueryID = exml_query:attr(QueryEl, <<"queryid">>, <<>>),
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From),
case lookup_messages(HostType, Params) of
case mod_mam_utils:lookup(HostType, Params, fun lookup_messages/2) of
{error, Reason} ->
report_issue(Reason, mam_lookup_failed, ArcJID, IQ),
return_error_iq(IQ, Reason);
{ok, {TotalCount, Offset, MessageRows}} ->
{ok, #{total_count := TotalCount, offset := Offset, messages := MessageRows,
is_complete := IsComplete}} ->
%% Forward messages
{FirstMessID, LastMessID} = forward_messages(HostType, From, ArcJID, MamNs,
QueryID, MessageRows, true),
%% Make fin iq
IsComplete = is_complete_result_page(TotalCount, Offset, MessageRows, Params),
IsStable = true,
ResultSetEl = result_set(FirstMessID, LastMessID, Offset, TotalCount),
ExtFinMod = mod_mam_params:extra_fin_element_module(?MODULE, HostType),
Expand Down
85 changes: 71 additions & 14 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
calculate_msg_id_borders/3,
calculate_msg_id_borders/4,
maybe_encode_compact_uuid/2,
is_complete_result_page/4,
wait_shaper/4,
check_for_item_not_found/3]).

Expand All @@ -90,7 +89,8 @@
is_jid_in_user_roster/3]).

%% Shared logic
-export([check_result_for_policy_violation/2]).
-export([check_result_for_policy_violation/2,
lookup/3]).

-callback extra_fin_element(mongooseim:host_type(),
mam_iq:lookup_params(),
Expand Down Expand Up @@ -1039,14 +1039,15 @@ maybe_previous_id(X) ->
%% It's the most efficient way to query archive, if the client side does
%% not care about the total number of messages and if it's stateless
%% (i.e. web interface).
-spec is_complete_result_page(TotalCount, Offset, MessageRows, Params) ->
%% Handles case when we have TotalCount and Offset as integers
-spec is_complete_result_page_using_offset(Params, Result) ->
boolean() when
TotalCount :: non_neg_integer()|undefined,
Offset :: non_neg_integer()|undefined,
MessageRows :: list(),
Params :: mam_iq:lookup_params().
is_complete_result_page(TotalCount, Offset, MessageRows,
#{page_size := PageSize} = Params) ->
Params :: mam_iq:lookup_params(),
Result :: mod_mam:lookup_result_map().
is_complete_result_page_using_offset(#{page_size := PageSize} = Params,
#{total_count := TotalCount, offset := Offset,
messages := MessageRows})
when is_integer(TotalCount), is_integer(Offset) ->
case maps:get(ordering_direction, Params, forward) of
forward ->
is_most_recent_page(PageSize, TotalCount, Offset, MessageRows);
Expand Down Expand Up @@ -1155,19 +1156,75 @@ is_policy_violation(TotalCount, Offset, MaxResultLimit, LimitPassed) ->
LookupResult :: mod_mam:lookup_result(),
R :: {ok, mod_mam:lookup_result()} | {error, item_not_found}.
check_for_item_not_found(#rsm_in{direction = before, id = ID},
PageSize, {TotalCount, Offset, MessageRows}) ->
_PageSize, {TotalCount, Offset, MessageRows}) ->
case maybe_last(MessageRows) of
{ok, #{id := ID}} = _IntervalEndpoint ->
Page = lists:sublist(MessageRows, PageSize),
{ok, {TotalCount, Offset, Page}};
{ok, #{id := ID}} ->
{ok, {TotalCount, Offset, list_without_last(MessageRows)}};
undefined ->
{error, item_not_found}
end;
check_for_item_not_found(#rsm_in{direction = aft, id = ID},
_PageSize, {TotalCount, Offset, MessageRows0}) ->
case MessageRows0 of
[#{id := ID} = _IntervalEndpoint | MessageRows] ->
[#{id := ID} | MessageRows] ->
{ok, {TotalCount, Offset, MessageRows}};
_ ->
{error, item_not_found}
end.

%% Runs the lookup fun F (called as F(HostType, Params)) through the shared
%% completeness logic. F's {ok, {TotalCount, Offset, MessageRows}} result is
%% converted to a map before the is_complete flag is computed.
-spec lookup(HostType :: mongooseim:host_type(),
             Params :: mam_iq:lookup_params(),
             F :: fun()) ->
    {ok, mod_mam:lookup_result_map()} | {error, Reason :: term()}.
lookup(HostType, Params, F) ->
    process_lookup_with_complete_check(HostType, Params,
                                       patch_fun_to_make_result_as_map(F)).

%% Picks the strategy for the is_complete flag:
%% - params with is_simple := true go through the extra-message strategy;
%% - all other params use the offset-based check on the lookup result.
%% Any non-{ok, _} result of F is returned unchanged.
process_lookup_with_complete_check(HostType, #{is_simple := true} = Params, F) ->
    process_simple_lookup_with_complete_check(HostType, Params, F);
process_lookup_with_complete_check(HostType, Params, F) ->
    case F(HostType, Params) of
        {ok, Result} ->
            {ok, Result#{is_complete => is_complete_result_page_using_offset(Params, Result)}};
        NotOk ->
            NotOk
    end.

%% Wraps the two-argument lookup fun F so that its result is passed
%% through result_to_map/1.
patch_fun_to_make_result_as_map(F) ->
    fun(HostType, Params) ->
            Raw = F(HostType, Params),
            result_to_map(Raw)
    end.

%% Converts {ok, {TotalCount, Offset, MessageRows}} into {ok, Map} with the
%% keys total_count, offset and messages. Any other term is returned as is.
result_to_map({ok, {TotalCount, Offset, MessageRows}}) ->
    ResultMap = #{messages => MessageRows,
                  offset => Offset,
                  total_count => TotalCount},
    {ok, ResultMap};
result_to_map(Unchanged) ->
    Unchanged.

%% Simple-query strategy: ask F for one message more than the requested
%% page size, then let set_complete_result_page_using_extra_message/3 set the
%% is_complete flag and drop the extra message before returning.
%% Any non-{ok, _} result of F is returned unchanged.
process_simple_lookup_with_complete_check(HostType, #{page_size := PageSize} = Params, F) ->
    %% page_size is known to be present (matched in the head), so update it in place.
    ParamsWithExtra = Params#{page_size := PageSize + 1},
    case F(HostType, ParamsWithExtra) of
        {ok, Result} ->
            Patched = set_complete_result_page_using_extra_message(PageSize, Params, Result),
            {ok, Patched};
        NotOk ->
            NotOk
    end.

%% Sets is_complete in Result based on whether the extra message came back.
%% Exactly PageSize + 1 rows means there is more data: is_complete is false
%% and the extra row is removed. Any other row count sets is_complete to true
%% and leaves the messages untouched.
set_complete_result_page_using_extra_message(PageSize, Params, Result = #{messages := MessageRows}) ->
    HasExtraMessage = length(MessageRows) =:= PageSize + 1,
    case HasExtraMessage of
        false ->
            Result#{is_complete => true};
        true ->
            Trimmed = remove_extra_message(Params, MessageRows),
            Result#{is_complete => false, messages => Trimmed}
    end.

%% Drops the extra message from Messages.
%% For the forward ordering direction (the default when the key is absent)
%% the last element is dropped; for backward the first element is dropped.
remove_extra_message(Params, Messages) ->
    Direction = maps:get(ordering_direction, Params, forward),
    case Direction of
        backward ->
            tl(Messages);
        forward ->
            list_without_last(Messages)
    end.

%% Returns List without its last element.
%% List must be non-empty: lists:droplast/1 fails with function_clause on [].
%% (The previous double-reverse implementation failed on [] as well, with
%% badarg from tl/1.)
list_without_last(List) ->
    lists:droplast(List).

0 comments on commit e65aace

Please sign in to comment.