diff --git a/big_tests/tests/mam_SUITE.erl b/big_tests/tests/mam_SUITE.erl index cea723c11ac..62fe03a733b 100644 --- a/big_tests/tests/mam_SUITE.erl +++ b/big_tests/tests/mam_SUITE.erl @@ -576,7 +576,7 @@ delete_users(Config) -> increase_limits(Config) -> Config1 = mongoose_helper:backup_and_set_config(Config, increased_limits()), - rpc_apply(shaper_srv, reset_all_shapers, [host_type()]), + rpc_apply(mongoose_shaper, reset_all_shapers, [host_type()]), Config1. increased_limits() -> diff --git a/include/mod_muc_room.hrl b/include/mod_muc_room.hrl index ac5c0d1911c..28c6aef0b93 100644 --- a/include/mod_muc_room.hrl +++ b/include/mod_muc_room.hrl @@ -62,8 +62,8 @@ -record(activity, {message_time = 0, presence_time = 0, - message_shaper :: shaper:shaper(), - presence_shaper :: shaper:shaper(), + message_shaper :: mongoose_shaper:shaper(), + presence_shaper :: mongoose_shaper:shaper(), message, presence }). @@ -85,7 +85,7 @@ subject_timestamp = <<>>, just_created = false :: boolean(), activity = treap:empty() :: treap:treap(), - room_shaper :: shaper:shaper(), + room_shaper :: mongoose_shaper:shaper(), room_queue = queue:new(), http_auth_pool = none :: none | mongoose_http_client:pool(), http_auth_pids = [] :: [pid()], diff --git a/rebar.config b/rebar.config index 5fb51113976..33afbeac85d 100644 --- a/rebar.config +++ b/rebar.config @@ -70,6 +70,7 @@ {flatlog, "0.1.2"}, %%% Stateless libraries + {opuntia, "0.3.0"}, {fast_tls, "1.1.16"}, {fast_scram, "0.5.0"}, {idna, "6.1.1"}, diff --git a/rebar.lock b/rebar.lock index c971a7cf93a..988b92ca7ea 100644 --- a/rebar.lock +++ b/rebar.lock @@ -94,6 +94,7 @@ {ref,"c71265c00bd7b465b7214770895503383dbe299e"}}, 0}, {<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0}, + {<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.3.0">>},0}, {<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1}, {<<"pa">>, {git,"https://github.com/erszcz/pa.git", @@ -170,6 +171,7 @@ {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, {<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>}, {<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>}, + {<<"opuntia">>, <<"FB365A48C48CEE76C76407FB0B0E13065E6CB23F4918F7C6B084499FAEB1CCDF">>}, {<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>}, {<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>}, {<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>}, @@ -233,6 +235,7 @@ {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, {<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>}, {<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>}, + {<<"opuntia">>, <<"AB49DB4DA96D30116C85A6D28AC4D6C4BE9C91799D7E2452C37F92C477B9BB88">>}, {<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>}, {<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>}, {<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>}, diff --git a/src/c2s/mongoose_c2s.erl b/src/c2s/mongoose_c2s.erl index f45532bf4a2..870f7d250d2 100644 --- a/src/c2s/mongoose_c2s.erl +++ b/src/c2s/mongoose_c2s.erl @@ -35,7 +35,7 @@ jid :: undefined | jid:jid(), socket :: undefined | mongoose_c2s_socket:socket(), parser :: undefined | exml_stream:parser(), - shaper :: undefined | shaper:shaper(), + shaper :: undefined | mongoose_shaper:shaper(), listener_opts :: undefined | listener_opts(), state_mod = #{} :: #{module() => term()}, info = #{} :: info() @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect, StateData = #c2s_data{listener_opts = #{shaper := ShaperName, max_stanza_size := MaxStanzaSize} = LOpts}) -> {ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]), - Shaper = shaper:new(ShaperName), + Shaper = mongoose_shaper:new(ShaperName), C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts), StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper}, {next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)}; @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) -> -spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res(). handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) -> - {NewShaper, Pause} = shaper:update(Shaper, Size), + {NewShaper, Pause} = mongoose_shaper:update(Shaper, Size), mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size), NewStateData = StateData#c2s_data{shaper = NewShaper}, MaybePauseTimeout = maybe_pause(NewStateData, Pause), diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index ba03fdcdc51..77d0390388d 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -58,7 +58,7 @@ -record(state, {socket :: mongoose_transport:socket_data(), streamid :: ejabberd_s2s:stream_id(), - shaper :: shaper:shaper(), + shaper :: mongoose_shaper:shaper(), tls = false :: boolean(), tls_enabled = false :: boolean(), tls_required = false :: boolean(), @@ -84,7 +84,7 @@ tls_enabled => boolean(), tls_options => mongoose_tls:options(), authenticated => boolean(), - shaper => shaper:shaper(), + shaper => mongoose_shaper:shaper(), domains => [jid:lserver()]}. -type statename() :: 'stream_established' | 'wait_for_feature_request'. diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 0ccc21f47b2..d2e723d792d 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -49,7 +49,7 @@ init([]) -> SMBackendSupervisor = supervisor_spec(ejabberd_sm_backend_sup), OutgoingPoolsSupervisor = supervisor_spec(mongoose_wpool_sup), Listener = supervisor_spec(mongoose_listener_sup), - ShaperSup = supervisor_spec(mongoose_shaper_sup), + ShaperSup = mongoose_shaper:child_spec(), DomainSup = supervisor_spec(mongoose_domain_sup), ReceiverSupervisor = ejabberd_tmp_sup_spec(mongoose_transport_sup, [mongoose_transport_sup, mongoose_transport]), diff --git a/src/mam/mod_mam_muc.erl b/src/mam/mod_mam_muc.erl index 28ec3438d0a..23495a28af1 100644 --- a/src/mam/mod_mam_muc.erl +++ b/src/mam/mod_mam_muc.erl @@ -17,7 +17,7 @@ %%% %%% This module should be started for each host. %%% Message archivation is not shaped here (use standard support for this). -%%% MAM's IQs are shaped inside {@link shaper_srv}. +%%% MAM's IQs are shaped inside {@link opuntia_srv}. %%% %%% Message identifiers (or UIDs in the spec) are generated based on: %%% @@ -225,7 +225,7 @@ room_process_mam_iq(Acc, From, To, IQ, #{host_type := HostType}) -> case check_action_allowed(HostType, Acc, To#jid.lserver, Action, MucAction, From, To) of ok -> case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, MucAction, From) of - ok -> + continue -> handle_error_iq(Acc, HostType, To, Action, handle_mam_iq(HostType, Action, From, To, IQ)); {error, max_delay_reached} -> diff --git a/src/mam/mod_mam_pm.erl b/src/mam/mod_mam_pm.erl index 54b9189206d..e35f70c8fd9 100644 --- a/src/mam/mod_mam_pm.erl +++ b/src/mam/mod_mam_pm.erl @@ -17,7 +17,7 @@ %%% %%% This module should be started for each host. %%% Message archivation is not shaped here (use standard support for this). -%%% MAM's IQs are shaped inside {@link shaper_srv}. +%%% MAM's IQs are shaped inside {@link opuntia_srv}. %%% %%% Message identifiers (or UIDs in the spec) are generated based on: %%% @@ -142,7 +142,7 @@ process_mam_iq(Acc, From, To, IQ, _Extra) -> case is_action_allowed(HostType, Action, From, To) of true -> case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, Action, From) of - ok -> + continue -> handle_error_iq(HostType, Acc, To, Action, handle_mam_iq(Action, From, To, IQ, Acc)); {error, max_delay_reached} -> diff --git a/src/mam/mod_mam_utils.erl b/src/mam/mod_mam_utils.erl index d32f101d0f8..85b94434f5d 100644 --- a/src/mam/mod_mam_utils.erl +++ b/src/mam/mod_mam_utils.erl @@ -1022,13 +1022,15 @@ action_to_global_shaper_name(Action) -> list_to_atom(atom_to_list(Action) ++ "_global_shaper"). -spec wait_shaper(mongooseim:host_type(), jid:server(), mam_iq:action(), jid:jid()) -> - 'ok' | {'error', 'max_delay_reached'}. + continue | {error, max_delay_reached}. wait_shaper(HostType, Host, Action, From) -> - case shaper_srv:wait(HostType, Host, action_to_shaper_name(Action), From, 1) of - ok -> - shaper_srv:wait(HostType, Host, action_to_global_shaper_name(Action), global, 1); - Err -> - Err + case mongoose_shaper:wait( + HostType, Host, action_to_shaper_name(Action), From, 1) of + continue -> + mongoose_shaper:wait( + global, Host, action_to_global_shaper_name(Action), From, 1); + {error, max_delay_reached} -> + {error, max_delay_reached} end. %% ----------------------------------------------------------------------- diff --git a/src/mongoose_shaper.erl b/src/mongoose_shaper.erl new file mode 100644 index 00000000000..4fb9dc8c92f --- /dev/null +++ b/src/mongoose_shaper.erl @@ -0,0 +1,70 @@ +-module(mongoose_shaper). + +-export([child_spec/0]). +-export([new/1, update/2, wait/5, reset_all_shapers/1]). +-ignore_xref([reset_all_shapers/1]). + +-type shaper() :: opuntia:shaper(). +-export_type([shaper/0]). + +-spec child_spec() -> supervisor:child_spec(). +child_spec() -> + WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {?MODULE, #{}}}}], + #{id => ?MODULE, + start => {wpool, start_pool, [?MODULE, WPoolOpts]}, + restart => permanent, + shutdown => infinity, + type => supervisor}. + +-spec new(atom()) -> opuntia:shaper(). +new(Name) -> + opuntia:new(get_shaper_config(Name)). + +-spec update(opuntia:shaper(), opuntia:tokens()) -> {opuntia:shaper(), opuntia:delay()}. +update(Shaper, Tokens) -> + opuntia:update(Shaper, Tokens). + +%% @doc Shapes the caller from executing the action. +-spec wait(HostType :: mongooseim:host_type_or_global(), + Domain :: jid:lserver(), + Action :: atom(), + FromJID :: jid:jid(), + Size :: integer()) -> continue | {error, max_delay_reached}. +wait(HostType, Domain, Action, FromJID, Size) -> + Worker = wpool_pool:hash_worker(?MODULE, FromJID), + Config = get_shaper_config(get_shaper_name(HostType, Domain, Action, FromJID)), + Key = new_key(Domain, Action, FromJID), + opuntia_srv:wait(Worker, Key, Size, Config). + +-type key() :: {global | jid:server(), atom(), jid:jid()}. +-spec new_key(jid:server() | global, atom(), jid:jid()) -> key(). +new_key(Domain, Action, FromJID) -> + {Domain, Action, FromJID}. + +%% @doc Ask all shaper servers to forget current shapers and read settings again +reset_all_shapers(_HostType) -> + [ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(?MODULE) ], + ok. + +-spec get_shaper_name(HostType :: mongooseim:host_type_or_global(), + Domain :: global | jid:server(), + Action :: atom(), + FromJID :: jid:jid()) -> allow | none. +get_shaper_name(HostType, Domain, Action, FromJID) -> + case acl:match_rule(HostType, Domain, Action, FromJID) of + deny -> default_shaper(); + Value -> Value + end. + +-spec get_shaper_config(atom()) -> number(). +get_shaper_config(Name) -> + case mongoose_config:lookup_opt([shaper, Name]) of + {ok, #{max_rate := MaxRatePerSecond}} -> + Rate = MaxRatePerSecond div 1000, + #{bucket_size => MaxRatePerSecond, rate => Rate, start_full => true}; + {error, not_found} -> + 0 + end. + +default_shaper() -> + none. diff --git a/src/mongoose_shaper_sup.erl b/src/mongoose_shaper_sup.erl deleted file mode 100644 index 20d7094b699..00000000000 --- a/src/mongoose_shaper_sup.erl +++ /dev/null @@ -1,58 +0,0 @@ --module(mongoose_shaper_sup). - --behaviour(supervisor). - -%% API --export([start_link/0, get_workers/0, select_worker/1]). --ignore_xref([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - --spec start_link() -> {ok, pid()} | ignore | {error, term()}. -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - --spec init([]) -> {ok, {#{strategy => one_for_one, intensity => 100, period => 5}, - [supervisor:child_spec()]}}. -init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 100, period => 5}, - WorkerNames = worker_names(), - TupleNames = list_to_tuple(WorkerNames), - persistent_term:put(?MODULE, TupleNames), - Shapers = [child_spec(Name) || Name <- WorkerNames], - {ok, { SupFlags, Shapers }}. - --spec child_spec(atom()) -> supervisor:child_spec(). -child_spec(ProcName) -> - #{id => ProcName, - start => {shaper_srv, start_link, [ProcName]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [shaper_srv]}. - --spec get_workers() -> [atom()]. -get_workers() -> - tuple_to_list(persistent_term:get(?MODULE)). - --spec select_worker(term()) -> atom(). -select_worker(Key) -> - N = 1 + erlang:phash2(Key, worker_count()), - Workers = persistent_term:get(?MODULE), - element(N, Workers). - --spec worker_names() -> [atom()]. -worker_names() -> - [build_worker_name(N) || N <- lists:seq(1, worker_count())]. - --spec build_worker_name(integer()) -> atom(). -build_worker_name(N) -> - list_to_atom(worker_prefix() ++ integer_to_list(N)). - --spec worker_prefix() -> string(). -worker_prefix() -> - "mongoose_shaper_". - -worker_count() -> - 10. diff --git a/src/mongoose_transport.erl b/src/mongoose_transport.erl index b1dc8d04b5d..46f6d9e42cf 100644 --- a/src/mongoose_transport.erl +++ b/src/mongoose_transport.erl @@ -49,7 +49,7 @@ -record(state, {socket :: socket(), sockmod = gen_tcp :: socket_module(), - shaper_state :: shaper:shaper(), + shaper_state :: mongoose_shaper:shaper(), dest_pid :: undefined | pid(), %% gen_fsm_compat pid max_stanza_size :: stanza_size(), parser :: exml_stream:parser(), @@ -164,7 +164,7 @@ send_text(SocketData, Data) -> #socket_data{sockmod = SockMod, socket = Socket, connection_type = ConnectionType} = SocketData, case catch SockMod:send(Socket, Data) of - ok -> + ok -> update_transport_metrics(byte_size(Data), sent, ConnectionType), ok; {error, timeout} -> @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) -> gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []). init([Socket, Shaper, Opts]) -> - ShaperState = shaper:new(Shaper), + ShaperState = mongoose_shaper:new(Shaper), #{max_stanza_size := MaxStanzaSize, hibernate_after := HibernateAfter, connection_type := ConnectionType} = Opts, @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) -> {reply, ok, State, hibernate_or_timeout(State)}. handle_cast({change_shaper, Shaper}, State) -> - NewShaperState = shaper:new(Shaper), + NewShaperState = mongoose_shaper:new(Shaper), NewState = State#state{shaper_state = NewShaperState}, {noreply, NewState, hibernate_or_timeout(NewState)}; handle_cast(close, State) -> @@ -277,7 +277,7 @@ handle_cast(_Msg, State) -> handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) -> NewState = process_data(Data, State), - {noreply, NewState, hibernate_or_timeout(NewState)}; + {noreply, NewState, hibernate_or_timeout(NewState)}; handle_info({Tag, _TCPSocket, Data}, #state{socket = Socket, sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl -> @@ -394,21 +394,21 @@ process_data(Data, #state{parser = Parser, Size = byte_size(Data), {Events, NewParser} = case exml_stream:parse(Parser, Data) of - {ok, NParser, Elems} -> + {ok, NParser, Elems} -> {[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser}; {error, Reason} -> {[{xmlstreamerror, Reason}], Parser} end, - {NewShaperState, Pause} = shaper:update(ShaperState, Size), + {NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size), update_transport_metrics(Size, received, State#state.connection_type), [gen_fsm_compat:send_event(DestPid, Event) || Event <- Events], maybe_pause(Pause, State), State#state{parser = NewParser, shaper_state = NewShaperState}. -wrap_xml_elements_and_update_metrics(#xmlel{} = E) -> +wrap_xml_elements_and_update_metrics(#xmlel{} = E) -> mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), {xmlstreamelement, E}; -wrap_xml_elements_and_update_metrics(E) -> +wrap_xml_elements_and_update_metrics(E) -> mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), E. diff --git a/src/mongooseim.app.src b/src/mongooseim.app.src index 630771af6bc..514ff22b057 100644 --- a/src/mongooseim.app.src +++ b/src/mongooseim.app.src @@ -49,7 +49,8 @@ cowboy_swagger, tomerl, flatlog, - segmented_cache + segmented_cache, + opuntia ]}, {included_applications, [mnesia, cets]}, {env, []}, diff --git a/src/muc/mod_muc.erl b/src/muc/mod_muc.erl index 89cff3c24d3..37c01a83de3 100644 --- a/src/muc/mod_muc.erl +++ b/src/muc/mod_muc.erl @@ -137,7 +137,7 @@ access, history_size :: integer(), default_room_opts :: list(), - room_shaper :: shaper:shaper(), + room_shaper :: mongoose_shaper:shaper(), http_auth_pool :: mongoose_http_client:pool(), hibernated_room_check_interval :: timeout(), hibernated_room_timeout :: timeout() }). @@ -873,7 +873,7 @@ check_user_can_create_room(HostType, ServerHost, AccessCreate, From, RoomID) -> -spec start_room(HostType :: host_type(), ServerHost :: jid:lserver(), MucHost :: muc_host(), Access :: access(), room(), - HistorySize :: undefined | integer(), RoomShaper :: shaper:shaper(), + HistorySize :: undefined | integer(), RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), From :: jid:jid(), nick(), DefRoomOpts :: undefined | [any()], Acc :: mongoose_acc:t()) -> {error, {failed_to_restore, Reason :: term()}} | {ok, pid()}. diff --git a/src/muc/mod_muc_room.erl b/src/muc/mod_muc_room.erl index 45d241c65dd..bd74029587b 100644 --- a/src/muc/mod_muc_room.erl +++ b/src/muc/mod_muc_room.erl @@ -156,7 +156,7 @@ -spec start_new(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(), Access :: _, Room :: mod_muc:room(), HistorySize :: integer(), - RoomShaper :: shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), + RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), Creator :: jid:jid(), Nick :: mod_muc:nick(), DefRoomOpts :: list()) -> {ok, pid()}. start_new(HostType, Host, ServerHost, Access, Room, @@ -171,7 +171,7 @@ start_new(HostType, Host, ServerHost, Access, Room, -spec start_restored(HostType :: mongooseim:host_type(), Host :: jid:lserver(), ServerHost :: jid:lserver(), Access :: _, Room :: mod_muc:room(), HistorySize :: integer(), - RoomShaper :: shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), + RoomShaper :: mongoose_shaper:shaper(), HttpAuthPool :: none | mongoose_http_client:pool(), Opts :: list()) -> {ok, pid()}. start_restored(HostType, Host, ServerHost, Access, Room, HistorySize, RoomShaper, HttpAuthPool, Opts) @@ -305,7 +305,7 @@ init_new(#{init_type := start_new, host_type := HostType, muc_host := Host, http_auth_pool := HttpAuthPool, creator := Creator, nick := _Nick, def_opts := DefRoomOpts}) when is_list(DefRoomOpts) -> process_flag(trap_exit, true), - Shaper = shaper:new(RoomShaper), + Shaper = mongoose_shaper:new(RoomShaper), State = #state{host = Host, host_type = HostType, server_host = ServerHost, access = Access, room = Room, @@ -345,7 +345,7 @@ init_restored(#{init_type := start_restored, room_shaper := RoomShaper, http_auth_pool := HttpAuthPool, opts := Opts}) -> process_flag(trap_exit, true), - Shaper = shaper:new(RoomShaper), + Shaper = mongoose_shaper:new(RoomShaper), State = set_opts(Opts, #state{host = Host, host_type = HostType, server_host = ServerHost, access = Access, @@ -1545,8 +1545,8 @@ get_user_activity(JID, StateData) -> case treap:lookup(jid:to_lower(JID), StateData#state.activity) of {ok, _P, A} -> A; error -> - MessageShaper = shaper:new(get_opt(StateData, user_message_shaper)), - PresenceShaper = shaper:new(get_opt(StateData, user_presence_shaper)), + MessageShaper = mongoose_shaper:new(get_opt(StateData, user_message_shaper)), + PresenceShaper = mongoose_shaper:new(get_opt(StateData, user_presence_shaper)), #activity{message_shaper = MessageShaper, presence_shaper = PresenceShaper} end. @@ -1581,10 +1581,10 @@ store_user_activity(JID, UserActivity, StateData) -> (UserActivity#activity.presence == undefined) of true -> {_, MessageShaperInterval} = - shaper:update(UserActivity#activity.message_shaper, + mongoose_shaper:update(UserActivity#activity.message_shaper, 100000), {_, PresenceShaperInterval} = - shaper:update(UserActivity#activity.presence_shaper, + mongoose_shaper:update(UserActivity#activity.presence_shaper, 100000), Delay = lists:max([MessageShaperInterval, PresenceShaperInterval, @@ -1632,7 +1632,7 @@ prepare_room_queue(StateData) -> Packet = Activity#activity.message, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - shaper:update(StateData#state.room_shaper, Size), + mongoose_shaper:update(StateData#state.room_shaper, Size), erlang:send_after( RoomShaperInterval, self(), process_room_queue), @@ -1643,7 +1643,7 @@ prepare_room_queue(StateData) -> {_Nick, Packet} = Activity#activity.presence, Size = element_size(Packet), {RoomShaper, RoomShaperInterval} = - shaper:update(StateData#state.room_shaper, Size), + mongoose_shaper:update(StateData#state.room_shaper, Size), erlang:send_after( RoomShaperInterval, self(), process_room_queue), @@ -4220,7 +4220,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>, Now = os:system_time(microsecond), MinMessageInterval = trunc(get_opt(StateData, min_message_interval) * 1000000), Size = element_size(Packet), - {MessageShaper, MessageShaperInterval} = shaper:update(Activity#activity.message_shaper, Size), + {MessageShaper, MessageShaperInterval} = mongoose_shaper:update(Activity#activity.message_shaper, Size), case {Activity#activity.message /= undefined, Now >= Activity#activity.message_time + MinMessageInterval, MessageShaperInterval} of @@ -4230,7 +4230,7 @@ route_message(#routed_message{allowed = true, type = <<"groupchat">>, ejabberd_router:route(StateData#state.jid, From, Err), StateData; {false, true, 0} -> - {RoomShaper, RoomShaperInterval} = shaper:update(StateData#state.room_shaper, Size), + {RoomShaper, RoomShaperInterval} = mongoose_shaper:update(StateData#state.room_shaper, Size), RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), case {RoomShaperInterval, RoomQueueEmpty} of {0, true} -> @@ -4310,7 +4310,7 @@ route_message(#routed_message{from = From, packet = Packet, lang = Lang}, StateData. -spec schedule_queue_processing_when_empty(RoomQueueEmpty :: boolean(), - RoomShaper :: shaper:shaper(), + RoomShaper :: mongoose_shaper:shaper(), RoomShaperInterval :: non_neg_integer(), StateData :: state()) -> state(). schedule_queue_processing_when_empty(true, RoomShaper, RoomShaperInterval, StateData) -> diff --git a/src/shaper.erl b/src/shaper.erl deleted file mode 100644 index 41e8940be73..00000000000 --- a/src/shaper.erl +++ /dev/null @@ -1,63 +0,0 @@ -%%============================================================================== -%% Copyright 2016 Erlang Solutions Ltd. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%============================================================================== - --module(shaper). --author('konrad.zemek@erlang-solutions.com'). - --export([new/1, update/2]). - --record(shaper, { - max_rate :: undefined | pos_integer(), - tokens = 0 :: non_neg_integer(), - last_update = erlang:monotonic_time(millisecond) :: integer() -}). - --type shaper() :: #shaper{} | none. - --export_type([shaper/0]). - --spec new(atom()) -> shaper(). -new(Name) -> - case mongoose_config:lookup_opt([shaper, Name]) of - {ok, #{max_rate := MaxRatePerSecond}} -> - #shaper{max_rate = MaxRatePerSecond, - tokens = MaxRatePerSecond, - last_update = erlang:monotonic_time(millisecond)}; - {error, not_found} -> none - end. - -%% @doc Update shaper. -%% `Delay' is how many milliseconds to wait. --spec update(shaper(), Size :: non_neg_integer()) -> {shaper(), Delay :: non_neg_integer()}. -update(none, _Size) -> - {none, 0}; -update(#shaper{max_rate = MaxRatePerSecond, - tokens = LastAvailableTokens, - last_update = LastUpdate}, NowUsed) -> - Now = erlang:monotonic_time(millisecond), - % How much we might have recovered since last time, in milliseconds arithmetic - GrowthPerMillisecond = MaxRatePerSecond / 1000, - MilliSecondsSinceLastUpdate = (Now - LastUpdate), - PossibleTokenGrowth = round(GrowthPerMillisecond * MilliSecondsSinceLastUpdate), - % Available plus recovered cannot grow higher than the actual rate limit - ExactlyAvailableNow = min(MaxRatePerSecond, LastAvailableTokens + PossibleTokenGrowth), - TokensAvailable = max(0, ExactlyAvailableNow - NowUsed), - TokensOverused = max(0, NowUsed - ExactlyAvailableNow), - MaybeDelay = round(TokensOverused / GrowthPerMillisecond), - {#shaper{max_rate = MaxRatePerSecond, - tokens = TokensAvailable, - last_update = Now + MaybeDelay}, - MaybeDelay}. diff --git a/src/shaper_srv.erl b/src/shaper_srv.erl deleted file mode 100644 index 86de92b7255..00000000000 --- a/src/shaper_srv.erl +++ /dev/null @@ -1,165 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author Uvarov Michael -%%% @copyright (C) 2013, Uvarov Michael -%%% @doc Shared shapers. -%%% @end -%%%------------------------------------------------------------------- --module(shaper_srv). --behaviour(gen_server). - -%% API Function Exports --export([start_link/1, wait/5, reset_all_shapers/1]). --ignore_xref([start_link/1, reset_all_shapers/1]). - -%% Record definitions --record(state, { - %% Maximum ammount of milliseconds to wait - max_delay :: non_neg_integer(), - %% How many seconds to store each shaper - ttl :: non_neg_integer(), - shapers :: map(), - a_times :: map() - }). --type state() :: #state{}. - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - -%% API Function Definitions --spec start_link(atom()) -> ignore | {error, _} | {ok, pid()}. -start_link(ProcName) -> - gen_server:start_link({local, ProcName}, ?MODULE, [], []). - -%% @doc Shapes the caller from executing the action. --spec wait(HostType :: mongooseim:host_type(), - Domain :: jid:server(), - Action :: atom(), - FromJID :: jid:jid() | global, - Size :: integer()) -> ok | {error, max_delay_reached}. -wait(HostType, Domain, Action, FromJID, Size) -> - Worker = mongoose_shaper_sup:select_worker(FromJID), - gen_server:call(Worker, {wait, HostType, Domain, Action, FromJID, Size}). - -%% @doc Ask all shaper servers to forget current shapers and read settings again -reset_all_shapers(_HostType) -> - [reset_shapers(ProcName) || ProcName <- mongoose_shaper_sup:get_workers()]. - -%% @doc Ask server to forget its shapers -reset_shapers(ProcName) -> - gen_server:call(ProcName, reset_shapers). - -%% gen_server Function Definitions -init(Args) -> - State = #state{max_delay = proplists:get_value(max_delay, Args, 3000), - ttl = proplists:get_value(ttl, Args, 120), - shapers = #{}, - a_times = #{} - }, - GCInt = proplists:get_value(gc_interval, Args, 30), - timer:send_interval(timer:seconds(GCInt), delete_old_shapers), - {ok, State}. - -handle_call({wait, HostType, Domain, Action, FromJID, Size}, - From, State = #state{max_delay = MaxDelayMs}) -> - Key = new_key(Domain, Action, FromJID), - Shaper = find_or_create_shaper(HostType, Key, State), - State1 = update_access_time(Key, erlang:system_time(), State), - case shaper:update(Shaper, Size) of - {UpdatedShaper, 0} -> - {reply, ok, save_shaper(Key, UpdatedShaper, State1)}; - {UpdatedShaper, DelayMs} when DelayMs < MaxDelayMs -> - reply_after(DelayMs, From, ok), - {noreply, save_shaper(Key, UpdatedShaper, State1)}; - {_, _} -> - {reply, {error, max_delay_reached}, State1} - end; -handle_call(reset_shapers, _From, State = #state{}) -> - {reply, ok, init_dicts(State)}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(delete_old_shapers, State) -> - {noreply, delete_old_shapers(State)}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ - --type key() :: {global | jid:server(), atom(), jid:jid()}. --spec new_key(jid:server() | global, atom(), jid:jid()) -> key(). -new_key(Domain, Action, FromJID) -> - {Domain, Action, FromJID}. - --spec find_or_create_shaper(mongooseim:host_type(), key(), state()) -> - shaper:shaper(). -find_or_create_shaper(HostType, Key, #state{shapers = Shapers}) -> - case Shapers of - #{Key := Shaper} -> Shaper; - _ -> create_shaper(HostType, Key) - end. - --spec update_access_time(key(), _, state()) -> state(). -update_access_time(Key, Now, State = #state{a_times = Times}) -> - State#state{a_times = maps:put(Key, Now, Times)}. - --spec save_shaper(key(), shaper:shaper(), state()) -> state(). -save_shaper(Key, Shaper, State = #state{shapers = Shapers}) -> - State#state{shapers = maps:put(Key, Shaper, Shapers)}. - --spec init_dicts(state()) -> state(). -init_dicts(State) -> - State#state{shapers = #{}, a_times = #{}}. - --spec delete_old_shapers(state()) -> state(). -delete_old_shapers(State = #state{shapers = Shapers, a_times = Times, ttl = TTL}) -> - Min = subtract_seconds(TTL), - %% Copy recently modified shapers - maps:fold(fun - (_, ATime, Acc) when ATime < Min -> Acc; %% skip too old - (Key, ATime, Acc) -> - Shaper = maps:get(Key, Shapers), - update_access_time(Key, ATime, save_shaper(Key, Shaper, Acc)) - end, init_dicts(State), Times). - --spec create_shaper(mongooseim:host_type(), key()) -> - none | shaper:shaper(). -create_shaper(HostType, Key) -> - shaper:new(request_shaper_name(HostType, Key)). - --spec request_shaper_name(mongooseim:host_type(), key()) -> atom(). -request_shaper_name(HostType, {Domain, Action, FromJID}) -> - get_shaper_name(HostType, Domain, Action, FromJID, default_shaper()). - -default_shaper() -> - none. - --spec get_shaper_name(HostType :: mongooseim:host_type(), - Domain :: global | jid:server(), - Action :: atom(), jid:jid(), - Default :: none) -> allow | none. -get_shaper_name(HostType, Domain, Action, FromJID, Default) -> - case acl:match_rule(HostType, Domain, Action, FromJID) of - deny -> Default; - Value -> Value - end. - -%% @doc It is a small hack -%% This function calls this in more efficient way: -%% timer:apply_after(DelayMs, gen_server, reply, [From, Reply]). --spec reply_after(pos_integer(), {atom() | pid(), _}, ok) -> reference(). -reply_after(DelayMs, {Pid, Tag}, Reply) -> - erlang:send_after(DelayMs, Pid, {Tag, Reply}). - --spec subtract_seconds(integer()) -> integer(). -subtract_seconds(TTL) -> - TimestampThreshold = erlang:system_time(second) - TTL, - erlang:convert_time_unit(TimestampThreshold, second, native).