Skip to content

Commit

Permalink
replace context key tupels with records (#279)
Browse files Browse the repository at this point in the history
Instead of having to count fields in tuples, records allow us to
specify matches with the more readable record syntax.
They also give us `tagged keys`, that means we can deduce the type
of the key from the record tag.

This was originally part of the stateless work, but will also be
helpful for the cluster registry.
  • Loading branch information
RoadRunnr committed Dec 17, 2020
1 parent 22ded94 commit 09e97cb
Show file tree
Hide file tree
Showing 20 changed files with 250 additions and 165 deletions.
4 changes: 4 additions & 0 deletions include/ergw.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,7 @@
ip :: {inet:ip4_address(),1..32}|
{inet:ip6_address(),1..128}
}).

-record(seid_key, {seid}).
-record(context_key, {socket, id}).
-record(socket_teid_key, {name, type, teid}).
3 changes: 2 additions & 1 deletion src/ergw.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
terminate/2, code_change/3]).

-include_lib("kernel/include/logger.hrl").
-include("include/ergw.hrl").

-define(SERVER, ?MODULE).
-record(state, {tid :: ets:tid()}).
Expand Down Expand Up @@ -208,7 +209,7 @@ i(memory, path) ->

i(memory, context) ->
MemUsage =
lists:foldl(fun({{seid, _}, {_, Pid}}, Mem) ->
lists:foldl(fun({#seid_key{}, {_, Pid}}, Mem) ->
{memory, M} = erlang:process_info(Pid, memory),
Mem + M;
(_, Mem) ->
Expand Down
6 changes: 4 additions & 2 deletions src/ergw_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

-ignore_xref([peer/1, tunnel/1, memory/1]).

-include("include/ergw.hrl").

%%%===================================================================
%%% API
%%%===================================================================
Expand All @@ -38,8 +40,8 @@ tunnel(Socket) when is_atom(Socket) ->
lists:foldl(fun collext_path_contexts/2, [], gtp_path_reg:all(Socket)).

contexts(all) ->
lists:usort([Pid || {{_Socket, {teid, 'gtp-c', _TEID}}, {_, Pid}}
<- gtp_context_reg:all(), is_pid(Pid)]).
lists:usort([Pid || {#socket_teid_key{type = 'gtp-c'}, {_, Pid}}
<- gtp_context_reg:all(), is_pid(Pid)]).

delete_contexts(all) ->
lists:foreach(fun(Context) ->
Expand Down
28 changes: 12 additions & 16 deletions src/ergw_context.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,23 @@
%%% -----------------------------------------------------------------

sx_report(#pfcp{type = session_report_request, seid = SEID} = Report) ->
apply2context({seid, SEID}, sx_report, [Report]).
apply2context(#seid_key{seid = SEID}, sx_report, [Report]).

%% port_message/2
port_message(Request, Msg) ->
proc_lib:spawn(fun() -> port_message_h(Request, Msg) end),
ok.

%% port_message/3
port_message(Keys, #request{socket = Socket} = Request, Msg)
when is_list(Keys) ->
Contexts = gtp_context_reg:match_keys(Socket, Keys),
port_message_ctx(Contexts, Request, Msg).
port_message(Id, #request{socket = Socket} = Request, Msg) ->
Key = gtp_context:context_key(Socket, Id),
case gtp_context_reg:select(Key) of
[{Handler, Server}] when is_atom(Handler), is_pid(Server) ->
Handler:port_message(Server, Request, Msg, false);
_Other ->
?LOG(debug, "unable to find context ~p", [Key]),
throw({error, not_found})
end.

%% port_message/4
port_message(Key, Request, Msg, Resent) ->
Expand All @@ -69,9 +74,6 @@ apply2context(Key, F, A) ->
{error, not_found}
end.

port_request_key(#request{key = ReqKey, socket = Socket}) ->
gtp_context:socket_key(Socket, ReqKey).

%% TODO - MAYBE
%% it might be benificial to first perform the lookup and then enqueue
%%
Expand All @@ -94,9 +96,9 @@ port_message_h(Request, #gtp{} = Msg) ->

port_message_run(Request, #gtp{type = g_pdu} = Msg) ->
port_message_p(Request, Msg);
port_message_run(Request, Msg0) ->
port_message_run(#request{key = ReqKey} = Request, Msg0) ->
Msg = gtp_packet:decode_ies(Msg0),
case port_message(port_request_key(Request), Request, Msg, true) of
case port_message(ReqKey, Request, Msg, true) of
{error, not_found} ->
port_message_p(Request, Msg);
Result ->
Expand All @@ -113,12 +115,6 @@ port_message_p(#request{socket = Socket} = Request, #gtp{tei = TEI} = Msg) ->
Result
end.

port_message_ctx([{Handler, Server} | _], Request, Msg)
when is_atom(Handler), is_pid(Server) ->
Handler:port_message(Server, Request, Msg, false);
port_message_ctx(_, _Request, _Msg) ->
throw({error, not_found}).

load_class(#gtp{version = v1} = Msg) ->
gtp_v1_c:load_class(Msg);
load_class(#gtp{version = v2} = Msg) ->
Expand Down
5 changes: 3 additions & 2 deletions src/ergw_gtp_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ make_seq_id(#gtp{version = Version, seq_no = SeqNo})
make_seq_id(_) ->
undefined.

make_request(ArrivalTS, Src, IP, Port, Msg = #gtp{version = Version, type = Type}, Socket, Info) ->
make_request(ArrivalTS, Src, IP, Port, Msg = #gtp{version = Version, type = Type},
#socket{name = SocketName} = Socket, Info) ->
SeqId = make_seq_id(Msg),
#request{
key = {Socket, IP, Port, Type, SeqId},
key = {request, {SocketName, IP, Port, Type, SeqId}},
socket = Socket,
info = Info,
src = Src,
Expand Down
2 changes: 1 addition & 1 deletion src/ergw_pfcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ outer_header_removal(v6) ->
#outer_header_removal{header = 'GTP-U/UDP/IPv6'}.

ctx_teid_key(#pfcp_ctx{name = Name}, TEI) ->
{Name, {teid, 'gtp-u', TEI}}.
#socket_teid_key{name = Name, type = 'gtp-u', teid = TEI}.

up_inactivity_timer(#pfcp_ctx{up_inactivity_timer = Timer})
when is_integer(Timer) ->
Expand Down
2 changes: 1 addition & 1 deletion src/ergw_pfcp_context.erl
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ make_pctx_bearer_key(_, _, _, Keys) ->
Keys.

make_pctx_keys(Bearer, #pfcp_ctx{seid = #seid{cp = SEID}} = PCtx) ->
maps:fold(make_pctx_bearer_key(_, _, PCtx, _), [{seid, SEID}], Bearer).
maps:fold(make_pctx_bearer_key(_, _, PCtx, _), [#seid_key{seid = SEID}], Bearer).

register_ctx_ids(Handler, Bearer, PCtx) ->
Keys = make_pctx_keys(Bearer, PCtx),
Expand Down
2 changes: 1 addition & 1 deletion src/ergw_sx_node.erl
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ init([Parent, Node, NodeSelect, IP4, IP6, NotifyUp]) ->

RegKeys =
[gtp_context:socket_teid_key(Socket, TEI),
{seid, SEID}],
#seid_key{seid = SEID}],
gtp_context_reg:register(RegKeys, ?MODULE, self()),

Nodes = setup:get_env(ergw, nodes, #{}),
Expand Down
30 changes: 14 additions & 16 deletions src/gtp_context.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
validate_options/3,
validate_option/2,
generic_error/3,
socket_key/2, socket_teid_key/2]).
context_key/2, socket_teid_key/2]).
-export([usage_report_to_accounting/1,
collect_charging_events/2]).

Expand Down Expand Up @@ -167,7 +167,7 @@ collect_charging_events(OldS, NewS) ->
%% preexisting context should be deleted locally. This function does that.
terminate_colliding_context(#tunnel{socket = Socket}, #context{context_id = Id})
when Id /= undefined ->
case gtp_context_reg:lookup(socket_key(Socket, Id)) of
case gtp_context_reg:lookup(context_key(Socket, Id)) of
{?MODULE, Server} when is_pid(Server) ->
gtp_context:terminate_context(Server);
_ ->
Expand Down Expand Up @@ -276,15 +276,15 @@ sx_report(Server, Report) ->
port_message(Request, #gtp{version = v2, type = MsgType, tei = 0} = Msg)
when MsgType == change_notification_request;
MsgType == change_notification_response ->
Keys = gtp_v2_c:get_msg_keys(Msg),
ergw_context:port_message(Keys, Request, Msg);
Id = gtp_v2_c:get_context_id(Msg),
ergw_context:port_message(Id, Request, Msg);

%% same as above for GTPv2
port_message(Request, #gtp{version = v1, type = MsgType, tei = 0} = Msg)
when MsgType == ms_info_change_notification_request;
MsgType == ms_info_change_notification_response ->
Keys = gtp_v1_c:get_msg_keys(Msg),
ergw_context:port_message(Keys, Request, Msg);
Id = gtp_v1_c:get_context_id(Msg),
ergw_context:port_message(Id, Request, Msg);

port_message(#request{socket = Socket, info = Info} = Request,
#gtp{version = Version, tei = 0} = Msg) ->
Expand Down Expand Up @@ -733,11 +733,11 @@ generic_error(#request{socket = Socket} = Request,
%%% Internal functions
%%%===================================================================

register_request(Handler, Server, #request{key = ReqKey, socket = Socket}) ->
gtp_context_reg:register([socket_key(Socket, ReqKey)], Handler, Server).
register_request(Handler, Server, #request{key = ReqKey}) ->
gtp_context_reg:register([ReqKey], Handler, Server).

unregister_request(#request{key = ReqKey, socket = Socket}) ->
gtp_context_reg:unregister([socket_key(Socket, ReqKey)], ?MODULE, self()).
unregister_request(#request{key = ReqKey}) ->
gtp_context_reg:unregister([ReqKey], ?MODULE, self()).

get_handler_if(Socket, #gtp{version = v1} = Msg) ->
gtp_v1_c:get_handler(Socket, Msg);
Expand Down Expand Up @@ -794,7 +794,7 @@ context2keys(#tunnel{socket = Socket} = LeftTunnel, Bearer,
#context{apn = APN, context_id = ContextId}) ->
ordsets:from_list(
tunnel2keys(LeftTunnel)
++ [socket_key(Socket, ContextId) || ContextId /= undefined]
++ [context_key(Socket, ContextId) || ContextId /= undefined]
++ maps:fold(bsf_keys(APN, _, _, _), [], Bearer)).

tunnel2keys(Tunnel) ->
Expand All @@ -808,10 +808,8 @@ bsf_keys(APN, _, #bearer{vrf = VRF, local = #ue_ip{v4 = IPv4, v6 = IPv6}}, Keys)
bsf_keys(_, _, _, Keys) ->
Keys.

socket_key(#socket{name = Name}, Key) ->
{Name, Key};
socket_key({Name, _}, Key) ->
{Name, Key}.
context_key(#socket{name = Name}, Id) ->
#context_key{socket = Name, id = Id}.

tunnel_key(local, #tunnel{socket = Socket, local = #fq_teid{teid = TEID}}) ->
socket_teid_key(Socket, TEID);
Expand All @@ -823,7 +821,7 @@ socket_teid_key(#socket{type = Type} = Socket, TEI) ->
socket_teid_key(Socket, Type, TEI).

socket_teid_key(#socket{name = Name}, Type, TEI) ->
{Name, {teid, Type, TEI}}.
#socket_teid_key{name = Name, type = Type, teid = TEI}.

%%====================================================================
%% Experimental Trigger Support
Expand Down
14 changes: 0 additions & 14 deletions src/gtp_context_reg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
-export([start_link/0]).
-export([register/3, register_new/3, update/4, unregister/3,
lookup/1, select/1,
match_keys/2,
await_unreg/1]).
-export([all/0]).

Expand Down Expand Up @@ -50,19 +49,6 @@ lookup(Key) when is_tuple(Key) ->
select(Key) ->
ets:select(?SERVER, [{{Key, '$1'},[],['$1']}]).

match_key(#socket{name = Name}, Key) ->
select({Name, Key}).

match_keys(_, []) ->
throw({error, not_found});
match_keys(Socket, [H|T]) ->
case match_key(Socket, H) of
[_|_] = Match ->
Match;
_ ->
match_keys(Socket, T)
end.

register(Keys, Handler, Pid)
when is_list(Keys), is_atom(Handler), is_pid(Pid) ->
gen_server:call(?SERVER, {register, Keys, Handler, Pid}).
Expand Down
16 changes: 4 additions & 12 deletions src/gtp_v1_c.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
build_echo_request/0,
validate_teid/2,
type/0, port/0,
get_msg_keys/1, update_context_id/2,
get_context_id/1, update_context_id/2,
get_cause/1, get_common_flags/1,
find_sender_teid/1,
load_class/1]).
Expand Down Expand Up @@ -233,7 +233,7 @@ get_common_flags(IEs) ->
get_ext_common_flags(IEs) ->
get_element(?'Extended Common Flags', IEs, #extended_common_flags.flags, []).

get_context_id(IEs) ->
get_context_id(#gtp{version = v1, ie = IEs}) ->
NSAPI = get_element(?'NSAPI', IEs, #nsapi.nsapi, '_'),
UIMSI = proplists:get_bool('Unauthenticated IMSI', get_ext_common_flags(IEs)),
%% order of key selection, first match terminates:
Expand All @@ -251,16 +251,8 @@ get_context_id(IEs) ->
undefined
end.

get_msg_keys(#gtp{version = v1, ie = IEs}) ->
case get_context_id(IEs) of
undefined ->
[];
Id ->
[Id]
end.

update_context_id(#gtp{version = v1, ie = IEs}, Context) ->
case get_context_id(IEs) of
update_context_id(Msg, Context) ->
case get_context_id(Msg) of
{_, _, NSAPI} = Id when is_integer(NSAPI) ->
Context#context{context_id = Id};
_ ->
Expand Down
16 changes: 4 additions & 12 deletions src/gtp_v2_c.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
build_echo_request/0,
validate_teid/2,
type/0, port/0,
get_msg_keys/1, update_context_id/2,
get_context_id/1, update_context_id/2,
get_cause/1, get_indication_flags/1,
find_sender_teid/1,
load_class/1]).
Expand Down Expand Up @@ -258,7 +258,7 @@ get_context_ebi(#{?'Bearer Contexts' :=
get_context_ebi(_) ->
'_'.

get_context_id(IEs) ->
get_context_id(#gtp{version = v2, ie = IEs}) ->
EBI = get_context_ebi(IEs),
UIMSI = proplists:get_bool('UIMSI', get_indication_flags(IEs)),
case {UIMSI, IEs} of
Expand All @@ -270,16 +270,8 @@ get_context_id(IEs) ->
undefined
end.

get_msg_keys(#gtp{version = v2, ie = IEs}) ->
case get_context_id(IEs) of
undefined ->
[];
Id ->
[Id]
end.

update_context_id(#gtp{version = v2, ie = IEs}, Context) ->
case get_context_id(IEs) of
update_context_id(Msg, Context) ->
case get_context_id(Msg) of
{_, _, EBI} = Id when is_integer(EBI) ->
Context#context{context_id = Id};
_Other ->
Expand Down
4 changes: 3 additions & 1 deletion test/ergw_test_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
-export([init_ets/1]).

-include("ergw_test_lib.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("kernel/include/logger.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("gtplib/include/gtp_packet.hrl").
Expand Down Expand Up @@ -544,7 +545,8 @@ add_cfg_value([H | T], Value, Config) ->
%%%===================================================================

outstanding_requests() ->
ets:match_object(gtp_context_reg, {{'_', {'_', '_', '_', '_', '_'}}, '_'}).
Ms = ets:fun2ms(fun({Key, _} = Obj) when element(1, Key) == 'request' -> Obj end),
ets:select(gtp_context_reg, Ms).

wait4tunnels(Cnt) ->
case [X || X = #{tunnels := T} <- ergw_api:peer(all), T /= 0] of
Expand Down
3 changes: 2 additions & 1 deletion test/ergw_test_sx_up.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ handle_call({send, Msg}, _From,
#state{gtp = GtpSocket, cp_ip = IP, up_ip = UpIP} = State)
when is_binary(Msg) ->
{ok, SxPid} = ergw_sx_node_reg:lookup(ergw_inet:bin2ip(UpIP)),
[[SxTEI]] = ets:match(gtp_context_reg, {{'cp-socket',{teid,'gtp-u','$1'}},{'_',SxPid}}),
TEIDMatch = #socket_teid_key{name = 'cp-socket', type = 'gtp-u', teid = '$1', _ = '_'},
[[SxTEI]] = ets:match(gtp_context_reg, {TEIDMatch, {'_',SxPid}}),
BinMsg = gtp_packet:encode(#gtp{version = v1, type = g_pdu, tei = SxTEI, ie = Msg}),
ok = gen_udp:send(GtpSocket, IP, ?GTP1u_PORT, BinMsg),
{reply, ok, State};
Expand Down
Loading

0 comments on commit 09e97cb

Please sign in to comment.