Skip to content

Commit

Permalink
Require all feature flags introduced before 3.11.1
Browse files Browse the repository at this point in the history
RabbitMQ 3.12 requires feature flag `feature_flags_v2` which got
introduced in 3.11.0 (see
#6810).

Therefore, we can mark all feature flags that got introduced in 3.11.0
or before 3.11.0 as required because users will have to upgrade to
3.11.x first, before upgrading to 3.12.x

The advantage of marking these feature flags as required is that we can
start deleting any compatibliy code for these feature flags, similarly
as done in #5215

This list shows when a given feature flag was first introduced:

```
classic_mirrored_queue_version 3.11.0
stream_single_active_consumer 3.11.0
direct_exchange_routing_v2 3.11.0
listener_records_in_ets 3.11.0
tracking_records_in_ets 3.11.0

empty_basic_get_metric 3.8.10
drop_unroutable_metric 3.8.10
```

In this commit, we also force all required feature flags in Erlang
application `rabbit` to be enabled in mixed version cluster testing
and delete any tests that were about a feature flag starting as disabled.

Furthermore, this commit already deletes the callback (migration) functions
given they do not run anymore in 3.12.x.

All other clean up (i.e. branching depending on whether a feature flag
is enabled) will be done in separate commits.
  • Loading branch information
ansd committed Feb 8, 2023
1 parent 8b88e45 commit 5045fce
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 587 deletions.
32 changes: 3 additions & 29 deletions deps/rabbit/src/rabbit_channel_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@
-export([list/0, list_of_user/1, list_on_node/1,
tracked_channel_table_name_for/1,
tracked_channel_per_user_table_name_for/1,
get_all_tracked_channel_table_names_for_node/1,
ensure_tracked_tables_for_this_node/0,
delete_tracked_channel_user_entry/1]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export this function with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).
-ifdef(TEST).
-export([get_all_tracked_channel_table_names_for_node/1]).
-endif.

-include_lib("rabbit_common/include/rabbit.hrl").

Expand Down Expand Up @@ -464,27 +462,3 @@ close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
|| #tracked_channel{pid = ChPid} <- TrackedChannels],
ok;
close_channels(_TrackedChannels = []) -> ok.

migrate_tracking_records() ->
Node = node(),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_channel_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(Channel) ->
ets:insert(tracked_channel, Channel)
end, Channels)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_channel_per_user_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_channel_per_user{channel_count = C,
user = Username}) ->
ets:update_counter(tracked_channel_per_user, Username, C, {Username, 0})
end, Channels)
end).
43 changes: 3 additions & 40 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
-export([tracked_connection_table_name_for/1,
tracked_connection_per_vhost_table_name_for/1,
tracked_connection_per_user_table_name_for/1,
get_all_tracked_connection_table_names_for_node/1,
clear_tracked_connection_tables_for_this_node/0,

ensure_tracked_tables_for_this_node/0,
Expand All @@ -41,13 +40,12 @@
tracked_connection_from_connection_state/1,
lookup/1, count/0]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export these functions with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_in_vhost/1,
count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).
-ifdef(TEST).
-export([get_all_tracked_connection_table_names_for_node/1]).
-endif.

-include_lib("rabbit_common/include/rabbit.hrl").

Expand Down Expand Up @@ -728,38 +726,3 @@ close_connection(#tracked_connection{pid = Pid}, Message) ->
% best effort, this will work for connections to the stream plugin
Node = node(Pid),
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).

migrate_tracking_records() ->
Node = node(),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(Connection) ->
ets:insert(tracked_connection, Connection)
end, Connections)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_per_user_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_connection_per_user{connection_count = C,
user = Username}) ->
ets:update_counter(tracked_connection_per_user, Username, C, {Username, 0})
end, Connections)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_per_vhost_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_connection_per_vhost{connection_count = C,
vhost = VHost}) ->
ets:update_counter(tracked_connection_per_vhost, VHost, C, {VHost, 0})
end, Connections)
end).
166 changes: 13 additions & 153 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,11 @@

-module(rabbit_core_ff).

-include_lib("kernel/include/logger.hrl").

-include_lib("rabbit_common/include/logging.hrl").

-export([direct_exchange_routing_v2_enable/1,
listener_records_in_ets_enable/1,
listener_records_in_ets_post_enable/1,
tracking_records_in_ets_enable/1,
tracking_records_in_ets_post_enable/1]).

-rabbit_feature_flag(
{classic_mirrored_queue_version,
#{desc => "Support setting version for classic mirrored queues",
stability => stable
%%TODO remove compatibility code
stability => required
}}).

-rabbit_feature_flag(
Expand Down Expand Up @@ -68,7 +59,8 @@
{stream_single_active_consumer,
#{desc => "Single active consumer for streams",
doc_url => "https://www.rabbitmq.com/stream.html",
stability => stable,
%%TODO remove compatibility code
stability => required,
depends_on => [stream_queue]
}}).

Expand All @@ -81,31 +73,25 @@
-rabbit_feature_flag(
{direct_exchange_routing_v2,
#{desc => "v2 direct exchange routing implementation",
stability => stable,
depends_on => [feature_flags_v2, implicit_default_bindings],
callbacks => #{enable => {?MODULE, direct_exchange_routing_v2_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2, implicit_default_bindings]
}}).

-rabbit_feature_flag(
{listener_records_in_ets,
#{desc => "Store listener records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
callbacks => #{enable =>
{?MODULE, listener_records_in_ets_enable},
post_enable =>
{?MODULE, listener_records_in_ets_post_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2]
}}).

-rabbit_feature_flag(
{tracking_records_in_ets,
#{desc => "Store tracking records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
callbacks => #{enable =>
{?MODULE, tracking_records_in_ets_enable},
post_enable =>
{?MODULE, tracking_records_in_ets_post_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2]
}}).

-rabbit_feature_flag(
Expand All @@ -124,129 +110,3 @@
stability => stable,
depends_on => [stream_queue]
}}).

%% -------------------------------------------------------------------
%% Direct exchange routing v2.
%% -------------------------------------------------------------------

-spec direct_exchange_routing_v2_enable(Args) -> Ret when
Args :: rabbit_feature_flags:enable_callback_args(),
Ret :: rabbit_feature_flags:enable_callback_ret().
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
TableName = rabbit_index_route,
ok = rabbit_table:wait([rabbit_route, rabbit_exchange], _Retry = true),
try
case rabbit_db_binding:create_index_route_table() of
ok ->
ok;
{error, Err} = Error ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to add copy of table ~ts to "
"node ~tp: ~tp",
[FeatureName, TableName, node(), Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Error
end
catch throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: enable callback failure: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

%% -------------------------------------------------------------------
%% Listener records moved from Mnesia to ETS.
%% -------------------------------------------------------------------

listener_records_in_ets_enable(#{feature_name := FeatureName}) ->
try
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
_ = mnesia:lock({table, rabbit_listener}, read),
Listeners = mnesia:select(
rabbit_listener, [{'$1',[],['$1']}]),
lists:foreach(
fun(Listener) ->
ets:insert(rabbit_listener_ets, Listener)
end, Listeners)
end)
catch
throw:{error, {no_exists, rabbit_listener}} ->
ok;
throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to migrate Mnesia table: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

listener_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
try
case mnesia:delete_table(rabbit_listener) of
{atomic, ok} ->
ok;
{aborted, {no_exists, _}} ->
ok;
{aborted, Err} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
[FeatureName, Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
ok
end
catch
throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
ok
end.

tracking_records_in_ets_enable(#{feature_name := FeatureName}) ->
try
rabbit_connection_tracking:migrate_tracking_records(),
rabbit_channel_tracking:migrate_tracking_records()
catch
throw:{error, {no_exists, _}} ->
ok;
throw:{error, Reason} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

tracking_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
try
[delete_table(FeatureName, Tab) ||
Tab <- rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(node())],
[delete_table(FeatureName, Tab) ||
Tab <- rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(node())]
catch
throw:{error, Reason} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
%% adheres to the callback interface
ok
end.

delete_table(FeatureName, Tab) ->
case mnesia:delete_table(Tab) of
{atomic, ok} ->
ok;
{aborted, {no_exists, _}} ->
ok;
{aborted, Err} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed to delete mnesia table ~tp: ~tp",
[FeatureName, Tab, Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
%% adheres to the callback interface
ok
end.
48 changes: 0 additions & 48 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

-export([recover/0, recover/1]).

-export([create_index_route_table/0]).

%% For testing
-export([clear/0]).

Expand Down Expand Up @@ -472,52 +470,6 @@ recover_in_mnesia(RecoverFun) ->
source = Src}} = Route <-
rabbit_mnesia:dirty_read_all(?MNESIA_SEMI_DURABLE_TABLE)].

%% -------------------------------------------------------------------
%% create_index_route_table().
%% -------------------------------------------------------------------

-spec create_index_route_table() -> ok | {error, any()}.
create_index_route_table() ->
rabbit_db:run(
#{mnesia => fun() -> create_index_route_table_in_mnesia() end
}).

create_index_route_table_in_mnesia() ->
DependantTables = [?MNESIA_TABLE, rabbit_exchange],
ok = rabbit_table:wait(DependantTables, _Retry = true),
[ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables],
ok = rabbit_table:create(
?MNESIA_INDEX_TABLE, rabbit_table:rabbit_index_route_definition()),
case rabbit_table:ensure_table_copy(?MNESIA_INDEX_TABLE, node(), ram_copies) of
ok ->
ok = populate_index_route_table_in_mnesia();
Error ->
Error
end.

populate_index_route_table_in_mnesia() ->
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
_ = mnesia:lock({table, ?MNESIA_TABLE}, read),
_ = mnesia:lock({table, rabbit_exchange}, read),
_ = mnesia:lock({table, ?MNESIA_INDEX_TABLE}, write),
Routes = rabbit_mnesia:dirty_read_all(?MNESIA_TABLE),
lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) ->
case rabbit_db_exchange:get(Exchange) of
{ok, X} ->
case should_index_table(X) of
true ->
mnesia:dirty_write(?MNESIA_INDEX_TABLE,
rabbit_binding:index_route(Route));
false ->
ok
end;
_ ->
ok
end
end, Routes)
end).

%% -------------------------------------------------------------------
%% delete_all_for_exchange_in_mnesia().
%% -------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_table.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-export([
create/0, create/2, ensure_local_copies/1, ensure_table_copy/3,
create_local_copy/2, wait_for_replicated/1, wait/1, wait/2,
wait_for_replicated/1, wait/1, wait/2,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
wait_for_replicated/0, exists/1]).
Expand Down
Loading

0 comments on commit 5045fce

Please sign in to comment.