Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use map for mod_mam:message_row() #3093

Merged
merged 7 commits into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
{suites, "tests", service_domain_db_SUITE}.
{suites, "tests", domain_isolation_SUITE}.
{suites, "tests", domain_removal_SUITE}.
{suites, "tests", mam_send_message_SUITE}.

{config, ["test.config"]}.
{logdir, "ct_report"}.
Expand Down
142 changes: 142 additions & 0 deletions big_tests/tests/mam_send_message_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
-module(mam_send_message_SUITE).

%% API
-export([all/0,
groups/0,
init_per_suite/1,
end_per_suite/1,
init_per_group/2,
end_per_group/2,
init_per_testcase/2,
end_per_testcase/2]).

-export([mam_muc_send_message/1,
mam_pm_send_message/1]).

-import(mam_helper,
[stanza_archive_request/2,
wait_archive_respond/1,
assert_respond_size/2,
respond_messages/1,
parse_forwarded_message/1]).

-import(distributed_helper, [mim/0,
require_rpc_nodes/1,
rpc/4]).

-include("mam_helper.hrl").
-include_lib("escalus/include/escalus.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("exml/include/exml_stream.hrl").

all() ->
[{group, send_message}].

groups() ->
[
{send_message, [], [mam_pm_send_message,
mam_muc_send_message]}
].

domain() ->
ct:get_config({hosts, mim, domain}).

%%%===================================================================
%%% Overall setup/teardown
%%%===================================================================
init_per_suite(Config) ->
escalus:init_per_suite(Config).

end_per_suite(Config) ->
escalus:end_per_suite(Config).

%%%===================================================================
%%% Group specific setup/teardown
%%%===================================================================
init_per_group(Group, Config) ->
case mongoose_helper:is_rdbms_enabled(domain()) of
true ->
load_custom_module(),
Config2 = dynamic_modules:save_modules(domain(), Config),
rpc(mim(), gen_mod_deps, start_modules, [domain(), group_to_modules(Group)]),
Config2;
false ->
{skip, require_rdbms}
end.

end_per_group(_Groupname, Config) ->
case mongoose_helper:is_rdbms_enabled(domain()) of
true ->
dynamic_modules:restore_modules(domain(), Config);
false ->
ok
end,
ok.

group_to_modules(send_message) ->
MH = muc_light_helper:muc_host(),
[{mod_mam_meta, [{backend, rdbms}, {pm, []}, {muc, [{host, MH}]},
{send_message, mam_send_message_example}]},
{mod_muc_light, []},
{mam_send_message_example, []}].

load_custom_module() ->
mam_send_message_example:module_info(),
{Mod, Code, File} = code:get_object_code(mam_send_message_example),
rpc(mim(), code, load_binary, [Mod, File, Code]).

%%%===================================================================
%%% Testcase specific setup/teardown
%%%===================================================================

init_per_testcase(TestCase, Config) ->
escalus:init_per_testcase(TestCase, Config).

end_per_testcase(TestCase, Config) ->
escalus:end_per_testcase(TestCase, Config).

%%%===================================================================
%%% Test Cases
%%%===================================================================

mam_pm_send_message(Config) ->
P = ?config(props, Config),
arcusfelis marked this conversation as resolved.
Show resolved Hide resolved
F = fun(Alice, Bob) ->
escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
escalus:wait_for_stanza(Bob),
mam_helper:wait_for_archive_size(Alice, 1),
mam_helper:wait_for_archive_size(Bob, 1),
escalus:send(Alice, stanza_archive_request(P, <<"q1">>)),
Res = wait_archive_respond(Alice),
assert_respond_size(1, Res),
[Msg] = respond_messages(Res),
verify_has_some_hash(Msg)
end,
escalus_fresh:story(Config, [{alice, 1}, {bob, 1}], F).
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved

mam_muc_send_message(Config0) ->
F = fun(Config, Alice) ->
P = ?config(props, Config),
Room = muc_helper:fresh_room_name(),
MucHost = muc_light_helper:muc_host(),
muc_light_helper:create_room(Room, MucHost, alice,
[], Config, muc_light_helper:ver(1)),
escalus_assert:has_no_stanzas(Alice),
RoomAddr = <<Room/binary, "@", MucHost/binary>>,
escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, <<"text">>)),
M = escalus:wait_for_stanza(Alice),
escalus:assert(is_message, M),
escalus_assert:has_no_stanzas(Alice),
mam_helper:wait_for_room_archive_size(MucHost, Room, 1),
escalus:send(Alice, escalus_stanza:to(stanza_archive_request(P, <<"q1">>), RoomAddr)),
[Msg] = respond_messages(assert_respond_size(1, wait_archive_respond(Alice))),
verify_has_some_hash(Msg)
end,
escalus_fresh:story_with_config(Config0, [{alice, 1}], F).
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved

verify_has_some_hash(Msg) ->
Hash = exml_query:path(Msg, [{element, <<"result">>},
{element, <<"some_hash">>},
{attr, <<"value">>}]),
binary_to_integer(Hash). %% is integer
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
%% Adds some_hash element to each extracted message result.
%%
%% An example module for extending MAM lookup results.
%% Defines a callback for send_message callback.
%% Handles lookup messages hooks to extend message rows with extra info.
-module(mam_send_message_example).
-behaviour(gen_mod).
-behaviour(mongoose_module_metrics).
-include_lib("exml/include/exml.hrl").

-export([start/2,
stop/1,
lookup_messages/3,
send_message/4]).


start(Host, _Opts) ->
ejabberd_hooks:add(hooks(Host)).

stop(Host) ->
ejabberd_hooks:delete(hooks(Host)).

hooks(Host) ->
[{mam_lookup_messages, Host, ?MODULE, lookup_messages, 60},
{mam_muc_lookup_messages, Host, ?MODULE, lookup_messages, 60}].

%% caller_jid could be used for privacy checking or per-user customization
lookup_messages({error, _Reason} = Result, _Host, _Params) ->
Result;
lookup_messages({ok, {TotalCount, Offset, MessageRows}},
Host, Params = #{owner_jid := ArcJID, caller_jid := _CallerJID}) ->
MessageRows2 = [extend_message(Host, ArcJID, Row) || Row <- MessageRows],
{ok, {TotalCount, Offset, MessageRows2}}.

extend_message(_Host, _ArcJID, Row = #{}) ->
%% Extend a message with a new field
%% Usually extracted from a DB
Row#{some_hash => erlang:phash2(Row, 32)}.

send_message(Row, From, To, Mess) ->
Res = xml:get_subtag(Mess, <<"result">>),
Res2 = xml:append_subtags(Res, [new_subelem(Row)]),
Mess2 = xml:replace_subelement(Mess, Res2),
mod_mam_utils:send_message(Row, From, To, Mess2).

new_subelem(#{some_hash := SomeHash}) ->
#xmlel{name = <<"some_hash">>, attrs = [{<<"value">>, integer_to_binary(SomeHash)}]}.
11 changes: 11 additions & 0 deletions doc/modules/mod_mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ Do not add a `<stanza-id/>` element from MAM v0.6.

Name of a module implementing [`is_archivable_message/3` callback](#is_archivable_message) that determines if the message should be archived.

### `modules.mod_mam_meta.send_message`
* **Syntax:** non-empty string
* **Default:** `"mod_mam_utils"`
* **Example:** `send_message = "mod_mam_utils"`

Name of a module implementing `send_message/4` callback that routes a message during lookup operation.
Consult with `mod_mam_utils:send_message/4` code for more information.

Check `big_tests/tests/mam_send_message_SUITE_data/mam_send_message_example.erl` file
in the MongooseIM repository for the usage example.

### `modules.mod_mam_meta.archive_chat_markers`
* **Syntax:** boolean
* **Default:** `false`
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mam_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ decode_row({ExtMessID, ExtSrcJID, ExtData}, Env) ->
MessID = mongoose_rdbms:result_to_integer(ExtMessID),
SrcJID = decode_jid(ExtSrcJID, Env),
Packet = decode_packet(ExtData, Env),
{MessID, SrcJID, Packet}.
#{id => MessID, jid => SrcJID, packet => Packet}.

-spec decode_muc_row(db_muc_row(), env_vars()) -> mod_mam:message_row().
decode_muc_row({ExtMessID, Nick, ExtData}, Env = #{archive_jid := RoomJID}) ->
MessID = mongoose_rdbms:result_to_integer(ExtMessID),
SrcJID = jid:replace_resource(RoomJID, Nick),
Packet = decode_packet(ExtData, Env),
{MessID, SrcJID, Packet}.
#{id => MessID, jid => SrcJID, packet => Packet}.

-spec decode_muc_gdpr_row(db_muc_gdpr_row(), env_vars()) -> decoded_muc_gdpr_row().
decode_muc_gdpr_row({ExtMessID, ExtData}, Env) ->
Expand Down
10 changes: 6 additions & 4 deletions src/mam/mam_iq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-export([form_to_with_jid/1]).
-export([form_to_lookup_params/4]).

-export([lookup_params_with_archive_details/3]).
-export([lookup_params_with_archive_details/4]).

-import(mod_mam_utils,
[maybe_microseconds/1,
Expand All @@ -34,6 +34,7 @@
-type lookup_params() :: #{
archive_id => mod_mam:archive_id(),
owner_jid => jid:jid(),
caller_jid => jid:jid(),
rsm => jlib:rsm_in() | undefined,
max_result_limit => non_neg_integer(),
%% Contains page size value provided by client or enforced by server.
Expand Down Expand Up @@ -215,11 +216,12 @@ common_lookup_params(QueryEl, MaxResultLimit, DefaultResultLimit) ->
limit_passed => Limit =/= <<>>,
ordering_direction => ordering_direction(RSM)}.

-spec lookup_params_with_archive_details(lookup_params(), term(), jid:jid()) ->
-spec lookup_params_with_archive_details(lookup_params(), term(), jid:jid(), jid:jid()) ->
lookup_params().
lookup_params_with_archive_details(Params, ArcID, ArcJID) ->
lookup_params_with_archive_details(Params, ArcID, ArcJID, CallerJID) ->
Params#{archive_id => ArcID,
owner_jid => ArcJID}.
owner_jid => ArcJID,
caller_jid => CallerJID}.

ordering_direction(#rsm_in{direction = before}) -> backward;
ordering_direction(_) -> forward.
Expand Down
3 changes: 2 additions & 1 deletion src/mam/mam_lookup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ decode_rows(MessageRows, Env) ->
[decode_row(Row, Env) || Row <- MessageRows].

%% First element of the tuple is decoded message ID
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no tuple now, please amend this comment.

decoded_row_to_message_id(DecodedRow) -> element(1, DecodedRow).
-spec decoded_row_to_message_id(mod_mam:message_row()) -> mod_mam:message_id().
decoded_row_to_message_id(#{id := MessId}) -> MessId.

-spec extract_messages(Env :: env_vars(),
Filter :: filter(), Offset :: non_neg_integer(), Max :: pos_integer(),
Expand Down
30 changes: 15 additions & 15 deletions src/mam/mod_mam.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@

%% ejabberd
-import(mod_mam_utils,
[send_message/3,
is_jid_in_user_roster/2]).
[is_jid_in_user_roster/2]).


-include("mongoose.hrl").
Expand All @@ -119,7 +118,7 @@

-type borders() :: #mam_borders{}.

-type message_row() :: {message_id(), jid:jid(), exml:element()}.
-type message_row() :: #{id => message_id(), jid => jid:jid(), packet => exml:element()}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are they optional? Could we make the keys mandatory in this type?

-type lookup_result() :: {TotalCount :: non_neg_integer() | undefined,
Offset :: non_neg_integer() | undefined,
MessageRows :: [message_row()]}.
Expand Down Expand Up @@ -425,7 +424,7 @@ handle_set_message_form(#jid{} = From, #jid{} = ArcJID,
Params0 = mam_iq:form_to_lookup_params(IQ, mod_mam_params:max_result_limit(?MODULE, Host),
mod_mam_params:default_result_limit(?MODULE, Host),
mod_mam_params:extra_params_module(?MODULE, Host)),
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID),
Params = mam_iq:lookup_params_with_archive_details(Params0, ArcID, ArcJID, From),
case lookup_messages(Host, Params) of
{error, Reason} ->
report_issue(Reason, mam_lookup_failed, ArcJID, IQ),
Expand All @@ -450,12 +449,16 @@ forward_messages(From, ArcJID, MamNs, QueryID, MessageRows, SetClientNs) ->
[_|_] -> {message_row_to_ext_id(hd(MessageRows)),
message_row_to_ext_id(lists:last(MessageRows))}
end,

[send_message(ArcJID, From,
message_row_to_xml(MamNs, M, QueryID, SetClientNs))
|| M <- MessageRows],
Host = ArcJID#jid.lserver,
SendModule = mod_mam_params:send_message_mod(?MODULE, Host),
[send_message(SendModule, Row, ArcJID, From,
message_row_to_xml(MamNs, Row, QueryID, SetClientNs))
|| Row <- MessageRows],
{FirstMessID, LastMessID}.

send_message(SendModule, Row, ArcJID, From, Packet) ->
SendModule:send_message(Row, ArcJID, From, Packet).

-spec handle_get_message_form(jid:jid(), jid:jid(), jlib:iq()) ->
jlib:iq().
handle_get_message_form(_From=#jid{lserver = Host}, _ArcJID=#jid{}, IQ=#iq{}) ->
Expand Down Expand Up @@ -609,20 +612,17 @@ archive_message(Host, Params) ->
%% ----------------------------------------------------------------------
%% Helpers

-type messid_jid_packet() :: {MessId :: integer(),
SrcJID :: jid:jid(),
Packet :: exml:element()}.
-spec message_row_to_xml(binary(), messid_jid_packet(), QueryId :: binary(), boolean()) ->
-spec message_row_to_xml(binary(), message_row(), QueryId :: binary(), boolean()) ->
exml:element().
message_row_to_xml(MamNs, {MessID, SrcJID, Packet}, QueryID, SetClientNs) ->
message_row_to_xml(MamNs, #{id := MessID, jid := SrcJID, packet := Packet}, QueryID, SetClientNs) ->
{Microseconds, _NodeMessID} = decode_compact_uuid(MessID),
TS = calendar:system_time_to_rfc3339(erlang:convert_time_unit(Microseconds, microsecond, second), [{offset, "Z"}]),
BExtMessID = mess_id_to_external_binary(MessID),
Packet1 = mod_mam_utils:maybe_set_client_xmlns(SetClientNs, Packet),
wrap_message(MamNs, Packet1, QueryID, BExtMessID, TS, SrcJID).

-spec message_row_to_ext_id(messid_jid_packet()) -> binary().
message_row_to_ext_id({MessID, _, _}) ->
-spec message_row_to_ext_id(message_row()) -> binary().
message_row_to_ext_id(#{id := MessID}) ->
mess_id_to_external_binary(MessID).

handle_error_iq(Host, Acc, _To, _Action, {error, _Reason, IQ}) ->
Expand Down
2 changes: 1 addition & 1 deletion src/mam/mod_mam_cassandra_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ rows_to_uniform_format(MessageRows) ->
row_to_uniform_format(#{from_jid := FromJID, message := Msg, id := MsgID}) ->
SrcJID = jid:from_binary(FromJID),
Packet = stored_binary_to_packet(Msg),
{MsgID, SrcJID, Packet}.
#{id => MsgID, jid => SrcJID, packet => Packet}.

row_to_message_id(#{id := MsgID}) ->
MsgID.
Expand Down
5 changes: 2 additions & 3 deletions src/mam/mod_mam_elasticsearch_arch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,13 @@ search_result_to_mam_lookup_result(Result, Params) ->
{CorrectedTotalCount, Offset, Messages}
end.

-spec hit_to_mam_message(map()) -> {mod_mam:message_id(), jid:jid(), exml:element()}.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := JSON}) ->
MessageId = maps:get(<<"mam_id">>, JSON),
Packet = maps:get(<<"message">>, JSON),
SourceBinJid = maps:get(<<"source_jid">>, JSON),

{ok, Stanza} = exml:parse(Packet),
{MessageId, jid:from_binary(SourceBinJid), Stanza}.
#{id => MessageId, jid => jid:from_binary(SourceBinJid), packet => Stanza}.

hit_to_gdpr_mam_message(#{<<"_source">> := JSON}) ->
MessageId = maps:get(<<"mam_id">>, JSON),
Expand Down
3 changes: 3 additions & 0 deletions src/mam/mod_mam_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ config_items() ->
<<"no_stanzaid_element">> => #option{type = boolean},
<<"is_archivable_message">> => #option{type = atom,
validate = module},
<<"send_message">> => #option{type = atom,
validate = module},
<<"archive_chat_markers">> => #option{type = boolean},
<<"message_retraction">> => #option{type = boolean},

Expand Down Expand Up @@ -202,6 +204,7 @@ valid_core_mod_opts(mod_mam_muc) ->

common_opts() ->
[is_archivable_message,
send_message,
archive_chat_markers,
extra_lookup_params,
full_text_search,
Expand Down
Loading