Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require all feature flags introduced before 3.11.1 #7219

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 3 additions & 29 deletions deps/rabbit/src/rabbit_channel_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@
-export([list/0, list_of_user/1, list_on_node/1,
tracked_channel_table_name_for/1,
tracked_channel_per_user_table_name_for/1,
get_all_tracked_channel_table_names_for_node/1,
ensure_tracked_tables_for_this_node/0,
delete_tracked_channel_user_entry/1]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export this function with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).
-ifdef(TEST).
-export([get_all_tracked_channel_table_names_for_node/1]).
-endif.

-include_lib("rabbit_common/include/rabbit.hrl").

Expand Down Expand Up @@ -464,27 +462,3 @@ close_channels(TrackedChannels = [#tracked_channel{}|_]) ->
|| #tracked_channel{pid = ChPid} <- TrackedChannels],
ok;
close_channels(_TrackedChannels = []) -> ok.

migrate_tracking_records() ->
Node = node(),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_channel_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(Channel) ->
ets:insert(tracked_channel, Channel)
end, Channels)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_channel_per_user_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Channels = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_channel_per_user{channel_count = C,
user = Username}) ->
ets:update_counter(tracked_channel_per_user, Username, C, {Username, 0})
end, Channels)
end).
43 changes: 3 additions & 40 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
-export([tracked_connection_table_name_for/1,
tracked_connection_per_vhost_table_name_for/1,
tracked_connection_per_user_table_name_for/1,
get_all_tracked_connection_table_names_for_node/1,
clear_tracked_connection_tables_for_this_node/0,

ensure_tracked_tables_for_this_node/0,
Expand All @@ -41,13 +40,12 @@
tracked_connection_from_connection_state/1,
lookup/1, count/0]).

%% All nodes (that support the `tracking_records_in_ets' feature) must
%% export these functions with the same spec, as they are called via
%% RPC from other nodes. (Their implementation can differ.)
-export([count_local_tracked_items_in_vhost/1,
count_local_tracked_items_of_user/1]).

-export([migrate_tracking_records/0]).
-ifdef(TEST).
-export([get_all_tracked_connection_table_names_for_node/1]).
-endif.

-include_lib("rabbit_common/include/rabbit.hrl").

Expand Down Expand Up @@ -728,38 +726,3 @@ close_connection(#tracked_connection{pid = Pid}, Message) ->
% best effort, this will work for connections to the stream plugin
Node = node(Pid),
rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]).

migrate_tracking_records() ->
Node = node(),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(Connection) ->
ets:insert(tracked_connection, Connection)
end, Connections)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_per_user_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_connection_per_user{connection_count = C,
user = Username}) ->
ets:update_counter(tracked_connection_per_user, Username, C, {Username, 0})
end, Connections)
end),
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
Table = tracked_connection_per_vhost_table_name_for(Node),
_ = mnesia:lock({table, Table}, read),
Connections = mnesia:select(Table, [{'$1',[],['$1']}]),
lists:foreach(
fun(#tracked_connection_per_vhost{connection_count = C,
vhost = VHost}) ->
ets:update_counter(tracked_connection_per_vhost, VHost, C, {VHost, 0})
end, Connections)
end).
166 changes: 13 additions & 153 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,11 @@

-module(rabbit_core_ff).

-include_lib("kernel/include/logger.hrl").

-include_lib("rabbit_common/include/logging.hrl").

-export([direct_exchange_routing_v2_enable/1,
listener_records_in_ets_enable/1,
listener_records_in_ets_post_enable/1,
tracking_records_in_ets_enable/1,
tracking_records_in_ets_post_enable/1]).

-rabbit_feature_flag(
{classic_mirrored_queue_version,
#{desc => "Support setting version for classic mirrored queues",
stability => stable
%%TODO remove compatibility code
stability => required
}}).

-rabbit_feature_flag(
Expand Down Expand Up @@ -68,7 +59,8 @@
{stream_single_active_consumer,
#{desc => "Single active consumer for streams",
doc_url => "https://www.rabbitmq.com/stream.html",
stability => stable,
%%TODO remove compatibility code
stability => required,
depends_on => [stream_queue]
}}).

Expand All @@ -81,31 +73,25 @@
-rabbit_feature_flag(
{direct_exchange_routing_v2,
#{desc => "v2 direct exchange routing implementation",
stability => stable,
depends_on => [feature_flags_v2, implicit_default_bindings],
callbacks => #{enable => {?MODULE, direct_exchange_routing_v2_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2, implicit_default_bindings]
}}).

-rabbit_feature_flag(
{listener_records_in_ets,
#{desc => "Store listener records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
callbacks => #{enable =>
{?MODULE, listener_records_in_ets_enable},
post_enable =>
{?MODULE, listener_records_in_ets_post_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2]
}}).

-rabbit_feature_flag(
{tracking_records_in_ets,
#{desc => "Store tracking records in ETS instead of Mnesia",
stability => stable,
depends_on => [feature_flags_v2],
callbacks => #{enable =>
{?MODULE, tracking_records_in_ets_enable},
post_enable =>
{?MODULE, tracking_records_in_ets_post_enable}}
%%TODO remove compatibility code
stability => required,
depends_on => [feature_flags_v2]
}}).

-rabbit_feature_flag(
Expand All @@ -124,129 +110,3 @@
stability => stable,
depends_on => [stream_queue]
}}).

%% -------------------------------------------------------------------
%% Direct exchange routing v2.
%% -------------------------------------------------------------------

-spec direct_exchange_routing_v2_enable(Args) -> Ret when
Args :: rabbit_feature_flags:enable_callback_args(),
Ret :: rabbit_feature_flags:enable_callback_ret().
direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) ->
TableName = rabbit_index_route,
ok = rabbit_table:wait([rabbit_route, rabbit_exchange], _Retry = true),
try
case rabbit_db_binding:create_index_route_table() of
ok ->
ok;
{error, Err} = Error ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to add copy of table ~ts to "
"node ~tp: ~tp",
[FeatureName, TableName, node(), Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
Error
end
catch throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: enable callback failure: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

%% -------------------------------------------------------------------
%% Listener records moved from Mnesia to ETS.
%% -------------------------------------------------------------------

listener_records_in_ets_enable(#{feature_name := FeatureName}) ->
try
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
_ = mnesia:lock({table, rabbit_listener}, read),
Listeners = mnesia:select(
rabbit_listener, [{'$1',[],['$1']}]),
lists:foreach(
fun(Listener) ->
ets:insert(rabbit_listener_ets, Listener)
end, Listeners)
end)
catch
throw:{error, {no_exists, rabbit_listener}} ->
ok;
throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to migrate Mnesia table: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

listener_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
try
case mnesia:delete_table(rabbit_listener) of
{atomic, ok} ->
ok;
{aborted, {no_exists, _}} ->
ok;
{aborted, Err} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
[FeatureName, Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
ok
end
catch
throw:{error, Reason} ->
?LOG_ERROR(
"Feature flags: `~ts`: failed to delete Mnesia table: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
ok
end.

tracking_records_in_ets_enable(#{feature_name := FeatureName}) ->
try
rabbit_connection_tracking:migrate_tracking_records(),
rabbit_channel_tracking:migrate_tracking_records()
catch
throw:{error, {no_exists, _}} ->
ok;
throw:{error, Reason} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
{error, Reason}
end.

tracking_records_in_ets_post_enable(#{feature_name := FeatureName}) ->
try
[delete_table(FeatureName, Tab) ||
Tab <- rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(node())],
[delete_table(FeatureName, Tab) ||
Tab <- rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(node())]
catch
throw:{error, Reason} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed: ~tp",
[FeatureName, Reason],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
%% adheres to the callback interface
ok
end.

delete_table(FeatureName, Tab) ->
case mnesia:delete_table(Tab) of
{atomic, ok} ->
ok;
{aborted, {no_exists, _}} ->
ok;
{aborted, Err} ->
?LOG_ERROR(
"Enabling feature flag ~ts failed to delete mnesia table ~tp: ~tp",
[FeatureName, Tab, Err],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
%% adheres to the callback interface
ok
end.
48 changes: 0 additions & 48 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@

-export([recover/0, recover/1]).

-export([create_index_route_table/0]).

%% For testing
-export([clear/0]).

Expand Down Expand Up @@ -472,52 +470,6 @@ recover_in_mnesia(RecoverFun) ->
source = Src}} = Route <-
rabbit_mnesia:dirty_read_all(?MNESIA_SEMI_DURABLE_TABLE)].

%% -------------------------------------------------------------------
%% create_index_route_table().
%% -------------------------------------------------------------------

-spec create_index_route_table() -> ok | {error, any()}.
create_index_route_table() ->
rabbit_db:run(
#{mnesia => fun() -> create_index_route_table_in_mnesia() end
}).

create_index_route_table_in_mnesia() ->
DependantTables = [?MNESIA_TABLE, rabbit_exchange],
ok = rabbit_table:wait(DependantTables, _Retry = true),
[ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables],
ok = rabbit_table:create(
?MNESIA_INDEX_TABLE, rabbit_table:rabbit_index_route_definition()),
case rabbit_table:ensure_table_copy(?MNESIA_INDEX_TABLE, node(), ram_copies) of
ok ->
ok = populate_index_route_table_in_mnesia();
Error ->
Error
end.

populate_index_route_table_in_mnesia() ->
rabbit_mnesia:execute_mnesia_transaction(
fun () ->
_ = mnesia:lock({table, ?MNESIA_TABLE}, read),
_ = mnesia:lock({table, rabbit_exchange}, read),
_ = mnesia:lock({table, ?MNESIA_INDEX_TABLE}, write),
Routes = rabbit_mnesia:dirty_read_all(?MNESIA_TABLE),
lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) ->
case rabbit_db_exchange:get(Exchange) of
{ok, X} ->
case should_index_table(X) of
true ->
mnesia:dirty_write(?MNESIA_INDEX_TABLE,
rabbit_binding:index_route(Route));
false ->
ok
end;
_ ->
ok
end
end, Routes)
end).

%% -------------------------------------------------------------------
%% delete_all_for_exchange_in_mnesia().
%% -------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_table.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-export([
create/0, create/2, ensure_local_copies/1, ensure_table_copy/3,
create_local_copy/2, wait_for_replicated/1, wait/1, wait/2,
wait_for_replicated/1, wait/1, wait/2,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
wait_for_replicated/0, exists/1]).
Expand Down
Loading