Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
chrzaszcz committed Jan 18, 2023
1 parent 20574c0 commit 571431a
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 147 deletions.
189 changes: 95 additions & 94 deletions big_tests/tests/pep_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
]).

-export([
start_caps_clients/2,
send_initial_presence_with_caps/2
]).

Expand All @@ -55,8 +56,8 @@ all() ->
].

groups() ->
G = [
{pep_tests, [parallel],
[
{pep_tests, [parallel, {repeat_until_any_fail, 100}],
[
disco_test,
disco_sm_test,
Expand All @@ -80,8 +81,7 @@ groups() ->
unsubscribe_after_presence_unsubscription
]
}
],
ct_helper:repeat_all_until_all_ok(G).
].

suite() ->
require_rpc_nodes([mim]) ++ escalus:suite().
Expand Down Expand Up @@ -156,7 +156,7 @@ disco_sm_test(Config) ->
disco_sm_items_test(Config) ->
NodeNS = random_node_ns(),
escalus:fresh_story(
set_caps_node(NodeNS, Config),
Config, %%set_caps_node(NodeNS, Config),
[{alice, 1}],
fun(Alice) ->
AliceJid = escalus_client:short_jid(Alice),
Expand Down Expand Up @@ -191,31 +191,38 @@ pep_caps_test(Config) ->
%% Send presence with capabilities (chap. 1 ex. 4)
%% Server does not know the version string, so it requests feature list
send_presence_with_caps(Bob, Caps),
DiscoRequest = escalus:wait_for_stanza(Bob),
receive_presence_with_caps(Bob, Bob, Caps),

DiscoRequest = escalus:wait_for_stanza(Bob),
%% Client responds with a list of supported features (chap. 1 ex. 5)
send_caps_disco_result(Bob, DiscoRequest, NodeNS),

receive_presence_with_caps(Bob, Bob, Caps)
send_caps_disco_result(Bob, DiscoRequest, NodeNS)
end).

publish_and_notify_test(Config) ->
NodeNS = random_node_ns(),
escalus:fresh_story(
set_caps_node(NodeNS, Config),
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
escalus_story:make_all_clients_friends([Alice, Bob]),

pubsub_tools:publish(Alice, <<"item1">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Bob, <<"item1">>, {escalus_utils:get_short_jid(Alice), NodeNS}, [])
end).

set_caps_node(NodeNS, Config) ->
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config].
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}], fun publish_and_notify_story/3).

publish_and_notify_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
escalus_story:make_all_clients_friends([Alice, Bob]),
pubsub_tools:publish(Alice, <<"item1">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Bob, <<"item1">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []).

start_caps_clients(Config, UserCDs) ->
UserCount = length(UserCDs),
lists:map(fun({UserSpec, Resource}) ->
start_caps_client(UserSpec, Resource, Config, UserCount)
end, UserCDs).

start_caps_client(UserSpec, Resource, Config, UserCount) ->
NodeNS = ?config(node_ns, Config),
{ok, Client} = escalus_client:start(Config, UserSpec, Resource),
send_initial_presence_with_caps(NodeNS, Client),
escalus_story:drop_presences(Client, 1),
handle_requested_caps(NodeNS, Client),
escalus_story:drop_presences(Client, UserCount - 1),
Client.

publish_options_test(Config) ->
% Given pubsub is configured with pep plugin
Expand Down Expand Up @@ -252,82 +259,65 @@ send_caps_after_login_test(Config) ->
timer:sleep(1000),

Caps = caps(NodeNS),
send_presence_with_caps_and_handle_disco(Bob, Caps, NodeNS),
send_presence_with_caps(Bob, Caps),
receive_presence_with_caps(Bob, Bob, Caps),
receive_presence_with_caps(Alice, Bob, Caps),

handle_requested_caps(NodeNS, Bob),

pubsub_tools:receive_item_notification(
Bob, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),

[] = escalus_client:peek_stanzas(Bob)
end).

delayed_receive(Config) ->
%% if alice publishes an item and then bob subscribes successfully to her presence
%% then bob will receive the item right after final subscription stanzas
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config],
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
ct:pal("Message: ~p", [Message]),
pubsub_tools:check_item_notification(
Message, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),
ok
end).
%% if alice publishes an item and then bob subscribes successfully to her presence
%% then bob will receive the item right after final subscription stanzas
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}], fun delayed_receive_story/3).

delayed_receive_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
Node = {escalus_utils:get_short_jid(Alice), NodeNS},
pubsub_tools:check_item_notification(Message, <<"item2">>, Node, []).

delayed_receive_with_sm(Config) ->
%% Same as delayed_receive but with stream management turned on
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]}
| Config],
[{alice, 1}, {bob, 1}],
fun(Alice, Bob) ->
enable_sm(Alice),
enable_sm(Bob),
publish_with_sm(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
ct:pal("Message: ~p", [Message]),
pubsub_tools:check_item_notification(Message,
<<"item2">>,
{escalus_utils:get_short_jid(Alice), NodeNS},
[]),
ok
end).
%% Same as delayed_receive but with stream management turned on
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {bob, 1}],
fun delayed_receive_with_sm_story/3).

delayed_receive_with_sm_story(Config, Alice, Bob) ->
NodeNS = ?config(node_ns, Config),
enable_sm(Alice),
enable_sm(Bob),
publish_with_sm(Alice, <<"item2">>, {pep, NodeNS}, []),
[Message] = make_friends(Bob, Alice),
Node = {escalus_utils:get_short_jid(Alice), NodeNS},
pubsub_tools:check_item_notification(Message, <<"item2">>, Node, []).

h_ok_after_notify_test(ConfigIn) ->
Config = escalus_users:update_userspec(ConfigIn, kate,
stream_management, true),
NodeNS = random_node_ns(),
escalus:fresh_story(
[{escalus_overrides,
[{initial_activity, {?MODULE, send_initial_presence_with_caps, [NodeNS]}}]} | Config ],
[{alice, 1}, {kate, 1}],
fun(Alice, Kate) ->
escalus_story:make_all_clients_friends([Alice, Kate]),

%% TODO: Dirty fix. For some reason PEP resends item2 with <delay> element,
%% so probably there is some race condition that applies to becoming friends
%% and publishing
timer:sleep(1000),
Config = escalus_users:update_userspec(ConfigIn, kate, stream_management, true),
Config1 = set_caps(Config),
escalus:fresh_story_with_config(Config1, [{alice, 1}, {kate, 1}],
fun h_ok_after_notify_test_story/3).

pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Kate, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),
h_ok_after_notify_test_story(Config, Alice, Kate) ->
NodeNS = ?config(node_ns, Config),
escalus_story:make_all_clients_friends([Alice, Kate]),

H = escalus_tcp:get_sm_h(Kate#client.rcv_pid),
escalus:send(Kate, escalus_stanza:sm_ack(H)),
pubsub_tools:publish(Alice, <<"item2">>, {pep, NodeNS}, []),
pubsub_tools:receive_item_notification(
Kate, <<"item2">>, {escalus_utils:get_short_jid(Alice), NodeNS}, []),

escalus_connection:send(Kate, escalus_stanza:sm_request()),
escalus:assert(is_sm_ack,
escalus_connection:get_stanza(Kate, stream_mgmt_ack))
end).
H = escalus_tcp:get_sm_h(Kate#client.rcv_pid),
escalus:send(Kate, escalus_stanza:sm_ack(H)),

escalus_connection:send(Kate, escalus_stanza:sm_request()),
escalus:assert(is_sm_ack, escalus_connection:get_stanza(Kate, stream_mgmt_ack)).

authorize_access_model(Config) ->
escalus:fresh_story(Config,
Expand Down Expand Up @@ -381,7 +371,7 @@ unsubscribe_after_presence_unsubscription(Config) ->

%% Unsubscription from PEP nodes is implicit
pubsub_tools:publish(Alice, <<"salmon">>, {pep, NodeNS}, []),
[] = escalus:wait_for_stanzas(Bob, 1),
[] = escalus:wait_for_stanzas(Bob, 1, 2000),

pubsub_tools:delete_node(Alice, PepNode, [])
end).
Expand All @@ -407,17 +397,24 @@ required_modules(cache_tests) ->
last_item_cache => mongoose_helper:mnesia_or_rdbms_backend()
})}].

send_initial_presence_with_caps(NodeNS, User) ->
case string:to_lower(binary_to_list(escalus_client:username(User))) of
"alice" ++ _ -> escalus_story:send_initial_presence(User);
"bob" ++ _ -> send_presence_with_caps_and_handle_disco(User, caps(NodeNS), NodeNS);
"kate" ++ _ -> send_presence_with_caps_and_handle_disco(User, caps(NodeNS), NodeNS)
send_initial_presence_with_caps(NodeNS, Client) ->
case is_caps_client(Client) of
false -> escalus_story:send_initial_presence(Client);
true -> send_presence_with_caps(Client, caps(NodeNS))
end.

send_presence_with_caps_and_handle_disco(User, Caps, NodeNS) ->
send_presence_with_caps(User, Caps),
DiscoRequest = escalus:wait_for_stanza(User),
send_caps_disco_result(User, DiscoRequest, NodeNS).
handle_requested_caps(NodeNS, User) ->
case is_caps_client(User) of
false -> ok;
true -> DiscoRequest = escalus:wait_for_stanza(User),
send_caps_disco_result(User, DiscoRequest, NodeNS)
end.

is_caps_client(Client) ->
case escalus_client:username(Client) of
<<"alice", _/binary>> -> false;
_ -> true
end.

send_presence_with_caps(User, Caps) ->
Presence = escalus_stanza:presence(<<"available">>, [Caps]),
Expand All @@ -443,6 +440,10 @@ verify_publish_options(FullNodeConfig, Options) ->
lists:member(Option, NodeConfig)
end, Options).

set_caps(Config) ->
[{escalus_overrides, [{start_ready_clients, {?MODULE, start_caps_clients}}]},
{node_ns, random_node_ns()} | Config].

%%-----------------------------------------------------------------
%% XML helpers
%%-----------------------------------------------------------------
Expand Down
28 changes: 26 additions & 2 deletions src/mod_caps.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@

-export([user_send_presence/3,
user_receive_presence/3,
c2s_broadcast_recipients/3]).
c2s_broadcast_recipients/3,
filter_pep_recipient/3]).

%% for test cases
-export([delete_caps/1, make_disco_hash/2]).
Expand Down Expand Up @@ -326,7 +327,7 @@ upsert_caps(LFrom, Caps, Rs) ->
Acc :: [jid:simple_jid()],
Params :: #{c2s_data := mongoose_c2s:data(), type := {atom(), binary()}},
Extra :: map().
c2s_broadcast_recipients(InAcc, #{c2s_data := C2SData, type := {pep_message, Feature}}, _) ->
c2s_broadcast_recipients(InAcc, #{state := C2SData, type := {pep_message, Feature}}, _) ->
HostType = mongoose_c2s:get_host_type(C2SData),
NewAcc = case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Rs} ->
Expand All @@ -347,6 +348,28 @@ filter_recipients_by_caps(HostType, InAcc, Feature, Rs) ->
end,
InAcc, Rs).

-spec filter_pep_recipient(Acc, Params, Extra) -> {ok | stop, Acc} when
Acc :: boolean(),
Params :: #{state := ejabberd_c2s:state(), feature := binary(), to := jid:jid()},
Extra :: gen_hook:extra().
filter_pep_recipient(InAcc, #{state := C2SData, feature := Feature, to := To}, _) ->
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
{ok, Rs} ->
?LOG_DEBUG(#{what => caps_lookup, text => <<"Look for CAPS for To jid">>,
acc => InAcc, c2s_state => C2SData, caps_resources => Rs}),
LTo = jid:to_lower(To),
case gb_trees:lookup(LTo, Rs) of
{value, Caps} ->
HostType = mongoose_c2s:get_host_type(C2SData),
Drop = not lists:member(Feature, get_features_list(HostType, Caps)),
{stop, Drop};
none ->
{stop, true}
end;
_ -> {ok, InAcc}
end;
filter_pep_recipient(Acc, _, _) -> {ok, Acc}.

init_db(mnesia) ->
case catch mnesia:table_info(caps_features, storage_type) of
{'EXIT', _} ->
Expand Down Expand Up @@ -391,6 +414,7 @@ terminate(_Reason, #state{host_type = HostType}) ->
hooks(HostType) ->
[{disco_local_features, HostType, fun ?MODULE:disco_local_features/3, #{}, 1},
{c2s_broadcast_recipients, HostType, fun ?MODULE:c2s_broadcast_recipients/3, #{}, 75},
{filter_pep_recipient, HostType, fun ?MODULE:filter_pep_recipient/3, #{}, 75},
{user_send_presence, HostType, fun ?MODULE:user_send_presence/3, #{}, 75},
{user_receive_presence, HostType, fun ?MODULE:user_receive_presence/3, #{}, 1},
{c2s_stream_features, HostType, fun ?MODULE:caps_stream_features/3, #{}, 75},
Expand Down
18 changes: 17 additions & 1 deletion src/mod_presence.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
am_i_subscribed_to_presence/3,
presence_unavailable_stanza/0,
get_presence/1,
get_subscribed/1,
set_presence/2,
maybe_get_handler/1
]).
Expand All @@ -61,10 +62,14 @@ stop(HostType) ->
supported_features() ->
[dynamic_domains].

-spec get_presence(pid()) -> {binary(), binary()}.
-spec get_presence(pid()) -> {jid:luser(), jid:lresource(), binary(), binary()}.
get_presence(Pid) ->
gen_statem:call(Pid, get_presence).

-spec get_subscribed(pid()) -> [jid:jid()].
get_subscribed(Pid) ->
gen_statem:call(Pid, get_subscribed).

-spec set_presence(pid(), exml:element()) -> ok.

set_presence(Pid, Message) ->
Expand Down Expand Up @@ -152,6 +157,17 @@ foreign_event(Acc, #{c2s_data := StateData,
Reply = {User, Resource, get_showtag(PresLast), get_statustag(PresLast)},
Acc1 = mongoose_c2s_acc:to_acc(Acc, actions, [{reply, From, Reply}]),
{stop, Acc1};
foreign_event(Acc, #{c2s_data := StateData,
event_type := {call, From},
event_content := get_subscribed}, _Extra) ->
Subscribed = case get_mod_state(StateData) of
{error, not_found} ->
[];
#presences_state{pres_f = PresF} ->
gb_sets:to_list(PresF)
end,
Acc1 = mongoose_c2s_acc:to_acc(Acc, actions, [{reply, From, Subscribed}]),
{stop, Acc1};
foreign_event(Acc, #{c2s_data := StateData,
event_type := cast,
event_content := {set_presence, Message}}, _Extra) ->
Expand Down
12 changes: 12 additions & 0 deletions src/mongoose_hooks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
xmpp_stanza_dropped/4]).

-export([c2s_broadcast_recipients/4,
filter_pep_recipient/4,
c2s_stream_features/2,
check_bl_c2s/1,
forbidden_session_hook/3,
Expand Down Expand Up @@ -466,6 +467,17 @@ c2s_broadcast_recipients(State, Type, From, Packet) ->
HostType = mongoose_c2s:get_host_type(State),
run_hook_for_host_type(c2s_broadcast_recipients, HostType, [], Params).

-spec filter_pep_recipient(State, Feature, To, Packet) -> Result when
State :: ejabberd_c2s:state(),
Feature :: binary(),
To :: jid:jid(),
Packet :: exml:element(),
Result :: boolean().
filter_pep_recipient(State, Feature, To, Packet) ->
Params = #{state => State, feature => Feature, to => To, packet => Packet},
HostType = mongoose_c2s:get_host_type(State),
run_hook_for_host_type(filter_pep_recipient, HostType, true, Params).

-spec c2s_stream_features(HostType, LServer) -> Result when
HostType :: mongooseim:host_type(),
LServer :: jid:lserver(),
Expand Down
Loading

0 comments on commit 571431a

Please sign in to comment.