Skip to content

Commit

Permalink
Use different AMQP address format for v1 and v2
Browse files Browse the repository at this point in the history
to distinguish between v1 and v2 address formats.

Previously, v1 and v2 address formats overlapped and behaved differently
for example for:
```
/queue/:queue
/exchange/:exchange
```

This PR changes the v2 format to:
```
/e/:exchange/:routing-key
/e/:exchange
/q/:queue
```
to distinguish between v1 and v2 addresses.

This allows to call `rabbit_deprecated_features:is_permitted(amqp_address_v1)`
only if we know that the user requests address format v1.

Note that `rabbit_deprecated_features:is_permitted/1` should only
be called when the old feature is actually used.

Use percent encoding / decoding for address URI format v2.
This allows to use any UTF-8 encoded characters including slashes (`/`)
in routing keys, exchange names, and queue names and is more future
safe.
  • Loading branch information
ansd committed Jul 3, 2024
1 parent 1bc0d89 commit 0de9591
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 173 deletions.
231 changes: 130 additions & 101 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
-rabbit_deprecated_feature(
{amqp_address_v1,
#{deprecation_phase => permitted_by_default,
doc_url => "https://www.rabbitmq.com/docs/next/amqp#address",
messages =>
#{when_permitted =>
"RabbitMQ AMQP address version 1 is deprecated. "
"Clients should use RabbitMQ AMQP address version 2."}}
"Clients should use RabbitMQ AMQP address version 2.",
when_denied =>
"RabbitMQ AMQP address version 1 is unsupported. "
"Clients must use RabbitMQ AMQP address version 2."
}}
}).

-define(PROTOCOL, amqp10).
Expand Down Expand Up @@ -2422,12 +2427,20 @@ ensure_source(#'v1_0.source'{address = Address,
durable = Durable},
Vhost, User, PermCache, TopicPermCache) ->
case Address of
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
%% /q/:queue
QNameBin = unquote(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache};
{utf8, SourceAddr} ->
case address_v1_permitted() of
true -> ensure_source_v1(
SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache);
false -> ensure_source_v2(
SourceAddr, Vhost, PermCache, TopicPermCache)
true ->
ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache);
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end;
_ ->
{error, {bad_address, Address}}
Expand Down Expand Up @@ -2467,19 +2480,10 @@ ensure_source_v1(Address,
Err
end
end;
{error, _} ->
ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0)
{error, _} = Err ->
Err
end.

%% The only possible v2 source address format is:
%% /queue/:queue
ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache};
ensure_source_v2(Address, _, _, _) ->
{error, {bad_address, Address}}.

-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
Expand All @@ -2495,29 +2499,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
ensure_target(#'v1_0.target'{address = Address,
durable = Durable},
Vhost, User, PermCache) ->
case address_v1_permitted() of
true ->
try_target_v1(Address, Vhost, User, Durable, PermCache);
false ->
try_target_v2(Address, Vhost, User, PermCache)
end.

try_target_v1(Address, Vhost, User, Durable, PermCache0) ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} ->
try_target_v2(Address, Vhost, User, PermCache0)
end.

try_target_v2(Address, Vhost, User, PermCache) ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
case target_address_version(Address) of
2 ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
end;
1 ->
case address_v1_permitted() of
true ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
{error, _} = Err ->
Err
end;
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end
end.

check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
Expand All @@ -2539,29 +2542,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
exit_not_found(XName)
end.

ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.
address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

target_address_version({utf8, <<"/e/", _/binary>>}) ->
2;
target_address_version({utf8, <<"/q/", _/binary>>}) ->
2;
target_address_version(undefined) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2;
target_address_version(_Address) ->
1.

%% The possible v2 target address formats are:
%% /exchange/:exchange/key/:routing-key
%% /exchange/:exchange
%% /queue/:queue
%% /e/:exchange/:routing-key
%% /e/:exchange
%% /q/:queue
%% <null>
ensure_target_v2({utf8, String}, Vhost) ->
case parse_target_v2_string(String) of
Expand All @@ -2576,43 +2574,77 @@ ensure_target_v2({utf8, String}, Vhost) ->
ensure_target_v2(undefined, _) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
{ok, to, to, undefined};
ensure_target_v2(Address, _) ->
{error, {bad_address, Address}}.
{ok, to, to, undefined}.

parse_target_v2_string(<<"/exchange/", Rest/binary>>) ->
case split_exchange_target(Rest) of
{?DEFAULT_EXCHANGE_NAME, _} ->
parse_target_v2_string(<<"/e/", Rest/binary>>) ->
Key = cp_slash,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Rest, Pattern, [global]) of
[?DEFAULT_EXCHANGE_NAME | _] ->
{error, bad_address};
{<<"amq.default">>, _} ->
[<<"amq.default">> | _] ->
{error, bad_address};
{XNameBin, RKey} ->
{ok, XNameBin, RKey, undefined}
[XNameBinQuoted] ->
XNameBin = unquote(XNameBinQuoted),
{ok, XNameBin, <<>>, undefined};
[XNameBinQuoted, RKeyQuoted] ->
XNameBin = unquote(XNameBinQuoted),
RKey = unquote(RKeyQuoted),
{ok, XNameBin, RKey, undefined};
_ ->
{error, bad_address}
end;
parse_target_v2_string(<<"/queue/">>) ->
parse_target_v2_string(<<"/q/">>) ->
%% empty queue name is invalid
{error, bad_address};
parse_target_v2_string(<<"/queue/", QNameBin/binary>>) ->
parse_target_v2_string(<<"/q/", QNameBinQuoted/binary>>) ->
QNameBin = unquote(QNameBinQuoted),
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
parse_target_v2_string(_) ->
{error, bad_address}.

%% Empty exchange name (default exchange) is valid.
split_exchange_target(Target) ->
Key = cp_amqp_target_address,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/key/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Target, Pattern) of
[XNameBin] ->
{XNameBin, <<>>};
[XNameBin, RoutingKey] ->
{XNameBin, RoutingKey}
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.

%% uri_string:unquote/1 is implemented inefficiently because it always creates
%% a new binary. We optimise for the common case: When no character is percent
%% encoded, we avoid a new binary being created.
unquote(Bin) ->
case is_quoted(Bin) of
true ->
uri_string:unquote(Bin);
false ->
Bin
end.

is_quoted(<<>>) ->
false;
is_quoted(<<$%, _/binary>>) ->
true;
is_quoted(<<_, Rest/binary>>) ->
is_quoted(Rest).

handle_outgoing_mgmt_link_flow_control(
#management_link{delivery_count = DeliveryCountSnd} = Link0,
#'v1_0.flow'{handle = Handle = ?UINT(HandleInt),
Expand Down Expand Up @@ -3355,14 +3387,24 @@ error_not_found(Resource) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.

address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

-spec cap_credit(rabbit_queue_type:credit()) ->
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
cap_credit(DesiredCredit) ->
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.

format_status(
#{state := #state{cfg = Cfg,
outgoing_pending = OutgoingPending,
Expand Down Expand Up @@ -3407,16 +3449,3 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.
Loading

0 comments on commit 0de9591

Please sign in to comment.