Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from inside shapers to opuntia #4187

Merged
merged 3 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ delete_users(Config) ->

increase_limits(Config) ->
Config1 = mongoose_helper:backup_and_set_config(Config, increased_limits()),
rpc_apply(shaper_srv, reset_all_shapers, [host_type()]),
rpc_apply(mongoose_shaper, reset_all_shapers, [host_type()]),
Config1.

increased_limits() ->
Expand Down
6 changes: 3 additions & 3 deletions include/mod_muc_room.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@

-record(activity, {message_time = 0,
presence_time = 0,
message_shaper :: shaper:shaper(),
presence_shaper :: shaper:shaper(),
message_shaper :: mongoose_shaper:shaper(),
presence_shaper :: mongoose_shaper:shaper(),
message,
presence
}).
Expand All @@ -85,7 +85,7 @@
subject_timestamp = <<>>,
just_created = false :: boolean(),
activity = treap:empty() :: treap:treap(),
room_shaper :: shaper:shaper(),
room_shaper :: mongoose_shaper:shaper(),
room_queue = queue:new(),
http_auth_pool = none :: none | mongoose_http_client:pool(),
http_auth_pids = [] :: [pid()],
Expand Down
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
{flatlog, "0.1.2"},

%%% Stateless libraries
{opuntia, "0.3.0"},
{fast_tls, "1.1.16"},
{fast_scram, "0.5.0"},
{idna, "6.1.1"},
Expand Down
3 changes: 3 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
{ref,"c71265c00bd7b465b7214770895503383dbe299e"}},
0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0},
{<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.3.0">>},0},
{<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1},
{<<"pa">>,
{git,"https://github.com/erszcz/pa.git",
Expand Down Expand Up @@ -170,6 +171,7 @@
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>},
{<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>},
{<<"opuntia">>, <<"FB365A48C48CEE76C76407FB0B0E13065E6CB23F4918F7C6B084499FAEB1CCDF">>},
{<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>},
{<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>},
{<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>},
Expand Down Expand Up @@ -233,6 +235,7 @@
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>},
{<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>},
{<<"opuntia">>, <<"AB49DB4DA96D30116C85A6D28AC4D6C4BE9C91799D7E2452C37F92C477B9BB88">>},
{<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>},
{<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>},
{<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>},
Expand Down
6 changes: 3 additions & 3 deletions src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
jid :: undefined | jid:jid(),
socket :: undefined | mongoose_c2s_socket:socket(),
parser :: undefined | exml_stream:parser(),
shaper :: undefined | shaper:shaper(),
shaper :: undefined | mongoose_shaper:shaper(),
listener_opts :: undefined | listener_opts(),
state_mod = #{} :: #{module() => term()},
info = #{} :: info()
Expand Down Expand Up @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect,
StateData = #c2s_data{listener_opts = #{shaper := ShaperName,
max_stanza_size := MaxStanzaSize} = LOpts}) ->
{ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]),
Shaper = shaper:new(ShaperName),
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
Shaper = mongoose_shaper:new(ShaperName),
C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts),
StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper},
{next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)};
Expand Down Expand Up @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) ->

-spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res().
handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) ->
{NewShaper, Pause} = shaper:update(Shaper, Size),
{NewShaper, Pause} = mongoose_shaper:update(Shaper, Size),
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size),
NewStateData = StateData#c2s_data{shaper = NewShaper},
MaybePauseTimeout = maybe_pause(NewStateData, Pause),
Expand Down
4 changes: 2 additions & 2 deletions src/ejabberd_s2s_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

-record(state, {socket :: mongoose_transport:socket_data(),
streamid :: ejabberd_s2s:stream_id(),
shaper :: shaper:shaper(),
shaper :: mongoose_shaper:shaper(),
tls = false :: boolean(),
tls_enabled = false :: boolean(),
tls_required = false :: boolean(),
Expand All @@ -84,7 +84,7 @@
tls_enabled => boolean(),
tls_options => mongoose_tls:options(),
authenticated => boolean(),
shaper => shaper:shaper(),
shaper => mongoose_shaper:shaper(),
domains => [jid:lserver()]}.

-type statename() :: 'stream_established' | 'wait_for_feature_request'.
Expand Down
2 changes: 1 addition & 1 deletion src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init([]) ->
SMBackendSupervisor = supervisor_spec(ejabberd_sm_backend_sup),
OutgoingPoolsSupervisor = supervisor_spec(mongoose_wpool_sup),
Listener = supervisor_spec(mongoose_listener_sup),
ShaperSup = supervisor_spec(mongoose_shaper_sup),
ShaperSup = mongoose_shaper:child_spec(),
DomainSup = supervisor_spec(mongoose_domain_sup),
ReceiverSupervisor =
ejabberd_tmp_sup_spec(mongoose_transport_sup, [mongoose_transport_sup, mongoose_transport]),
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -225,7 +225,7 @@ room_process_mam_iq(Acc, From, To, IQ, #{host_type := HostType}) ->
case check_action_allowed(HostType, Acc, To#jid.lserver, Action, MucAction, From, To) of
ok ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, MucAction, From) of
ok ->
continue ->
handle_error_iq(Acc, HostType, To, Action,
handle_mam_iq(HostType, Action, From, To, IQ));
{error, max_delay_reached} ->
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -142,7 +142,7 @@ process_mam_iq(Acc, From, To, IQ, _Extra) ->
case is_action_allowed(HostType, Action, From, To) of
true ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, Action, From) of
ok ->
continue ->
handle_error_iq(HostType, Acc, To, Action,
handle_mam_iq(Action, From, To, IQ, Acc));
{error, max_delay_reached} ->
Expand Down
14 changes: 8 additions & 6 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1022,13 +1022,15 @@
list_to_atom(atom_to_list(Action) ++ "_global_shaper").

-spec wait_shaper(mongooseim:host_type(), jid:server(), mam_iq:action(), jid:jid()) ->
'ok' | {'error', 'max_delay_reached'}.
continue | {error, max_delay_reached}.
wait_shaper(HostType, Host, Action, From) ->
case shaper_srv:wait(HostType, Host, action_to_shaper_name(Action), From, 1) of
ok ->
shaper_srv:wait(HostType, Host, action_to_global_shaper_name(Action), global, 1);
Err ->
Err
case mongoose_shaper:wait(
HostType, Host, action_to_shaper_name(Action), From, 1) of
continue ->
mongoose_shaper:wait(
global, Host, action_to_global_shaper_name(Action), From, 1);
chrzaszcz marked this conversation as resolved.
Show resolved Hide resolved
{error, max_delay_reached} ->
{error, max_delay_reached}

Check warning on line 1033 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L1033

Added line #L1033 was not covered by tests
end.

%% -----------------------------------------------------------------------
Expand Down
70 changes: 70 additions & 0 deletions src/mongoose_shaper.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-module(mongoose_shaper).

-export([child_spec/0]).
-export([new/1, update/2, wait/5, reset_all_shapers/1]).
-ignore_xref([reset_all_shapers/1]).

-type shaper() :: opuntia:shaper().
-export_type([shaper/0]).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {?MODULE, #{}}}}],
#{id => ?MODULE,
start => {wpool, start_pool, [?MODULE, WPoolOpts]},
restart => permanent,
shutdown => infinity,
type => supervisor}.

-spec new(atom()) -> opuntia:shaper().
new(Name) ->
opuntia:new(get_shaper_config(Name)).

-spec update(opuntia:shaper(), opuntia:tokens()) -> {opuntia:shaper(), opuntia:delay()}.
update(Shaper, Tokens) ->
opuntia:update(Shaper, Tokens).

%% @doc Shapes the caller from executing the action.
-spec wait(HostType :: mongooseim:host_type_or_global(),
Domain :: jid:lserver(),
Action :: atom(),
FromJID :: jid:jid(),
Size :: integer()) -> continue | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
Worker = wpool_pool:hash_worker(?MODULE, FromJID),
Config = get_shaper_config(get_shaper_name(HostType, Domain, Action, FromJID)),
Key = new_key(Domain, Action, FromJID),
opuntia_srv:wait(Worker, Key, Size, Config).

-type key() :: {global | jid:server(), atom(), jid:jid()}.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) ->
{Domain, Action, FromJID}.

%% @doc Ask all shaper servers to forget current shapers and read settings again
reset_all_shapers(_HostType) ->
[ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(?MODULE) ],
ok.

-spec get_shaper_name(HostType :: mongooseim:host_type_or_global(),
Domain :: global | jid:server(),
Action :: atom(),
FromJID :: jid:jid()) -> allow | none.
get_shaper_name(HostType, Domain, Action, FromJID) ->
case acl:match_rule(HostType, Domain, Action, FromJID) of
deny -> default_shaper();
Value -> Value
end.

-spec get_shaper_config(atom()) -> number().
get_shaper_config(Name) ->
case mongoose_config:lookup_opt([shaper, Name]) of
{ok, #{max_rate := MaxRatePerSecond}} ->
Rate = MaxRatePerSecond div 1000,
#{bucket_size => MaxRatePerSecond, rate => Rate, start_full => true};
{error, not_found} ->
0
end.

default_shaper() ->
none.
58 changes: 0 additions & 58 deletions src/mongoose_shaper_sup.erl

This file was deleted.

18 changes: 9 additions & 9 deletions src/mongoose_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

-record(state, {socket :: socket(),
sockmod = gen_tcp :: socket_module(),
shaper_state :: shaper:shaper(),
shaper_state :: mongoose_shaper:shaper(),
dest_pid :: undefined | pid(), %% gen_fsm_compat pid
max_stanza_size :: stanza_size(),
parser :: exml_stream:parser(),
Expand Down Expand Up @@ -164,7 +164,7 @@ send_text(SocketData, Data) ->
#socket_data{sockmod = SockMod, socket = Socket,
connection_type = ConnectionType} = SocketData,
case catch SockMod:send(Socket, Data) of
ok ->
ok ->
update_transport_metrics(byte_size(Data), sent, ConnectionType),
ok;
{error, timeout} ->
Expand Down Expand Up @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) ->
gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []).

init([Socket, Shaper, Opts]) ->
ShaperState = shaper:new(Shaper),
ShaperState = mongoose_shaper:new(Shaper),
#{max_stanza_size := MaxStanzaSize,
hibernate_after := HibernateAfter,
connection_type := ConnectionType} = Opts,
Expand Down Expand Up @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State, hibernate_or_timeout(State)}.

handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
NewShaperState = mongoose_shaper:new(Shaper),
NewState = State#state{shaper_state = NewShaperState},
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_cast(close, State) ->
Expand All @@ -277,7 +277,7 @@ handle_cast(_Msg, State) ->

handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) ->
NewState = process_data(Data, State),
{noreply, NewState, hibernate_or_timeout(NewState)};
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_info({Tag, _TCPSocket, Data},
#state{socket = Socket,
sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl ->
Expand Down Expand Up @@ -394,21 +394,21 @@ process_data(Data, #state{parser = Parser,
Size = byte_size(Data),
{Events, NewParser} =
case exml_stream:parse(Parser, Data) of
{ok, NParser, Elems} ->
{ok, NParser, Elems} ->
{[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser};
{error, Reason} ->
{[{xmlstreamerror, Reason}], Parser}
end,
{NewShaperState, Pause} = shaper:update(ShaperState, Size),
{NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size),
update_transport_metrics(Size, received, State#state.connection_type),
[gen_fsm_compat:send_event(DestPid, Event) || Event <- Events],
maybe_pause(Pause, State),
State#state{parser = NewParser, shaper_state = NewShaperState}.

wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
{xmlstreamelement, E};
wrap_xml_elements_and_update_metrics(E) ->
wrap_xml_elements_and_update_metrics(E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
E.

Expand Down
3 changes: 2 additions & 1 deletion src/mongooseim.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
cowboy_swagger,
tomerl,
flatlog,
segmented_cache
segmented_cache,
opuntia
NelsonVides marked this conversation as resolved.
Show resolved Hide resolved
]},
{included_applications, [mnesia, cets]},
{env, []},
Expand Down
Loading