Skip to content

Commit

Permalink
Migrate from inside shapers to opuntia
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 7, 2023
1 parent f44b5b0 commit 243696a
Show file tree
Hide file tree
Showing 18 changed files with 101 additions and 326 deletions.
2 changes: 1 addition & 1 deletion big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ delete_users(Config) ->

increase_limits(Config) ->
Config1 = mongoose_helper:backup_and_set_config(Config, increased_limits()),
rpc_apply(shaper_srv, reset_all_shapers, [host_type()]),
rpc_apply(mongoose_shapers, reset_all_shapers, [host_type()]),
Config1.

increased_limits() ->
Expand Down
6 changes: 3 additions & 3 deletions include/mod_muc_room.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@

-record(activity, {message_time = 0,
presence_time = 0,
message_shaper :: shaper:shaper(),
presence_shaper :: shaper:shaper(),
message_shaper :: opuntia:shaper(),
presence_shaper :: opuntia:shaper(),
message,
presence
}).
Expand All @@ -85,7 +85,7 @@
subject_timestamp = <<>>,
just_created = false :: boolean(),
activity = treap:empty() :: treap:treap(),
room_shaper :: shaper:shaper(),
room_shaper :: opuntia:shaper(),
room_queue = queue:new(),
http_auth_pool = none :: none | mongoose_http_client:pool(),
http_auth_pids = [] :: [pid()],
Expand Down
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
{flatlog, "0.1.2"},

%%% Stateless libraries
{opuntia, "0.2.0"},
{fast_tls, "1.1.16"},
{fast_scram, "0.5.0"},
{idna, "6.1.1"},
Expand Down
3 changes: 3 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
{ref,"c71265c00bd7b465b7214770895503383dbe299e"}},
0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0},
{<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.2.0">>},0},
{<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1},
{<<"pa">>,
{git,"https://github.com/erszcz/pa.git",
Expand Down Expand Up @@ -170,6 +171,7 @@
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>},
{<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>},
{<<"opuntia">>, <<"1F43D0DCB542FBBC44B4D62391EFCFC698C9EDC4E359E568A54227DACA403245">>},
{<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>},
{<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>},
{<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>},
Expand Down Expand Up @@ -233,6 +235,7 @@
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>},
{<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>},
{<<"opuntia">>, <<"C8970AC270174AC3EBC4539ADDA64622CFCA6609BF498A486CCE5AA365F3E058">>},
{<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>},
{<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>},
{<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>},
Expand Down
6 changes: 3 additions & 3 deletions src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
jid :: undefined | jid:jid(),
socket :: undefined | mongoose_c2s_socket:socket(),
parser :: undefined | exml_stream:parser(),
shaper :: undefined | shaper:shaper(),
shaper :: undefined | opuntia:shaper(),
listener_opts :: undefined | listener_opts(),
state_mod = #{} :: #{module() => term()},
info = #{} :: info()
Expand Down Expand Up @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect,
StateData = #c2s_data{listener_opts = #{shaper := ShaperName,
max_stanza_size := MaxStanzaSize} = LOpts}) ->
{ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]),
Shaper = shaper:new(ShaperName),
Shaper = opuntia:new(mongoose_shapers:get_shaper_rate(ShaperName)),
C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts),
StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper},
{next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)};
Expand Down Expand Up @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) ->

-spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res().
handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) ->
{NewShaper, Pause} = shaper:update(Shaper, Size),
{NewShaper, Pause} = opuntia:update(Shaper, Size),
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size),
NewStateData = StateData#c2s_data{shaper = NewShaper},
MaybePauseTimeout = maybe_pause(NewStateData, Pause),
Expand Down
4 changes: 2 additions & 2 deletions src/ejabberd_s2s_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

-record(state, {socket :: mongoose_transport:socket_data(),
streamid :: ejabberd_s2s:stream_id(),
shaper :: shaper:shaper(),
shaper :: opuntia:shaper(),
tls = false :: boolean(),
tls_enabled = false :: boolean(),
tls_required = false :: boolean(),
Expand All @@ -84,7 +84,7 @@
tls_enabled => boolean(),
tls_options => mongoose_tls:options(),
authenticated => boolean(),
shaper => shaper:shaper(),
shaper => opuntia:shaper(),
domains => [jid:lserver()]}.

-type statename() :: 'stream_established' | 'wait_for_feature_request'.
Expand Down
2 changes: 1 addition & 1 deletion src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init([]) ->
SMBackendSupervisor = supervisor_spec(ejabberd_sm_backend_sup),
OutgoingPoolsSupervisor = supervisor_spec(mongoose_wpool_sup),
Listener = supervisor_spec(mongoose_listener_sup),
ShaperSup = supervisor_spec(mongoose_shaper_sup),
ShaperSup = mongoose_shapers:child_spec(),
DomainSup = supervisor_spec(mongoose_domain_sup),
ReceiverSupervisor =
ejabberd_tmp_sup_spec(mongoose_transport_sup, [mongoose_transport_sup, mongoose_transport]),
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -225,7 +225,7 @@ room_process_mam_iq(Acc, From, To, IQ, #{host_type := HostType}) ->
case check_action_allowed(HostType, Acc, To#jid.lserver, Action, MucAction, From, To) of
ok ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, MucAction, From) of
ok ->
continue ->
handle_error_iq(Acc, HostType, To, Action,
handle_mam_iq(HostType, Action, From, To, IQ));
{error, max_delay_reached} ->
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -142,7 +142,7 @@ process_mam_iq(Acc, From, To, IQ, _Extra) ->
case is_action_allowed(HostType, Action, From, To) of
true ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, Action, From) of
ok ->
continue ->
handle_error_iq(HostType, Acc, To, Action,
handle_mam_iq(Action, From, To, IQ, Acc));
{error, max_delay_reached} ->
Expand Down
14 changes: 8 additions & 6 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1022,13 +1022,15 @@ action_to_global_shaper_name(Action) ->
list_to_atom(atom_to_list(Action) ++ "_global_shaper").

-spec wait_shaper(mongooseim:host_type(), jid:server(), mam_iq:action(), jid:jid()) ->
'ok' | {'error', 'max_delay_reached'}.
continue | {error, max_delay_reached}.
wait_shaper(HostType, Host, Action, From) ->
case shaper_srv:wait(HostType, Host, action_to_shaper_name(Action), From, 1) of
ok ->
shaper_srv:wait(HostType, Host, action_to_global_shaper_name(Action), global, 1);
Err ->
Err
case mongoose_shapers:wait(

Check warning on line 1027 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L1027

Added line #L1027 was not covered by tests
HostType, Host, action_to_shaper_name(Action), From, 1) of
continue ->
mongoose_shapers:wait(

Check warning on line 1030 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L1030

Added line #L1030 was not covered by tests
global, Host, action_to_global_shaper_name(Action), From, 1);
{error, max_delay_reached} ->
{error, max_delay_reached}

Check warning on line 1033 in src/mam/mod_mam_utils.erl

View check run for this annotation

Codecov / codecov/patch

src/mam/mod_mam_utils.erl#L1033

Added line #L1033 was not covered by tests
end.

%% -----------------------------------------------------------------------
Expand Down
58 changes: 0 additions & 58 deletions src/mongoose_shaper_sup.erl

This file was deleted.

54 changes: 54 additions & 0 deletions src/mongoose_shapers.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
-module(mongoose_shapers).

-export([child_spec/0, get_shaper_rate/1, wait/5, reset_all_shapers/1]).
-ignore_xref([reset_all_shapers/1]).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {mongoose_shapers, #{}}}}],
{mongoose_shapers,
{wpool, start_pool, [mongoose_shapers, WPoolOpts]},
permanent, infinity, supervisor, [opuntia_srv]}.

-spec get_shaper_rate(atom()) -> number().
get_shaper_rate(Name) ->
case mongoose_config:lookup_opt([shaper, Name]) of
{ok, #{max_rate := MaxRatePerSecond}} ->
MaxRatePerSecond / 1000;
{error, not_found} -> 0
end.

%% @doc Shapes the caller from executing the action.
-spec wait(HostType :: mongooseim:host_type_or_global(),
Domain :: jid:server(),
Action :: atom(),
FromJID :: jid:jid(),
Size :: integer()) -> continue | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
Worker = wpool_pool:hash_worker(mongoose_shapers, FromJID),
Config = get_shaper_rate(get_shaper_name(HostType, Domain, Action, FromJID)),
Key = new_key(Domain, Action, FromJID),
opuntia_srv:wait(Worker, Key, Size, Config).

Check warning on line 31 in src/mongoose_shapers.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_shapers.erl#L28-L31

Added lines #L28 - L31 were not covered by tests

-type key() :: {global | jid:server(), atom(), jid:jid()}.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) ->
{Domain, Action, FromJID}.

Check warning on line 36 in src/mongoose_shapers.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_shapers.erl#L36

Added line #L36 was not covered by tests

%% @doc Ask all shaper servers to forget current shapers and read settings again
reset_all_shapers(_HostType) ->
[ opuntia_srv:reset_shapers(ProcName)
|| ProcName <- wpool:get_workers(mongoose_shapers)].

Check warning on line 41 in src/mongoose_shapers.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_shapers.erl#L40-L41

Added lines #L40 - L41 were not covered by tests

-spec get_shaper_name(HostType :: mongooseim:host_type_or_global(),
Domain :: global | jid:server(),
Action :: atom(),
FromJID :: jid:jid()) -> allow | none.
get_shaper_name(HostType, Domain, Action, FromJID) ->
case acl:match_rule(HostType, Domain, Action, FromJID) of
deny -> default_shaper();
Value -> Value

Check warning on line 50 in src/mongoose_shapers.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_shapers.erl#L48-L50

Added lines #L48 - L50 were not covered by tests
end.

default_shaper() ->
none.

Check warning on line 54 in src/mongoose_shapers.erl

View check run for this annotation

Codecov / codecov/patch

src/mongoose_shapers.erl#L54

Added line #L54 was not covered by tests
8 changes: 4 additions & 4 deletions src/mongoose_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

-record(state, {socket :: socket(),
sockmod = gen_tcp :: socket_module(),
shaper_state :: shaper:shaper(),
shaper_state :: opuntia:shaper(),
dest_pid :: undefined | pid(), %% gen_fsm_compat pid
max_stanza_size :: stanza_size(),
parser :: exml_stream:parser(),
Expand Down Expand Up @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) ->
gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []).

init([Socket, Shaper, Opts]) ->
ShaperState = shaper:new(Shaper),
ShaperState = opuntia:new(mongoose_shapers:get_shaper_rate(Shaper)),
#{max_stanza_size := MaxStanzaSize,
hibernate_after := HibernateAfter,
connection_type := ConnectionType} = Opts,
Expand Down Expand Up @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State, hibernate_or_timeout(State)}.

handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
NewShaperState = opuntia:new(mongoose_shapers:get_shaper_rate(Shaper)),
NewState = State#state{shaper_state = NewShaperState},
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_cast(close, State) ->
Expand Down Expand Up @@ -399,7 +399,7 @@ process_data(Data, #state{parser = Parser,
{error, Reason} ->
{[{xmlstreamerror, Reason}], Parser}
end,
{NewShaperState, Pause} = shaper:update(ShaperState, Size),
{NewShaperState, Pause} = opuntia:update(ShaperState, Size),
update_transport_metrics(Size, received, State#state.connection_type),
[gen_fsm_compat:send_event(DestPid, Event) || Event <- Events],
maybe_pause(Pause, State),
Expand Down
3 changes: 2 additions & 1 deletion src/mongooseim.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
cowboy_swagger,
tomerl,
flatlog,
segmented_cache
segmented_cache,
opuntia
]},
{included_applications, [mnesia, cets]},
{env, []},
Expand Down
4 changes: 2 additions & 2 deletions src/muc/mod_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
access,
history_size :: integer(),
default_room_opts :: list(),
room_shaper :: shaper:shaper(),
room_shaper :: opuntia:shaper(),
http_auth_pool :: mongoose_http_client:pool(),
hibernated_room_check_interval :: timeout(),
hibernated_room_timeout :: timeout() }).
Expand Down Expand Up @@ -873,7 +873,7 @@ check_user_can_create_room(HostType, ServerHost, AccessCreate, From, RoomID) ->

-spec start_room(HostType :: host_type(), ServerHost :: jid:lserver(),
MucHost :: muc_host(), Access :: access(), room(),
HistorySize :: undefined | integer(), RoomShaper :: shaper:shaper(),
HistorySize :: undefined | integer(), RoomShaper :: opuntia:shaper(),
HttpAuthPool :: none | mongoose_http_client:pool(), From :: jid:jid(), nick(),
DefRoomOpts :: undefined | [any()], Acc :: mongoose_acc:t())
-> {error, {failed_to_restore, Reason :: term()}} | {ok, pid()}.
Expand Down
Loading

0 comments on commit 243696a

Please sign in to comment.