Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Mar 24, 2022
1 parent 443dc9d commit ff34f9c
Show file tree
Hide file tree
Showing 24 changed files with 917 additions and 989 deletions.
20 changes: 10 additions & 10 deletions big_tests/tests/mod_event_pusher_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

-import(domain_helper, [host_type/0]).

-import(config_parser_helper, [mod_config/2, mod_event_pusher_http_server/0]).

%%%===================================================================
%%% Suite configuration
%%%===================================================================
Expand Down Expand Up @@ -58,9 +60,9 @@ end_per_suite(Config) ->
escalus:end_per_suite(Config).

init_per_group(no_prefix, Config) ->
set_modules(Config, []);
set_modules(Config, #{});
init_per_group(with_prefix, Config) ->
set_modules(Config, [{path, "/prefix"}]).
set_modules(Config, #{path => <<"prefix">>}).

end_per_group(_GroupName, Config) ->
dynamic_modules:restore_modules(Config),
Expand Down Expand Up @@ -177,23 +179,21 @@ start_pool() ->
stop_pool() ->
ejabberd_node_utils:call_fun(mongoose_wpool, stop, [http, global, http_pool]).

set_modules(Config0, Opts) ->
set_modules(Config0, ExtraServerOpts) ->
Config = dynamic_modules:save_modules(host_type(), Config0),
Backend = mongoose_helper:get_backend_mnesia_rdbms_riak(host_type()),
ModOffline = create_offline_config(Backend),
ModOpts = [{backends,
[{http,
[{worker_timeout, 500},
{host, http_notifications_host()}] ++ Opts}]}],
Server = maps:merge(mod_event_pusher_http_server(), ExtraServerOpts),
ModOpts = #{http => #{servers => [Server]}},
dynamic_modules:ensure_modules(host_type(), [{mod_event_pusher, ModOpts} | ModOffline]),
Config.

-spec create_offline_config(atom()) -> [{mod_offline, gen_mod:module_opts()}].
create_offline_config(riak) ->
[{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => riak,
riak => #{bucket_type => <<"offline">>}})}];
[{mod_offline, mod_config(mod_offline, #{backend => riak,
riak => #{bucket_type => <<"offline">>}})}];
create_offline_config(Backend) ->
[{mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}].
[{mod_offline, mod_config(mod_offline, #{backend => Backend})}].

start_http_listener(simple_message, Prefix) ->
http_helper:start(http_notifications_port(), Prefix, fun process_notification/1);
Expand Down
106 changes: 49 additions & 57 deletions big_tests/tests/mod_event_pusher_rabbit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,15 @@
%%%-------------------------------------------------------------------
-module(mod_event_pusher_rabbit_SUITE).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-include("assert_received_match.hrl").

-import(distributed_helper, [mim/0,
rpc/4]).

-import(distributed_helper, [mim/0, rpc/4]).
-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2]).

-export([suite/0, all/0, groups/0]).
-export([init_per_suite/1, end_per_suite/1,
Expand Down Expand Up @@ -48,17 +44,6 @@
-define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>).
-define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
-define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
-define(MOD_EVENT_PUSHER_RABBIT_CFG,
[{presence_exchange, [{name, ?PRESENCE_EXCHANGE}]},
{chat_msg_exchange, [{name, ?CHAT_MSG_EXCHANGE},
{sent_topic, ?CHAT_MSG_SENT_TOPIC},
{recv_topic, ?CHAT_MSG_RECV_TOPIC}]},
{groupchat_msg_exchange, [{name, ?GROUP_CHAT_MSG_EXCHANGE},
{sent_topic, ?GROUP_CHAT_MSG_SENT_TOPIC},
{recv_topic, ?GROUP_CHAT_MSG_RECV_TOPIC}]}
]).
-define(MOD_EVENT_PUSHER_CFG, [{backends,
[{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]).
-define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher,
opts => #{workers => 20, strategy => best_worker, call_timeout => 5000},
conn_opts => #{confirms_enabled => false,
Expand All @@ -81,17 +66,21 @@

all() ->
[
{group, initialization_on_startup},
{group, pool_startup},
{group, module_startup},
{group, presence_status_publish},
{group, chat_message_publish},
{group, group_chat_message_publish}
].

groups() ->
G = [
{initialization_on_startup, [],
{pool_startup, [],
[
rabbit_pool_starts_with_default_config
]},
{module_startup, [],
[
rabbit_pool_starts_with_default_config,
exchanges_are_created_on_module_startup
]},
{presence_status_publish, [],
Expand Down Expand Up @@ -140,23 +129,47 @@ end_per_suite(Config) ->
muc_helper:unload_muc(),
escalus:end_per_suite(Config).

init_per_group(initialization_on_startup, Config) ->
Config;
init_per_group(_, Config0) ->
init_per_group(GroupName, Config0) ->
Domain = domain(),
Config = dynamic_modules:save_modules(Domain, Config0),
dynamic_modules:ensure_modules(Domain,
[{mod_event_pusher, ?MOD_EVENT_PUSHER_CFG}]),
dynamic_modules:ensure_modules(Domain, required_modules(GroupName)),
Config.

end_per_group(initialization_on_startup, Config) ->
Config;
required_modules(pool_startup) ->
[{mod_event_pusher, stopped}];
required_modules(GroupName) ->
[{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].

mod_event_pusher_rabbit_opts(GroupName) ->
maps:from_list([{ExchangeKey, exchange_opts(GroupName, ExchangeKey)}
|| ExchangeKey <- exchange_keys()]).

exchange_opts(GroupName, Exchange) ->
config([modules, mod_event_pusher, rabbit, Exchange],
maps:merge(basic_exchange_opts(Exchange), extra_exchange_opts(GroupName))).

basic_exchange_opts(presence_exchange) ->
#{name => ?PRESENCE_EXCHANGE};
basic_exchange_opts(chat_msg_exchange) ->
#{name => ?CHAT_MSG_EXCHANGE,
sent_topic => ?CHAT_MSG_SENT_TOPIC,
recv_topic => ?CHAT_MSG_RECV_TOPIC};
basic_exchange_opts(groupchat_msg_exchange) ->
#{name => ?GROUP_CHAT_MSG_EXCHANGE,
sent_topic => ?GROUP_CHAT_MSG_SENT_TOPIC,
recv_topic => ?GROUP_CHAT_MSG_RECV_TOPIC}.

extra_exchange_opts(module_startup) ->
#{type => <<"headers">>};
extra_exchange_opts(_) ->
#{}.

exchange_keys() ->
[presence_exchange, chat_msg_exchange, groupchat_msg_exchange].

end_per_group(_, Config) ->
delete_exchanges(),
Domain = domain(),
dynamic_modules:stop(Domain, mod_event_pusher),
dynamic_modules:restore_modules(Config),
escalus:delete_users(Config, escalus:get_users([bob, alice])).
dynamic_modules:restore_modules(Config).

init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
Config;
Expand All @@ -166,19 +179,13 @@ init_per_testcase(CaseName, Config0) ->
Config = Config2 ++ connect_to_rabbit(),
escalus:init_per_testcase(CaseName, Config).

end_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
Config;
end_per_testcase(exchanges_are_created_on_module_startup, Config) ->
delete_exchanges(),
stop_mod_event_pusher_rabbit(),
close_rabbit_connection(Config),
Config;
end_per_testcase(rabbit_pool_starts_with_default_config, _Config) ->
ok;
end_per_testcase(CaseName, Config) ->
maybe_cleanup_muc(CaseName, Config),
close_rabbit_connection(Config),
escalus:end_per_testcase(CaseName, Config).


%%--------------------------------------------------------------------
%% GROUP initialization_on_startup
%%--------------------------------------------------------------------
Expand All @@ -200,18 +207,12 @@ rabbit_pool_starts_with_default_config(_Config) ->
stop_rabbit_wpool(RabbitWpool).

exchanges_are_created_on_module_startup(Config) ->
%% GIVEN
%% GIVEN module is started with custom exchange types
Connection = proplists:get_value(rabbit_connection, Config),
ExCustomType = <<"headers">>,
Exchanges = [{?PRESENCE_EXCHANGE, ExCustomType},
{?CHAT_MSG_EXCHANGE, ExCustomType},
{?GROUP_CHAT_MSG_EXCHANGE, ExCustomType}],
ConfigWithCustomExchangeType =
extend_config_with_exchange_type(ExCustomType),
%% WHEN
start_mod_event_pusher_rabbit(ConfigWithCustomExchangeType),
Exchanges = [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE],
%% THEN exchanges are created
[?assert(ensure_exchange_present(Connection, Exchange))
[?assert(ensure_exchange_present(Connection, {Exchange, ExCustomType}))
|| Exchange <- Exchanges].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -702,15 +703,6 @@ user_room_jid(RoomJID, UserJID) ->
Nick = nick(UserJID),
<<RoomJID/binary, "/", Nick/binary>>.

extend_config_with_exchange_type(ExType) ->
lists:map(fun({Ex, Opts}) when
Ex == presence_exchange orelse
Ex == chat_msg_exchange orelse
Ex == groupchat_msg_exchange ->
{Ex, Opts ++ [{type, ExType}]};
(Other) -> Other
end, ?MOD_EVENT_PUSHER_RABBIT_CFG).

is_rabbitmq_available() ->
try amqp_connection:start(#amqp_params_network{}) of
{ok, Conn} ->
Expand Down
45 changes: 20 additions & 25 deletions big_tests/tests/mod_event_pusher_sns_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,10 @@
-include("assert_received_match.hrl").

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2]).

-define(NS_HTTP_UPLOAD, <<"urn:xmpp:http:upload">>).
-define(S3_HOSTNAME, "http://bucket.s3-eu-east-25.example.com").
-define(SNS_OPTS,
[
{access_key_id, "AKIAIH54ALYGMZTESTID"},
{secret_access_key, "buRqHOxXCFUQkiuYgdUAy+XoGKt0Ec6DTESTKEY+"},
{region, "eu-west-1"},
{account_id, "123456789012"},
{sns_host, "sns.eu-west-1.amazonaws.com"},
{plugin_module, mod_event_pusher_sns_defaults},
{presence_updates_topic, "user_presence_updated-dev-1"},
{pm_messages_topic, "user_message_sent-dev-1"},
{muc_messages_topic, "user_messagegroup_sent-dev-1"},
{muc_host, "muc.@HOST@"}
]).

-record(publish, {
topic_arn,
Expand Down Expand Up @@ -85,13 +73,22 @@ end_per_suite(Config) ->

init_per_group(_, Config0) ->
Domain = domain(),
Config1 = dynamic_modules:save_modules(Domain, Config0),
Config = [{sns_config, ?SNS_OPTS} | Config1],
Config = dynamic_modules:save_modules(Domain, Config0),
dynamic_modules:ensure_modules(Domain, required_modules()),
Config.

required_modules() ->
[{mod_event_pusher, [{backends, [{sns, ?SNS_OPTS}]}]}].
[{mod_event_pusher, #{sns => config([modules, mod_event_pusher, sns], sns_opts())}}].

sns_opts() ->
#{presence_updates_topic => "user_presence_updated-dev-1",
pm_messages_topic => "user_message_sent-dev-1",
muc_messages_topic => "user_messagegroup_sent-dev-1",
sns_host => "sns.eu-west-1.amazonaws.com",
region => "eu-west-1",
access_key_id => "AKIAIH54ALYGMZTESTID",
secret_access_key => "buRqHOxXCFUQkiuYgdUAy+XoGKt0Ec6DTESTKEY+",
account_id => "123456789012"}.

end_per_group(_, Config) ->
dynamic_modules:restore_modules(Config),
Expand Down Expand Up @@ -127,7 +124,7 @@ connected_user_changes_status(Config) ->
escalus:story(
Config, [{bob, 1}, {alice, 1}],
fun(Bob, Alice) ->
Topic = make_topic_arn(Config, presence_updates_topic),
Topic = make_topic_arn(presence_updates_topic),
BobJID = nick_to_jid(bob, Config),
AliceJID = nick_to_jid(alice, Config),

Expand Down Expand Up @@ -187,7 +184,7 @@ pm_messages(Config) ->
escalus:story(
Config, [{bob, 1}, {alice, 1}],
fun(Bob, Alice) ->
Topic = make_topic_arn(Config, pm_messages_topic),
Topic = make_topic_arn(pm_messages_topic),
BobJID = nick_to_jid(bob, Config),
AliceJID = nick_to_jid(alice, Config),

Expand Down Expand Up @@ -225,7 +222,7 @@ muc_messages(Config) ->
escalus:story(
Config, [{bob, 1}, {alice, 1}],
fun(Bob, Alice) ->
Topic = make_topic_arn(Config, muc_messages_topic),
Topic = make_topic_arn(muc_messages_topic),
Room = ?config(room, Config),
RoomAddr = room_address(Room),
BobJID = nick_to_jid(bob, Config),
Expand Down Expand Up @@ -280,12 +277,10 @@ rpc(M, F, A) ->
Cookie = escalus_ct:get_config(ejabberd_cookie),
escalus_rpc:call(Node, M, F, A, 10000, Cookie).

make_topic_arn(Config, TopicVar) ->
SNSConfig = proplists:get_value(sns_config, Config),
string:join(["arn", "aws", "sns",
proplists:get_value(region, SNSConfig),
proplists:get_value(account_id, SNSConfig),
proplists:get_value(TopicVar, SNSConfig)], ":").
make_topic_arn(TopicVar) ->
#{region := Region, account_id := AccountId} = SNSOpts = sns_opts(),
Topic = maps:get(TopicVar, SNSOpts),
string:join(["arn", "aws", "sns", Region, AccountId, Topic], ":").

%% @doc Get a binary jid of the user, that tagged with `UserName' in the config.
nick_to_jid(UserName, Config) when is_atom(UserName) ->
Expand Down
18 changes: 9 additions & 9 deletions big_tests/tests/push_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-import(escalus_ejabberd, [rpc/3]).
-import(distributed_helper, [subhost_pattern/1]).
-import(domain_helper, [host_type/0]).
-import(config_parser_helper, [mod_config/2]).
-import(config_parser_helper, [mod_config/2, config/2]).
-import(push_helper, [
enable_stanza/2, enable_stanza/3, enable_stanza/4,
disable_stanza/1, disable_stanza/2, become_unavailable/1
Expand All @@ -37,7 +37,7 @@ all() ->
].

groups() ->
G = [
[
{disco, [], [
push_notifications_listed_disco_when_available,
push_notifications_not_listed_disco_when_not_available
Expand Down Expand Up @@ -73,8 +73,7 @@ groups() ->
muclight_msg_notify_if_user_offline_with_publish_options,
muclight_msg_notify_stops_after_disabling
]}
],
ct_helper:repeat_all_until_all_ok(G).
].

notification_groups() ->
[
Expand Down Expand Up @@ -169,9 +168,9 @@ required_modules(_) ->
[pusher_module()].

pusher_module() ->
PushOpts = [{virtual_pubsub_hosts, [subhost_pattern(?VIRTUAL_PUBSUB_DOMAIN)]},
{backend, mongoose_helper:mnesia_or_rdbms_backend()}],
{mod_event_pusher, [{backends, [{push, PushOpts}]}]}.
PushOpts = #{backend => mongoose_helper:mnesia_or_rdbms_backend(),
virtual_pubsub_hosts => [subhost_pattern(?VIRTUAL_PUBSUB_DOMAIN)]},
{mod_event_pusher, #{push => config([modules, mod_event_pusher, push], PushOpts)}}.

muc_light_module() ->
{mod_muc_light,
Expand Down Expand Up @@ -709,7 +708,7 @@ stop_route_listener(Config) ->
Domain = pubsub_domain(Config),
rpc(mongoose_router, unregister_route, [Domain]).

process_packet(_Acc, _From, To, El, #{state := State}) ->
process_packet(Acc, _From, To, El, #{state := State}) ->
#{ pid := TestCasePid, pub_options_ns := PubOptionsNS, push_form_ns := PushFormNS } = State,
PublishXML = exml_query:path(El, [{element, <<"pubsub">>},
{element, <<"publish-options">>},
Expand All @@ -733,7 +732,8 @@ process_packet(_Acc, _From, To, El, #{state := State}) ->
TestCasePid ! #{ error => invalid_namespace,
publish_options0 => PublishOptions,
payload0 => Payload }
end.
end,
Acc.

parse_form(undefined) ->
undefined;
Expand Down
Loading

0 comments on commit ff34f9c

Please sign in to comment.