Skip to content

Commit

Permalink
Add support for automatic stream management
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Zelazny committed Jul 22, 2014
1 parent fa2c06f commit 1658f58
Show file tree
Hide file tree
Showing 11 changed files with 493 additions and 118 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ test: rebar compile
./rebar skip_deps=true eunit

clean: rebar

ct: compile
./run_ct TESTSPEC=$(TESTSPEC)
./rebar clean

rebar:
Expand Down
3 changes: 3 additions & 0 deletions include/escalus_xmlns.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,9 @@
% Defined by XEP-0297: Stanza Forwarding
-define(NS_FORWARD_0, <<"urn:xmpp:forward:0">>).

% Defined by XEP-0313: Message Archive Management (MAM)
-define(NS_MAM, <<"urn:xmpp:mam:tmp">>).

% Defined by XHTML 1.0.
-define(NS_XHTML, <<"http://www.w3.org/1999/xhtml">>).

Expand Down
2 changes: 1 addition & 1 deletion run_ct
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
ct_run -config test/test.config -logdir logs -dir test -erl_args -pa $PWD/ebin $PWD/deps/exml/ebin
ct_run -config test/test.config -logdir logs -dir test -erl_args -pa $PWD/ebin $PWD/deps/exml/ebin $PWD/deps/base16/ebin
21 changes: 18 additions & 3 deletions src/escalus_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
-export([connect/1,
send/2,
get_stanza/2,
get_sm_h/1,
set_sm_h/2,
reset_parser/1,
is_connected/1,
kill/1]).
Expand Down Expand Up @@ -46,7 +48,8 @@ start(Props) ->
authenticate,
bind,
session,
maybe_stream_management]).
maybe_stream_management,
maybe_use_carbons]).

%% Usage:
%%
Expand Down Expand Up @@ -133,6 +136,18 @@ get_stanza(Conn, Name) ->
throw({timeout, Name})
end.

-spec get_sm_h(#client{}) -> non_neg_integer().
get_sm_h(#client{module = escalus_tcp} = Conn) ->
escalus_tcp:get_sm_h(Conn);
get_sm_h(#client{module = Mod}) ->
error({get_sm_h, {undefined_for_escalus_module, Mod}}).

-spec set_sm_h(#client{}, non_neg_integer()) -> non_neg_integer().
set_sm_h(#client{module = escalus_tcp} = Conn, H) ->
escalus_tcp:set_sm_h(Conn, H);
set_sm_h(#client{module = Mod}, _) ->
error({set_sm_h, {undefined_for_escalus_module, Mod}}).

reset_parser(#client{module = Mod} = Client) ->
Mod:reset_parser(Client).

Expand All @@ -143,8 +158,8 @@ stop(#client{module = Mod} = Client) ->
Mod:stop(Client).

%% Brutally kill the connection without terminating the XMPP stream.
kill(#client{module = Mod} = Transport) ->
Mod:kill(Transport).
kill(#client{module = Mod} = Client) ->
Mod:kill(Client).

%%%===================================================================
%%% Helpers
Expand Down
25 changes: 21 additions & 4 deletions src/escalus_pred.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@
is_failed/2,
is_ack/1, is_ack/2,
is_ack_request/1,
is_resumed/1,
is_resumed/2,
has_ns/2,
is_compressed/1
is_compressed/1,
is_mam_archived_message/2
]).

-export(['not'/1]).
Expand Down Expand Up @@ -177,7 +179,7 @@ has_carbon(Type, From, To, Msg, Stanza) ->
is_forwarded_message(From, To, Msg, #xmlel{name = <<"forwarded">>} = Stanza) ->
has_ns(?NS_FORWARD_0, Stanza)
andalso
is_chat_message_from_to(From, To, Msg,
is_chat_message_from_to(From, To, Msg,
exml_query:subelement(Stanza, <<"message">>)).

-spec is_chat_message(binary(), xmlterm()) -> boolean().
Expand All @@ -201,6 +203,14 @@ is_groupchat_message(Stanza) ->
andalso
has_type(<<"groupchat">>, Stanza).

%% Xep-0313 archived messages
is_mam_archived_message(Msg, #xmlel{} = Stanza) ->
M = exml_query:path(Stanza, [{element, <<"result">>},
{element, <<"forwarded">>},
{element, <<"message">>}]),
is_chat_message(Msg,M).


%% TODO: escalus_compat:bin/1 should be deprecated;
%% let's just use binaries instead of "maybe strings, maybe binaries"
-spec is_groupchat_message(binary(), xmlterm()) -> boolean().
Expand Down Expand Up @@ -229,7 +239,7 @@ has_type(Type, Stanza) ->
is_0184_request(#xmlel{children = Els}) ->
#xmlel{ name = <<"request">>,
attrs = [{<<"xmlns">>, <<"urn:xmpp:receipts">>}],
children = [] } =:= lists:keyfind(<<"request">>, 2, Els).
children = [] } =:= lists:keyfind(<<"request">>, 2, Els).

-spec is_0184_receipt(xmlterm(), xmlterm()) -> boolean().
is_0184_receipt(#xmlel{ attrs = ReqAttrs } = Request, Receipt) ->
Expand Down Expand Up @@ -556,6 +566,14 @@ is_ack_request(#xmlel{name = <<"r">>} = Stanza) ->
is_ack_request(_) ->
false.

is_resumed(#xmlel{name = <<"resumed">>} = Stanza) ->
%% Less strict checking (no SMID verification)
has_ns(?NS_STREAM_MGNT_3, Stanza)
andalso
exml_query:attr(Stanza, <<"h">>) /= undefined;
is_resumed(_) ->
false.

is_resumed(SMID, #xmlel{name = <<"resumed">>} = Stanza) ->
has_ns(?NS_STREAM_MGNT_3, Stanza)
andalso
Expand Down Expand Up @@ -597,4 +615,3 @@ get_roster_items(Stanza) ->
-spec has_path(xmlterm(), exml_query:path()) -> boolean().
has_path(Stanza, Path) ->
exml_query:path(Stanza, Path) /= undefined.

99 changes: 44 additions & 55 deletions src/escalus_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
bind/2,
compress/2,
use_ssl/2,
can_use_amp/2,
can_use_compression/2,
can_use_stream_management/2,
session/2]).

%% New style connection initiation
-export([start_stream/3,
maybe_use_ssl/3,
maybe_use_carbons/3,
maybe_use_compression/3,
maybe_stream_management/3,
maybe_stream_resumption/3,
Expand All @@ -40,12 +42,12 @@
-type step() :: fun(?CONNECTION_STEP).
-export_type([step/0]).

-type step_state() :: {escalus_connection:client(),
-type step_state() :: {escalus_connection:transport(),
escalus_users:spec(),
features()}.
-export_type([step_state/0]).

-include_lib("exml/include/exml_stream.hrl").
-include_lib("exml/include/exml.hrl").
-define(DEFAULT_RESOURCE, <<"escalus-default-resource">>).

%%%===================================================================
Expand All @@ -58,17 +60,12 @@ start_stream(Conn, Props) ->
{server, _} -> <<"jabber:server">>;
_ -> <<"jabber:client">>
end,
Transport = proplists:get_value(transport, Props, tcp),
IsLegacy = proplists:get_value(wslegacy, Props, false),
StreamStartReq = case {Transport, IsLegacy} of
{ws, false} -> escalus_stanza:ws_open(Server);
_ -> escalus_stanza:stream_start(Server, XMLNS)
end,
StreamStartReq = escalus_stanza:stream_start(Server, XMLNS),
ok = escalus_connection:send(Conn, StreamStartReq),
StreamStartRep = escalus_connection:get_stanza(Conn, wait_for_stream),
assert_stream_start(StreamStartRep, Transport, IsLegacy),
%% FIXME: verify StreamStartRep
StreamFeatures = escalus_connection:get_stanza(Conn, wait_for_features),
assert_stream_features(StreamFeatures, Transport, IsLegacy),
%% FIXME: verify StreamFeatures
{Props, get_stream_features(StreamFeatures)}.

starttls(Conn, Props) ->
Expand Down Expand Up @@ -118,12 +115,22 @@ use_ssl(Props, Features) ->

-spec can_use_compression(escalus_users:spec(), features()) -> boolean().
can_use_compression(Props, Features) ->
false /= proplists:get_value(compression, Props, false) andalso
false /= proplists:get_value(compression, Features).
can_use(compression, Props, Features).

can_use_stream_management(Props, Features) ->
false /= proplists:get_value(stream_management, Props, false) andalso
false /= proplists:get_value(stream_management, Features).
can_use(stream_management, Props, Features).

can_use_carbons(Props, _Features) ->
false /= proplists:get_value(carbons, Props, false).

can_use_amp(Props, Features) ->
false /= proplists:get_value(advanced_message_processing, Features).

can_use(Feature, Props, Features) ->
false /= proplists:get_value(Feature, Props, false) andalso
false /= proplists:get_value(Feature, Features).



%%%===================================================================
%%% New style connection initiation
Expand All @@ -144,6 +151,22 @@ maybe_use_ssl(Conn, Props, Features) ->
{Conn, Props, Features}
end.

-spec maybe_use_carbons/3 :: ?CONNECTION_STEP.
maybe_use_carbons(Conn, Props, Features) ->
case can_use_carbons(Props, Features) of
true ->
use_carbons(Conn, Props, Features);
false ->
{Conn, Props, Features}
end.

-spec use_carbons/3 :: ?CONNECTION_STEP.
use_carbons(Conn, Props, Features) ->
escalus_connection:send(Conn, escalus_stanza:carbons_enable()),
Result = escalus_connection:get_stanza(Conn, carbon_iq_response),
escalus:assert(is_iq, [<<"result">>], Result),
{Conn, Props, Features}.

-spec maybe_use_compression/3 :: ?CONNECTION_STEP.
maybe_use_compression(Conn, Props, Features) ->
case can_use_compression(Props, Features) of
Expand Down Expand Up @@ -207,7 +230,9 @@ session(Conn, Props, Features) ->
get_stream_features(Features) ->
[{compression, get_compression(Features)},
{starttls, get_starttls(Features)},
{stream_management, get_stream_management(Features)}].
{stream_management, get_stream_management(Features)},
{advanced_message_processing, get_advanced_message_processing(Features)}
].

-spec get_compression(xmlterm()) -> boolean().
get_compression(Features) ->
Expand All @@ -219,46 +244,10 @@ get_compression(Features) ->

-spec get_starttls(xmlterm()) -> boolean().
get_starttls(Features) ->
case exml_query:subelement(Features, <<"starttls">>) of
undefined ->
false;
_ -> true
end.
undefined =/= exml_query:subelement(Features, <<"starttls">>).

get_stream_management(Features) ->
case exml_query:subelement(Features, <<"sm">>) of
undefined ->
false;
_ -> true
end.

assert_stream_start(StreamStartRep, Transport, IsLegacy) ->
case {StreamStartRep, Transport, IsLegacy} of
{#xmlel{name = <<"open">>}, ws, false} ->
ok;
{#xmlel{name = <<"open">>}, ws, true} ->
error("<open/> with legacy WebSocket",
[StreamStartRep]);
{#xmlstreamstart{}, ws, false} ->
error("<stream:stream> with non-legacy WebSocket",
[StreamStartRep]);
{#xmlstreamstart{}, _, _} ->
ok;
_ ->
error("Not a valid stream start", [StreamStartRep])
end.
undefined =/= exml_query:subelement(Features, <<"sm">>).

assert_stream_features(StreamFeatures, Transport, IsLegacy) ->
case {StreamFeatures, Transport, IsLegacy} of
{#xmlel{name = <<"features">>}, ws, false} ->
ok;
{#xmlel{name = <<"features">>}, ws, true} ->
error("<features> with legacy WebSocket");
{#xmlel{name = <<"stream:features">>}, ws, false} ->
error("<stream:features> with non-legacy WebSocket",
[StreamFeatures]);
{#xmlel{name = <<"stream:features">>}, _, _} ->
ok;
_ ->
error("Expected stream features", [StreamFeatures])
end.
get_advanced_message_processing(Features) ->
undefined =/= exml_query:subelement(Features, <<"amp">>).
Loading

0 comments on commit 1658f58

Please sign in to comment.