Skip to content

Commit

Permalink
Apply reviews and isolate shaping into the mongoose_shaper module
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 15, 2023
1 parent ccf0496 commit cff2834
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 51 deletions.
6 changes: 3 additions & 3 deletions include/mod_muc_room.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@

-record(activity, {message_time = 0,
presence_time = 0,
message_shaper :: opuntia:shaper(),
presence_shaper :: opuntia:shaper(),
message_shaper :: mongoose_shaper:shaper(),
presence_shaper :: mongoose_shaper:shaper(),
message,
presence
}).
Expand All @@ -85,7 +85,7 @@
subject_timestamp = <<>>,
just_created = false :: boolean(),
activity = treap:empty() :: treap:treap(),
room_shaper :: opuntia:shaper(),
room_shaper :: mongoose_shaper:shaper(),
room_queue = queue:new(),
http_auth_pool = none :: none | mongoose_http_client:pool(),
http_auth_pids = [] :: [pid()],
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
{flatlog, "0.1.2"},

%%% Stateless libraries
{opuntia, "0.2.1"},
{opuntia, {git, "https://github.com/NelsonVides/opuntia", {branch, "improvements"}}},
{fast_tls, "1.1.16"},
{fast_scram, "0.5.0"},
{idna, "6.1.1"},
Expand Down
7 changes: 4 additions & 3 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@
{ref,"c71265c00bd7b465b7214770895503383dbe299e"}},
0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0},
{<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.2.1">>},0},
{<<"opuntia">>,
{git,"https://github.com/NelsonVides/opuntia",
{ref,"accb6f8846460235138742bc4943bc1be027616c"}},
0},
{<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1},
{<<"pa">>,
{git,"https://github.com/erszcz/pa.git",
Expand Down Expand Up @@ -171,7 +174,6 @@
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>},
{<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>},
{<<"opuntia">>, <<"7F86F2581FE97A43B1C6CAF4B839EC42D35C09DC64B3F1CCC11FDFD37340809E">>},
{<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>},
{<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>},
{<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>},
Expand Down Expand Up @@ -235,7 +237,6 @@
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>},
{<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>},
{<<"opuntia">>, <<"CC73528A67918836BD9C7440A6A3CB452F2B6DCE7A5A7696843192B9C4A0ED09">>},
{<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>},
{<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>},
{<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>},
Expand Down
6 changes: 3 additions & 3 deletions src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
jid :: undefined | jid:jid(),
socket :: undefined | mongoose_c2s_socket:socket(),
parser :: undefined | exml_stream:parser(),
shaper :: undefined | opuntia:shaper(),
shaper :: undefined | mongoose_shaper:shaper(),
listener_opts :: undefined | listener_opts(),
state_mod = #{} :: #{module() => term()},
info = #{} :: info()
Expand Down Expand Up @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect,
StateData = #c2s_data{listener_opts = #{shaper := ShaperName,
max_stanza_size := MaxStanzaSize} = LOpts}) ->
{ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]),
Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(ShaperName)),
Shaper = mongoose_shaper:new(ShaperName),
C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts),
StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper},
{next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)};
Expand Down Expand Up @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) ->

-spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res().
handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) ->
{NewShaper, Pause} = opuntia:update(Shaper, Size),
{NewShaper, Pause} = mongoose_shaper:update(Shaper, Size),
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size),
NewStateData = StateData#c2s_data{shaper = NewShaper},
MaybePauseTimeout = maybe_pause(NewStateData, Pause),
Expand Down
4 changes: 2 additions & 2 deletions src/ejabberd_s2s_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

-record(state, {socket :: mongoose_transport:socket_data(),
streamid :: ejabberd_s2s:stream_id(),
shaper :: opuntia:shaper(),
shaper :: mongoose_shaper:shaper(),
tls = false :: boolean(),
tls_enabled = false :: boolean(),
tls_required = false :: boolean(),
Expand All @@ -84,7 +84,7 @@
tls_enabled => boolean(),
tls_options => mongoose_tls:options(),
authenticated => boolean(),
shaper => opuntia:shaper(),
shaper => mongoose_shaper:shaper(),
domains => [jid:lserver()]}.

-type statename() :: 'stream_established' | 'wait_for_feature_request'.
Expand Down
46 changes: 31 additions & 15 deletions src/mongoose_shaper.erl
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
-module(mongoose_shaper).

-export([child_spec/0, get_shaper_rate/1, wait/5, reset_all_shapers/1]).
-export([child_spec/0]).
-export([new/1, update/2, wait/5, reset_all_shapers/1]).
-ignore_xref([reset_all_shapers/1]).

-type shaper() :: opuntia:shaper().
-export_type([shaper/0]).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {mongoose_shaper, #{}}}}],
{mongoose_shaper,
{wpool, start_pool, [mongoose_shaper, WPoolOpts]},
permanent, infinity, supervisor, [opuntia_srv]}.
WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {?MODULE, #{}}}}],
#{id => ?MODULE,
start => {wpool, start_pool, [?MODULE, WPoolOpts]},
restart => permanent,
shutdown => infinity,
type => supervisor}.

-spec get_shaper_rate(atom()) -> number().
get_shaper_rate(Name) ->
case mongoose_config:lookup_opt([shaper, Name]) of
{ok, #{max_rate := MaxRatePerSecond}} ->
MaxRatePerSecond / 1000;
{error, not_found} -> 0
end.
-spec new(atom()) -> opuntia:shaper().
new(Name) ->
opuntia:new(get_shaper_config(Name)).

-spec update(opuntia:shaper(), opuntia:tokens()) -> {opuntia:shaper(), opuntia:delay()}.
update(Shaper, Tokens) ->
opuntia:update(Shaper, Tokens).

%% @doc Shapes the caller from executing the action.
-spec wait(HostType :: mongooseim:host_type_or_global(),
Expand All @@ -25,8 +31,8 @@ get_shaper_rate(Name) ->
FromJID :: jid:jid(),
Size :: integer()) -> continue | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
Worker = wpool_pool:hash_worker(mongoose_shaper, FromJID),
Config = get_shaper_rate(get_shaper_name(HostType, Domain, Action, FromJID)),
Worker = wpool_pool:hash_worker(?MODULE, FromJID),
Config = get_shaper_config(get_shaper_name(HostType, Domain, Action, FromJID)),
Key = new_key(Domain, Action, FromJID),
opuntia_srv:wait(Worker, Key, Size, Config).

Expand All @@ -37,7 +43,7 @@ new_key(Domain, Action, FromJID) ->

%% @doc Ask all shaper servers to forget current shapers and read settings again
reset_all_shapers(_HostType) ->
[ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(mongoose_shaper) ],
[ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(?MODULE) ],
ok.

-spec get_shaper_name(HostType :: mongooseim:host_type_or_global(),
Expand All @@ -50,5 +56,15 @@ get_shaper_name(HostType, Domain, Action, FromJID) ->
Value -> Value
end.

-spec get_shaper_config(atom()) -> number().
get_shaper_config(Name) ->
case mongoose_config:lookup_opt([shaper, Name]) of
{ok, #{max_rate := MaxRatePerSecond}} ->
Rate = MaxRatePerSecond div 1000,
{MaxRatePerSecond, Rate, millisecond};
{error, not_found} ->
0
end.

default_shaper() ->
none.
18 changes: 9 additions & 9 deletions src/mongoose_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

-record(state, {socket :: socket(),
sockmod = gen_tcp :: socket_module(),
shaper_state :: opuntia:shaper(),
shaper_state :: mongoose_shaper:shaper(),
dest_pid :: undefined | pid(), %% gen_fsm_compat pid
max_stanza_size :: stanza_size(),
parser :: exml_stream:parser(),
Expand Down Expand Up @@ -164,7 +164,7 @@ send_text(SocketData, Data) ->
#socket_data{sockmod = SockMod, socket = Socket,
connection_type = ConnectionType} = SocketData,
case catch SockMod:send(Socket, Data) of
ok ->
ok ->
update_transport_metrics(byte_size(Data), sent, ConnectionType),
ok;
{error, timeout} ->
Expand Down Expand Up @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) ->
gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []).

init([Socket, Shaper, Opts]) ->
ShaperState = opuntia:new(mongoose_shaper:get_shaper_rate(Shaper)),
ShaperState = mongoose_shaper:new(Shaper),
#{max_stanza_size := MaxStanzaSize,
hibernate_after := HibernateAfter,
connection_type := ConnectionType} = Opts,
Expand Down Expand Up @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State, hibernate_or_timeout(State)}.

handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = opuntia:new(mongoose_shaper:get_shaper_rate(Shaper)),
NewShaperState = mongoose_shaper:new(Shaper),
NewState = State#state{shaper_state = NewShaperState},
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_cast(close, State) ->
Expand All @@ -277,7 +277,7 @@ handle_cast(_Msg, State) ->

handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) ->
NewState = process_data(Data, State),
{noreply, NewState, hibernate_or_timeout(NewState)};
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_info({Tag, _TCPSocket, Data},
#state{socket = Socket,
sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl ->
Expand Down Expand Up @@ -394,21 +394,21 @@ process_data(Data, #state{parser = Parser,
Size = byte_size(Data),
{Events, NewParser} =
case exml_stream:parse(Parser, Data) of
{ok, NParser, Elems} ->
{ok, NParser, Elems} ->
{[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser};
{error, Reason} ->
{[{xmlstreamerror, Reason}], Parser}
end,
{NewShaperState, Pause} = opuntia:update(ShaperState, Size),
{NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size),
update_transport_metrics(Size, received, State#state.connection_type),
[gen_fsm_compat:send_event(DestPid, Event) || Event <- Events],
maybe_pause(Pause, State),
State#state{parser = NewParser, shaper_state = NewShaperState}.

wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
{xmlstreamelement, E};
wrap_xml_elements_and_update_metrics(E) ->
wrap_xml_elements_and_update_metrics(E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
E.

Expand Down
4 changes: 2 additions & 2 deletions src/muc/mod_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
access,
history_size :: integer(),
default_room_opts :: list(),
room_shaper :: opuntia:shaper(),
room_shaper :: mongoose_shaper:shaper(),
http_auth_pool :: mongoose_http_client:pool(),
hibernated_room_check_interval :: timeout(),
hibernated_room_timeout :: timeout() }).
Expand Down Expand Up @@ -873,7 +873,7 @@ check_user_can_create_room(HostType, ServerHost, AccessCreate, From, RoomID) ->

-spec start_room(HostType :: host_type(), ServerHost :: jid:lserver(),
MucHost :: muc_host(), Access :: access(), room(),
HistorySize :: undefined | integer(), RoomShaper :: opuntia:shaper(),
HistorySize :: undefined | integer(), RoomShaper :: mongoose_shaper:shaper(),
HttpAuthPool :: none | mongoose_http_client:pool(), From :: jid:jid(), nick(),
DefRoomOpts :: undefined | [any()], Acc :: mongoose_acc:t())
-> {error, {failed_to_restore, Reason :: term()}} | {ok, pid()}.
Expand Down
26 changes: 13 additions & 13 deletions src/muc/mod_muc_room.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@

-spec start_new(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(),
Access :: _, Room :: mod_muc:room(), HistorySize :: integer(),
RoomShaper :: opuntia:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(),
RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(),
Creator :: jid:jid(), Nick :: mod_muc:nick(),
DefRoomOpts :: list()) -> {ok, pid()}.
start_new(HostType, Host, ServerHost, Access, Room,
Expand All @@ -171,7 +171,7 @@ start_new(HostType, Host, ServerHost, Access, Room,

-spec start_restored(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(),
Access :: _, Room :: mod_muc:room(), HistorySize :: integer(),
RoomShaper :: opuntia:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(),
RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(),
Opts :: list()) -> {ok, pid()}.
start_restored(HostType, Host, ServerHost, Access, Room,
HistorySize, RoomShaper, HttpAuthPool, Opts)
Expand Down Expand Up @@ -305,7 +305,7 @@ init_new(#{init_type := start_new, host_type := HostType, muc_host := Host,
http_auth_pool := HttpAuthPool, creator := Creator, nick := _Nick,
def_opts := DefRoomOpts}) when is_list(DefRoomOpts) ->
process_flag(trap_exit, true),
Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(RoomShaper)),
Shaper = mongoose_shaper:new(RoomShaper),
State = #state{host = Host, host_type = HostType, server_host = ServerHost,
access = Access,
room = Room,
Expand Down Expand Up @@ -345,7 +345,7 @@ init_restored(#{init_type := start_restored,
room_shaper := RoomShaper, http_auth_pool := HttpAuthPool,
opts := Opts}) ->
process_flag(trap_exit, true),
Shaper = opuntia:new(mongoose_shaper:get_shaper_rate(RoomShaper)),
Shaper = mongoose_shaper:new(RoomShaper),
State = set_opts(Opts, #state{host = Host, host_type = HostType,
server_host = ServerHost,
access = Access,
Expand Down Expand Up @@ -1545,8 +1545,8 @@ get_user_activity(JID, StateData) ->
case treap:lookup(jid:to_lower(JID), StateData#state.activity) of
{ok, _P, A} -> A;
error ->
MessageShaper = opuntia:new(mongoose_shaper:get_shaper_rate(get_opt(StateData, user_message_shaper))),
PresenceShaper = opuntia:new(mongoose_shaper:get_shaper_rate(get_opt(StateData, user_presence_shaper))),
MessageShaper = mongoose_shaper:new(get_opt(StateData, user_message_shaper)),
PresenceShaper = mongoose_shaper:new(get_opt(StateData, user_presence_shaper)),
#activity{message_shaper = MessageShaper,
presence_shaper = PresenceShaper}
end.
Expand Down Expand Up @@ -1581,10 +1581,10 @@ store_user_activity(JID, UserActivity, StateData) ->
(UserActivity#activity.presence == undefined) of
true ->
{_, MessageShaperInterval} =
opuntia:update(UserActivity#activity.message_shaper,
mongoose_shaper:update(UserActivity#activity.message_shaper,
100000),
{_, PresenceShaperInterval} =
opuntia:update(UserActivity#activity.presence_shaper,
mongoose_shaper:update(UserActivity#activity.presence_shaper,
100000),
Delay = lists:max([MessageShaperInterval,
PresenceShaperInterval,
Expand Down Expand Up @@ -1632,7 +1632,7 @@ prepare_room_queue(StateData) ->
Packet = Activity#activity.message,
Size = element_size(Packet),
{RoomShaper, RoomShaperInterval} =
opuntia:update(StateData#state.room_shaper, Size),
mongoose_shaper:update(StateData#state.room_shaper, Size),
erlang:send_after(
RoomShaperInterval, self(),
process_room_queue),
Expand All @@ -1643,7 +1643,7 @@ prepare_room_queue(StateData) ->
{_Nick, Packet} = Activity#activity.presence,
Size = element_size(Packet),
{RoomShaper, RoomShaperInterval} =
opuntia:update(StateData#state.room_shaper, Size),
mongoose_shaper:update(StateData#state.room_shaper, Size),
erlang:send_after(
RoomShaperInterval, self(),
process_room_queue),
Expand Down Expand Up @@ -4220,7 +4220,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>,
Now = os:system_time(microsecond),
MinMessageInterval = trunc(get_opt(StateData, min_message_interval) * 1000000),
Size = element_size(Packet),
{MessageShaper, MessageShaperInterval} = opuntia:update(Activity#activity.message_shaper, Size),
{MessageShaper, MessageShaperInterval} = mongoose_shaper:update(Activity#activity.message_shaper, Size),
case {Activity#activity.message /= undefined,
Now >= Activity#activity.message_time + MinMessageInterval,
MessageShaperInterval} of
Expand All @@ -4230,7 +4230,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>,
ejabberd_router:route(StateData#state.jid, From, Err),
StateData;
{false, true, 0} ->
{RoomShaper, RoomShaperInterval} = opuntia:update(StateData#state.room_shaper, Size),
{RoomShaper, RoomShaperInterval} = mongoose_shaper:update(StateData#state.room_shaper, Size),
RoomQueueEmpty = queue:is_empty(StateData#state.room_queue),
case {RoomShaperInterval, RoomQueueEmpty} of
{0, true} ->
Expand Down Expand Up @@ -4310,7 +4310,7 @@ route_message(#routed_message{from = From, packet = Packet, lang = Lang},
StateData.

-spec schedule_queue_processing_when_empty(RoomQueueEmpty :: boolean(),
RoomShaper :: opuntia:shaper(),
RoomShaper :: mongoose_shaper:shaper(),
RoomShaperInterval :: non_neg_integer(),
StateData :: state()) -> state().
schedule_queue_processing_when_empty(true, RoomShaper, RoomShaperInterval, StateData) ->
Expand Down

0 comments on commit cff2834

Please sign in to comment.