Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Jan 19, 2023
1 parent 1424476 commit 309f43a
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 172 deletions.
2 changes: 1 addition & 1 deletion big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
{suites, "tests", oauth_SUITE}.
{suites, "tests", offline_SUITE}.
{suites, "tests", offline_stub_SUITE}.
% {suites, "tests", pep_SUITE}.
{suites, "tests", pep_SUITE}.
{suites, "tests", persistent_cluster_id_SUITE}.
{suites, "tests", presence_SUITE}.
{suites, "tests", privacy_SUITE}.
Expand Down
193 changes: 93 additions & 100 deletions big_tests/tests/pep_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("exml/include/exml_stream.hrl").
-include_lib("eunit/include/eunit.hrl").

-export([suite/0, all/0, groups/0]).
Expand All @@ -35,6 +34,7 @@
]).

-export([
start_caps_clients/2,
send_initial_presence_with_caps/2
]).

Expand All @@ -55,7 +55,7 @@ all() ->
].

groups() ->
G = [
[
{pep_tests, [parallel],
[
disco_test,
Expand All @@ -80,8 +80,7 @@ groups() ->
unsubscribe_after_presence_unsubscription
]
}
],
ct_helper:repeat_all_until_all_ok(G).
].

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().
Expand Down Expand Up @@ -156,7 +155,7 @@ disco_sm_test(Config) ->
disco_sm_items_test(Config) ->
NodeNS = random_node_ns(),
escalus:fresh_story(
set_caps_node(NodeNS, Config),
Config, %%set_caps_node(NodeNS, Config),
[{alice, 1}],
fun(Alice) ->
AliceJid = escalus_client:short_jid(Alice),
Expand Down Expand Up @@ -191,31 +190,38 @@ pep_caps_test(Config) ->
%% Send presence with capabilities (chap. 1 ex. 4)
%% Server does not know the version string, so it requests feature list
send_presence_with_caps(Bob, Caps),
DiscoRequest = escalus:wait_for_stanza(Bob),
receive_presence_with_caps(Bob, Bob, Caps),

DiscoRequest = escalus:wait_for_stanza(Bob),
%% Client responds with a list of supported features (chap. 1 ex. 5)
send_caps_disco_result(Bob, DiscoRequest, NodeNS),

receive_presence_with_caps(Bob, Bob, Caps)
send_caps_disco_result(Bob, DiscoRequest, NodeNS)
end).

publish_and_notify_test(Config) ->
NodeNS = random_node_ns(),
escalus:fresh_story(
set_caps_node(NodeNS, Config),
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
escalus_story:make_all_clients_friends([Alice, Bob]),

pubsub_tools:publish(Alice, <<"item1">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Bob, <<"item1">>, {escalus_utils:get_short_jid(Alice), NodeNS}, [])
end).

set_caps_node(NodeNS, Config) ->
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config].
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}], fun publish_and_notify_story/3).

publish_and_notify_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
escalus_story:make_all_clients_friends([Alice, Bob]),
pubsub_tools:publish(Alice, <<"item1">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Bob, <<"item1">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []).

start_caps_clients(Config, UserCDs) ->
UserCount = length(UserCDs),
lists:map(fun({UserSpec, Resource}) ->
start_caps_client(UserSpec, Resource, Config, UserCount)
end, UserCDs).

start_caps_client(UserSpec, Resource, Config, UserCount) ->
NodeNS = ?config(node_ns, Config),
{ok, Client} = escalus_client:start(Config, UserSpec, Resource),
send_initial_presence_with_caps(NodeNS, Client),
escalus_story:drop_presences(Client, 1),
handle_requested_caps(NodeNS, Client),
escalus_story:drop_presences(Client, UserCount - 1),
Client.

publish_options_test(Config) ->
% Given pubsub is configured with pep plugin
Expand Down Expand Up @@ -244,90 +250,66 @@ send_caps_after_login_test(Config) ->

escalus_story:make_all_clients_friends([Alice, Bob]),

%% Presence subscription triggers PEP last item sending
%% and sometimes this async process takes place after caps
%% are updated, leading to duplicated notification
%% We use timer:sleep here to avoid it for now, because
%% TODO: mod_pubsub send loop has to be fixed, supervised, refactored etc.
timer:sleep(1000),

Caps = caps(NodeNS),
send_presence_with_caps_and_handle_disco(Bob, Caps, NodeNS),
send_presence_with_caps(Bob, Caps),
receive_presence_with_caps(Bob, Bob, Caps),
receive_presence_with_caps(Alice, Bob, Caps),

handle_requested_caps(NodeNS, Bob),

pubsub_tools:receive_item_notification(
Bob, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),

[] = escalus_client:peek_stanzas(Bob)
end).

delayed_receive(Config) ->
%% if alice publishes an item and then bob subscribes successfully to her presence
%% then bob will receive the item right after final subscription stanzas
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config],
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
ct:pal("Message: ~p", [Message]),
pubsub_tools:check_item_notification(
Message, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),
ok
end).
%% if alice publishes an item and then bob subscribes successfully to her presence
%% then bob will receive the item right after final subscription stanzas
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}], fun delayed_receive_story/3).

delayed_receive_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
Node = {escalus_utils:get_short_jid(Alice), NodeNS},
pubsub_tools:check_item_notification(Message, <<"item2">>, Node, []).

delayed_receive_with_sm(Config) ->
%% Same as delayed_receive but with stream management turned on
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config],
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
enable_sm(Alice),
enable_sm(Bob),
publish_with_sm(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
ct:pal("Message: ~p", [Message]),
pubsub_tools:check_item_notification(Message,
<<"item2">>,
{escalus_utils:get_short_jid(Alice), NodeNS},
[]),
ok
end).
%% Same as delayed_receive but with stream management turned on
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}],
fun delayed_receive_with_sm_story/3).

delayed_receive_with_sm_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
enable_sm(Alice),
enable_sm(Bob),
publish_with_sm(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
Node = {escalus_utils:get_short_jid(Alice), NodeNS},
pubsub_tools:check_item_notification(Message, <<"item2">>, Node, []).

h_ok_after_notify_test(ConfigIn) ->
Config = escalus_users:update_userspec(ConfigIn, kate,
stream_management, true),
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]} | Config ],
[{alice, 1}, {kate, 1}],
fun(Alice, Kate) ->
escalus_story:make_all_clients_friends([Alice, Kate]),
Config = escalus_users:update_userspec(ConfigIn, kate, stream_management, true),
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {kate, 1}],
fun h_ok_after_notify_test_story/3).

%% TODO: Dirty fix. For some reason PEP resends item2 with <delay> element,
%% so probably there is some race condition that applies to becoming friends
%% and publishing
timer:sleep(1000),
h_ok_after_notify_test_story(Config, Alice, Kate) ->
NodeNS = ?config(node_ns, Config),
escalus_story:make_all_clients_friends([Alice, Kate]),

pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Kate, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Kate, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),

H = escalus_tcp:get_sm_h(Kate#client.rcv_pid),
escalus:send(Kate, escalus_stanza:sm_ack(H)),
H = escalus_tcp:get_sm_h(Kate#client.rcv_pid),
escalus:send(Kate, escalus_stanza:sm_ack(H)),

escalus_connection:send(Kate, escalus_stanza:sm_request()),
escalus:assert(is_sm_ack,
escalus_connection:get_stanza(Kate, stream_mgmt_ack))
end).
escalus_connection:send(Kate, escalus_stanza:sm_request()),
escalus:assert(is_sm_ack, escalus_connection:get_stanza(Kate, stream_mgmt_ack)).

authorize_access_model(Config) ->
escalus:fresh_story(Config,
Expand Down Expand Up @@ -381,7 +363,7 @@ unsubscribe_after_presence_unsubscription(Config) ->

%% Unsubscription from PEP nodes is implicit
pubsub_tools:publish(Alice, <<"salmon">>, {pep, NodeNS}, []),
[] = escalus:wait_for_stanzas(Bob, 1),
[] = escalus:wait_for_stanzas(Bob, 1, 2000),

pubsub_tools:delete_node(Alice, PepNode, [])
end).
Expand All @@ -407,17 +389,24 @@ required_modules(cache_tests) ->
last_item_cache => mongoose_helper:mnesia_or_rdbms_backend()
})}].

send_initial_presence_with_caps(NodeNS, User) ->
case string:to_lower(binary_to_list(escalus_client:username(User))) of
"alice" ++ _ -> escalus_story:send_initial_presence(User);
"bob" ++ _ -> send_presence_with_caps_and_handle_disco(User, caps(NodeNS), NodeNS);
"kate" ++ _ -> send_presence_with_caps_and_handle_disco(User, caps(NodeNS), NodeNS)
send_initial_presence_with_caps(NodeNS, Client) ->
case is_caps_client(Client) of
false -> escalus_story:send_initial_presence(Client);
true -> send_presence_with_caps(Client, caps(NodeNS))
end.

send_presence_with_caps_and_handle_disco(User, Caps, NodeNS) ->
send_presence_with_caps(User, Caps),
DiscoRequest = escalus:wait_for_stanza(User),
send_caps_disco_result(User, DiscoRequest, NodeNS).
handle_requested_caps(NodeNS, User) ->
case is_caps_client(User) of
false -> ok;
true -> DiscoRequest = escalus:wait_for_stanza(User),
send_caps_disco_result(User, DiscoRequest, NodeNS)
end.

is_caps_client(Client) ->
case escalus_client:username(Client) of
<<"alice", _/binary>> -> false;
_ -> true
end.

send_presence_with_caps(User, Caps) ->
Presence = escalus_stanza:presence(<<"available">>, [Caps]),
Expand All @@ -443,6 +432,10 @@ verify_publish_options(FullNodeConfig, Options) ->
lists:member(Option, NodeConfig)
end, Options).

set_caps(Config) ->
[{escalus_overrides, [{start_ready_clients, {?MODULE, start_caps_clients}}]},
{node_ns, random_node_ns()} | Config].

%%-----------------------------------------------------------------
%% XML helpers
%%-----------------------------------------------------------------
Expand Down
34 changes: 29 additions & 5 deletions src/mod_caps.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

-export([user_send_presence/3,
user_receive_presence/3,
c2s_broadcast_recipients/3]).
get_pep_recipients/3,
filter_pep_recipient/3]).

%% for test cases
-export([delete_caps/1, make_disco_hash/2]).
Expand Down Expand Up @@ -322,19 +323,19 @@ upsert_caps(LFrom, Caps, Rs) ->
gb_trees:update(LFrom, Caps, Rs)
end.

-spec c2s_broadcast_recipients(Acc, Params, Extra) -> {ok, Acc} when
-spec get_pep_recipients(Acc, Params, Extra) -> {ok, Acc} when
Acc :: [jid:simple_jid()],
Params :: #{c2s_data := mongoose_c2s:data(), type := {atom(), binary()}},
Extra :: map().
c2s_broadcast_recipients(InAcc, #{c2s_data := C2SData, type := {pep_message, Feature}}, _) ->
get_pep_recipients(InAcc, #{state := C2SData, feature := Feature}, _) ->
HostType = mongoose_c2s:get_host_type(C2SData),
NewAcc = case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Rs} ->
filter_recipients_by_caps(HostType, InAcc, Feature, Rs);
_ -> InAcc
end,
{ok, NewAcc};
c2s_broadcast_recipients(Acc, _, _) -> {ok, Acc}.
get_pep_recipients(Acc, _, _) -> {ok, Acc}.

-spec filter_recipients_by_caps(mongooseim:host_type(), Acc, binary(), caps_resources()) -> Acc
when Acc :: [jid:simple_jid()].
Expand All @@ -347,6 +348,28 @@ filter_recipients_by_caps(HostType, InAcc, Feature, Rs) ->
end,
InAcc, Rs).

-spec filter_pep_recipient(Acc, Params, Extra) -> {ok | stop, Acc} when
Acc :: boolean(),
Params :: #{state := ejabberd_c2s:state(), feature := binary(), to := jid:jid()},
Extra :: gen_hook:extra().
filter_pep_recipient(InAcc, #{state := C2SData, feature := Feature, to := To}, _) ->
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Rs} ->
?LOG_DEBUG(#{what => caps_lookup, text => <<"Look for CAPS for To jid">>,
acc => InAcc, c2s_state => C2SData, caps_resources => Rs}),
LTo = jid:to_lower(To),
case gb_trees:lookup(LTo, Rs) of
{value, Caps} ->
HostType = mongoose_c2s:get_host_type(C2SData),
Drop = not lists:member(Feature, get_features_list(HostType, Caps)),
{stop, Drop};
none ->
{stop, true}
end;
_ -> {ok, InAcc}
end;
filter_pep_recipient(Acc, _, _) -> {ok, Acc}.

init_db(mnesia) ->
case catch mnesia:table_info(caps_features, storage_type) of
{'EXIT', _} ->
Expand Down Expand Up @@ -390,7 +413,8 @@ terminate(_Reason, #state{host_type = HostType}) ->

hooks(HostType) ->
[{disco_local_features, HostType, fun ?MODULE:disco_local_features/3, #{}, 1},
{c2s_broadcast_recipients, HostType, fun ?MODULE:c2s_broadcast_recipients/3, #{}, 75},
{get_pep_recipients, HostType, fun ?MODULE:get_pep_recipients/3, #{}, 75},
{filter_pep_recipient, HostType, fun ?MODULE:filter_pep_recipient/3, #{}, 75},
{user_send_presence, HostType, fun ?MODULE:user_send_presence/3, #{}, 75},
{user_receive_presence, HostType, fun ?MODULE:user_receive_presence/3, #{}, 1},
{c2s_stream_features, HostType, fun ?MODULE:caps_stream_features/3, #{}, 75},
Expand Down
Loading

0 comments on commit 309f43a

Please sign in to comment.