diff --git a/deps/rabbit/src/rabbit_channel_tracking.erl b/deps/rabbit/src/rabbit_channel_tracking.erl index 0df63fddab10..bbdd2d616999 100644 --- a/deps/rabbit/src/rabbit_channel_tracking.erl +++ b/deps/rabbit/src/rabbit_channel_tracking.erl @@ -29,16 +29,14 @@ -export([list/0, list_of_user/1, list_on_node/1, tracked_channel_table_name_for/1, tracked_channel_per_user_table_name_for/1, - get_all_tracked_channel_table_names_for_node/1, ensure_tracked_tables_for_this_node/0, delete_tracked_channel_user_entry/1]). -%% All nodes (that support the `tracking_records_in_ets' feature) must -%% export this function with the same spec, as they are called via -%% RPC from other nodes. (Their implementation can differ.) -export([count_local_tracked_items_of_user/1]). --export([migrate_tracking_records/0]). +-ifdef(TEST). +-export([get_all_tracked_channel_table_names_for_node/1]). +-endif. -include_lib("rabbit_common/include/rabbit.hrl"). @@ -464,27 +462,3 @@ close_channels(TrackedChannels = [#tracked_channel{}|_]) -> || #tracked_channel{pid = ChPid} <- TrackedChannels], ok; close_channels(_TrackedChannels = []) -> ok. - -migrate_tracking_records() -> - Node = node(), - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - Table = tracked_channel_table_name_for(Node), - _ = mnesia:lock({table, Table}, read), - Channels = mnesia:select(Table, [{'$1',[],['$1']}]), - lists:foreach( - fun(Channel) -> - ets:insert(tracked_channel, Channel) - end, Channels) - end), - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - Table = tracked_channel_per_user_table_name_for(Node), - _ = mnesia:lock({table, Table}, read), - Channels = mnesia:select(Table, [{'$1',[],['$1']}]), - lists:foreach( - fun(#tracked_channel_per_user{channel_count = C, - user = Username}) -> - ets:update_counter(tracked_channel_per_user, Username, C, {Username, 0}) - end, Channels) - end). diff --git a/deps/rabbit/src/rabbit_connection_tracking.erl b/deps/rabbit/src/rabbit_connection_tracking.erl index 607a63474ca1..1a85b1341524 100644 --- a/deps/rabbit/src/rabbit_connection_tracking.erl +++ b/deps/rabbit/src/rabbit_connection_tracking.erl @@ -29,7 +29,6 @@ -export([tracked_connection_table_name_for/1, tracked_connection_per_vhost_table_name_for/1, tracked_connection_per_user_table_name_for/1, - get_all_tracked_connection_table_names_for_node/1, clear_tracked_connection_tables_for_this_node/0, ensure_tracked_tables_for_this_node/0, @@ -41,13 +40,12 @@ tracked_connection_from_connection_state/1, lookup/1, count/0]). -%% All nodes (that support the `tracking_records_in_ets' feature) must -%% export these functions with the same spec, as they are called via -%% RPC from other nodes. (Their implementation can differ.) -export([count_local_tracked_items_in_vhost/1, count_local_tracked_items_of_user/1]). --export([migrate_tracking_records/0]). +-ifdef(TEST). +-export([get_all_tracked_connection_table_names_for_node/1]). +-endif. -include_lib("rabbit_common/include/rabbit.hrl"). @@ -728,38 +726,3 @@ close_connection(#tracked_connection{pid = Pid}, Message) -> % best effort, this will work for connections to the stream plugin Node = node(Pid), rpc:call(Node, gen_server, call, [Pid, {shutdown, Message}, infinity]). - -migrate_tracking_records() -> - Node = node(), - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - Table = tracked_connection_table_name_for(Node), - _ = mnesia:lock({table, Table}, read), - Connections = mnesia:select(Table, [{'$1',[],['$1']}]), - lists:foreach( - fun(Connection) -> - ets:insert(tracked_connection, Connection) - end, Connections) - end), - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - Table = tracked_connection_per_user_table_name_for(Node), - _ = mnesia:lock({table, Table}, read), - Connections = mnesia:select(Table, [{'$1',[],['$1']}]), - lists:foreach( - fun(#tracked_connection_per_user{connection_count = C, - user = Username}) -> - ets:update_counter(tracked_connection_per_user, Username, C, {Username, 0}) - end, Connections) - end), - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - Table = tracked_connection_per_vhost_table_name_for(Node), - _ = mnesia:lock({table, Table}, read), - Connections = mnesia:select(Table, [{'$1',[],['$1']}]), - lists:foreach( - fun(#tracked_connection_per_vhost{connection_count = C, - vhost = VHost}) -> - ets:update_counter(tracked_connection_per_vhost, VHost, C, {VHost, 0}) - end, Connections) - end). diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 1a816d03a585..8cd06f24b5bb 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -7,20 +7,11 @@ -module(rabbit_core_ff). --include_lib("kernel/include/logger.hrl"). - --include_lib("rabbit_common/include/logging.hrl"). - --export([direct_exchange_routing_v2_enable/1, - listener_records_in_ets_enable/1, - listener_records_in_ets_post_enable/1, - tracking_records_in_ets_enable/1, - tracking_records_in_ets_post_enable/1]). - -rabbit_feature_flag( {classic_mirrored_queue_version, #{desc => "Support setting version for classic mirrored queues", - stability => stable + %%TODO remove compatibility code + stability => required }}). -rabbit_feature_flag( @@ -68,7 +59,8 @@ {stream_single_active_consumer, #{desc => "Single active consumer for streams", doc_url => "https://www.rabbitmq.com/stream.html", - stability => stable, + %%TODO remove compatibility code + stability => required, depends_on => [stream_queue] }}). @@ -81,31 +73,25 @@ -rabbit_feature_flag( {direct_exchange_routing_v2, #{desc => "v2 direct exchange routing implementation", - stability => stable, - depends_on => [feature_flags_v2, implicit_default_bindings], - callbacks => #{enable => {?MODULE, direct_exchange_routing_v2_enable}} + %%TODO remove compatibility code + stability => required, + depends_on => [feature_flags_v2, implicit_default_bindings] }}). -rabbit_feature_flag( {listener_records_in_ets, #{desc => "Store listener records in ETS instead of Mnesia", - stability => stable, - depends_on => [feature_flags_v2], - callbacks => #{enable => - {?MODULE, listener_records_in_ets_enable}, - post_enable => - {?MODULE, listener_records_in_ets_post_enable}} + %%TODO remove compatibility code + stability => required, + depends_on => [feature_flags_v2] }}). -rabbit_feature_flag( {tracking_records_in_ets, #{desc => "Store tracking records in ETS instead of Mnesia", - stability => stable, - depends_on => [feature_flags_v2], - callbacks => #{enable => - {?MODULE, tracking_records_in_ets_enable}, - post_enable => - {?MODULE, tracking_records_in_ets_post_enable}} + %%TODO remove compatibility code + stability => required, + depends_on => [feature_flags_v2] }}). -rabbit_feature_flag( @@ -124,129 +110,3 @@ stability => stable, depends_on => [stream_queue] }}). - -%% ------------------------------------------------------------------- -%% Direct exchange routing v2. -%% ------------------------------------------------------------------- - --spec direct_exchange_routing_v2_enable(Args) -> Ret when - Args :: rabbit_feature_flags:enable_callback_args(), - Ret :: rabbit_feature_flags:enable_callback_ret(). -direct_exchange_routing_v2_enable(#{feature_name := FeatureName}) -> - TableName = rabbit_index_route, - ok = rabbit_table:wait([rabbit_route, rabbit_exchange], _Retry = true), - try - case rabbit_db_binding:create_index_route_table() of - ok -> - ok; - {error, Err} = Error -> - ?LOG_ERROR( - "Feature flags: `~ts`: failed to add copy of table ~ts to " - "node ~tp: ~tp", - [FeatureName, TableName, node(), Err], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - Error - end - catch throw:{error, Reason} -> - ?LOG_ERROR( - "Feature flags: `~ts`: enable callback failure: ~tp", - [FeatureName, Reason], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - {error, Reason} - end. - -%% ------------------------------------------------------------------- -%% Listener records moved from Mnesia to ETS. -%% ------------------------------------------------------------------- - -listener_records_in_ets_enable(#{feature_name := FeatureName}) -> - try - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - _ = mnesia:lock({table, rabbit_listener}, read), - Listeners = mnesia:select( - rabbit_listener, [{'$1',[],['$1']}]), - lists:foreach( - fun(Listener) -> - ets:insert(rabbit_listener_ets, Listener) - end, Listeners) - end) - catch - throw:{error, {no_exists, rabbit_listener}} -> - ok; - throw:{error, Reason} -> - ?LOG_ERROR( - "Feature flags: `~ts`: failed to migrate Mnesia table: ~tp", - [FeatureName, Reason], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - {error, Reason} - end. - -listener_records_in_ets_post_enable(#{feature_name := FeatureName}) -> - try - case mnesia:delete_table(rabbit_listener) of - {atomic, ok} -> - ok; - {aborted, {no_exists, _}} -> - ok; - {aborted, Err} -> - ?LOG_ERROR( - "Feature flags: `~ts`: failed to delete Mnesia table: ~tp", - [FeatureName, Err], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - ok - end - catch - throw:{error, Reason} -> - ?LOG_ERROR( - "Feature flags: `~ts`: failed to delete Mnesia table: ~tp", - [FeatureName, Reason], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - ok - end. - -tracking_records_in_ets_enable(#{feature_name := FeatureName}) -> - try - rabbit_connection_tracking:migrate_tracking_records(), - rabbit_channel_tracking:migrate_tracking_records() - catch - throw:{error, {no_exists, _}} -> - ok; - throw:{error, Reason} -> - ?LOG_ERROR( - "Enabling feature flag ~ts failed: ~tp", - [FeatureName, Reason], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - {error, Reason} - end. - -tracking_records_in_ets_post_enable(#{feature_name := FeatureName}) -> - try - [delete_table(FeatureName, Tab) || - Tab <- rabbit_connection_tracking:get_all_tracked_connection_table_names_for_node(node())], - [delete_table(FeatureName, Tab) || - Tab <- rabbit_channel_tracking:get_all_tracked_channel_table_names_for_node(node())] - catch - throw:{error, Reason} -> - ?LOG_ERROR( - "Enabling feature flag ~ts failed: ~tp", - [FeatureName, Reason], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - %% adheres to the callback interface - ok - end. - -delete_table(FeatureName, Tab) -> - case mnesia:delete_table(Tab) of - {atomic, ok} -> - ok; - {aborted, {no_exists, _}} -> - ok; - {aborted, Err} -> - ?LOG_ERROR( - "Enabling feature flag ~ts failed to delete mnesia table ~tp: ~tp", - [FeatureName, Tab, Err], - #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), - %% adheres to the callback interface - ok - end. diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index b653cc834a5d..e7d63906c925 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -32,8 +32,6 @@ -export([recover/0, recover/1]). --export([create_index_route_table/0]). - %% For testing -export([clear/0]). @@ -472,52 +470,6 @@ recover_in_mnesia(RecoverFun) -> source = Src}} = Route <- rabbit_mnesia:dirty_read_all(?MNESIA_SEMI_DURABLE_TABLE)]. -%% ------------------------------------------------------------------- -%% create_index_route_table(). -%% ------------------------------------------------------------------- - --spec create_index_route_table() -> ok | {error, any()}. -create_index_route_table() -> - rabbit_db:run( - #{mnesia => fun() -> create_index_route_table_in_mnesia() end - }). - -create_index_route_table_in_mnesia() -> - DependantTables = [?MNESIA_TABLE, rabbit_exchange], - ok = rabbit_table:wait(DependantTables, _Retry = true), - [ok = rabbit_table:create_local_copy(Tab, ram_copies) || Tab <- DependantTables], - ok = rabbit_table:create( - ?MNESIA_INDEX_TABLE, rabbit_table:rabbit_index_route_definition()), - case rabbit_table:ensure_table_copy(?MNESIA_INDEX_TABLE, node(), ram_copies) of - ok -> - ok = populate_index_route_table_in_mnesia(); - Error -> - Error - end. - -populate_index_route_table_in_mnesia() -> - rabbit_mnesia:execute_mnesia_transaction( - fun () -> - _ = mnesia:lock({table, ?MNESIA_TABLE}, read), - _ = mnesia:lock({table, rabbit_exchange}, read), - _ = mnesia:lock({table, ?MNESIA_INDEX_TABLE}, write), - Routes = rabbit_mnesia:dirty_read_all(?MNESIA_TABLE), - lists:foreach(fun(#route{binding = #binding{source = Exchange}} = Route) -> - case rabbit_db_exchange:get(Exchange) of - {ok, X} -> - case should_index_table(X) of - true -> - mnesia:dirty_write(?MNESIA_INDEX_TABLE, - rabbit_binding:index_route(Route)); - false -> - ok - end; - _ -> - ok - end - end, Routes) - end). - %% ------------------------------------------------------------------- %% delete_all_for_exchange_in_mnesia(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_table.erl b/deps/rabbit/src/rabbit_table.erl index a0314a235adb..ff842a71383d 100644 --- a/deps/rabbit/src/rabbit_table.erl +++ b/deps/rabbit/src/rabbit_table.erl @@ -9,7 +9,7 @@ -export([ create/0, create/2, ensure_local_copies/1, ensure_table_copy/3, - create_local_copy/2, wait_for_replicated/1, wait/1, wait/2, + wait_for_replicated/1, wait/1, wait/2, force_load/0, is_present/0, is_empty/0, needs_default_data/0, check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, wait_for_replicated/0, exists/1]). diff --git a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl index 081461a41bf6..083ad75b66ff 100644 --- a/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl +++ b/deps/rabbit/test/direct_exchange_routing_v2_SUITE.erl @@ -6,8 +6,6 @@ -module(direct_exchange_routing_v2_SUITE). -%% Test suite for the feature flag direct_exchange_routing_v2 - -compile([export_all, nowarn_export_all]). -include_lib("common_test/include/ct.hrl"). @@ -15,7 +13,6 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). --define(FEATURE_FLAG, direct_exchange_routing_v2). -define(INDEX_TABLE_NAME, rabbit_index_route). %% in file direct_exchange_routing_v2_SUITE_data/definition.json: @@ -33,42 +30,28 @@ all() -> [ {group, cluster_size_1}, {group, cluster_size_2}, - {group, cluster_size_3}, {group, unclustered_cluster_size_2} ]. groups() -> [ {cluster_size_1, [], - [{start_feature_flag_enabled, [], [remove_binding_unbind_queue, - remove_binding_delete_queue, - remove_binding_delete_queue_multiple, - remove_binding_delete_exchange, - recover_bindings, - route_exchange_to_exchange, - reset]}, - {start_feature_flag_disabled, [], [enable_feature_flag]} - ]}, + [remove_binding_unbind_queue, + remove_binding_delete_queue, + remove_binding_delete_queue_multiple, + remove_binding_delete_exchange, + recover_bindings, + route_exchange_to_exchange, + reset]}, {cluster_size_2, [], - [{start_feature_flag_enabled, [], [ - remove_binding_node_down_transient_queue, - keep_binding_node_down_durable_queue - ]}, - {start_feature_flag_disabled, [], [enable_feature_flag_during_definition_import]} + [remove_binding_node_down_transient_queue, + keep_binding_node_down_durable_queue ]}, {unclustered_cluster_size_2, [], - [{start_feature_flag_enabled, [], [join_cluster]} - ]}, - {cluster_size_3, [], - [{start_feature_flag_disabled, [], [enable_feature_flag_during_binding_churn]} - ]} + [join_cluster]} ]. -suite() -> - [ - %% If a test hangs, no need to wait for 30 minutes. - {timetrap, {minutes, 8}} - ]. +suite() -> [{timetrap, {minutes, 3}}]. init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), @@ -77,65 +60,32 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(cluster_size_1, Config) -> - rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 1}); -init_per_group(cluster_size_2, Config) -> - rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 2}); -init_per_group(cluster_size_3, Config) -> - rabbit_ct_helpers:set_config(Config, {rmq_nodes_count, 3}); -init_per_group(unclustered_cluster_size_2, Config0) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "This test group won't work in mixed mode with pre 3.11 releases"}; - false -> - rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 2}, - {rmq_nodes_clustered, false}]) - end; -init_per_group(start_feature_flag_enabled = Group, Config0) -> - Config = start_broker(Group, Config0), - case rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG) of - ok -> - Config; - {skip, _} = Skip -> - end_per_group(Group, Config), - Skip - end; -init_per_group(start_feature_flag_disabled = Group, Config0) -> - Config1 = rabbit_ct_helpers:merge_app_env( - Config0, {rabbit, [{forced_feature_flags_on_init, []}]}), - Config = start_broker(Group, Config1), - case rabbit_ct_broker_helpers:is_feature_flag_supported(Config, ?FEATURE_FLAG) of - true -> - assert_no_index_table(Config), - Config; - false -> - end_per_group(Group, Config), - {skip, io_lib:format("'~ts' feature flag is unsupported", [?FEATURE_FLAG])} - end. +init_per_group(Group = cluster_size_1, Config0) -> + Config = rabbit_ct_helpers:set_config(Config0, {rmq_nodes_count, 1}), + start_broker(Group, Config); +init_per_group(Group = cluster_size_2, Config0) -> + Config = rabbit_ct_helpers:set_config(Config0, {rmq_nodes_count, 2}), + start_broker(Group, Config); +init_per_group(Group = unclustered_cluster_size_2, Config0) -> + Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 2}, + {rmq_nodes_clustered, false}]), + start_broker(Group, Config). start_broker(Group, Config0) -> - Size = rabbit_ct_helpers:get_config(Config0, rmq_nodes_count), - Clustered = rabbit_ct_helpers:get_config(Config0, rmq_nodes_clustered, true), - Config = rabbit_ct_helpers:set_config(Config0, {rmq_nodename_suffix, - io_lib:format("cluster_size_~b-clustered_~tp-~ts", - [Size, Clustered, Group])}), + Config = rabbit_ct_helpers:set_config(Config0, {rmq_nodename_suffix, Group}), rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). -end_per_group(Group, Config) - when Group =:= start_feature_flag_enabled; - Group =:= start_feature_flag_disabled -> +end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()); -end_per_group(_Group, Config) -> - Config. + rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(_TestCase, Config) -> +init_per_testcase(_, Config) -> Config. -end_per_testcase(_TestCase, Config) -> +end_per_testcase(_, Config) -> %% Test that all bindings got removed from the database. ?assertEqual([0,0,0,0,0], lists:map(fun(Table) -> @@ -366,170 +316,6 @@ route_exchange_to_exchange(Config) -> routing_key = RKey}), ok. -enable_feature_flag(Config) -> - Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - - DirectX = <<"amq.direct">>, - Q1 = <<"q1">>, - Q2 = <<"q2">>, - RKey = <<"k">>, - - declare_queue(Ch, Q1, true), - bind_queue(Ch, Q1, DirectX, RKey), - bind_queue(Ch, Q1, <<"amq.fanout">>, RKey), - - declare_queue(Ch, Q2, false), - bind_queue(Ch, Q2, DirectX, RKey), - - amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:register_confirm_handler(Ch, self()), - - %% Publishing via "direct exchange routing v1" works. - publish(Ch, DirectX, RKey), - assert_confirm(), - - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG), - - %% The feature flag migration should have created an index table with a ram copy on all nodes. - ?assertEqual(lists:sort(Nodes), index_table_ram_copies(Config, 0)), - %% The feature flag migration should have populated the index table with all bindings whose source exchange - %% is a direct exchange. - ?assertEqual([{rabbit_misc:r(<<"/">>, exchange, DirectX), RKey}], - rabbit_ct_broker_helpers:rpc(Config, 0, mnesia, dirty_all_keys, [?INDEX_TABLE_NAME])), - ?assertEqual(2, table_size(Config, ?INDEX_TABLE_NAME)), - - %% Publishing via "direct exchange routing v2" works. - publish(Ch, DirectX, RKey), - assert_confirm(), - - delete_queue(Ch, Q1), - delete_queue(Ch, Q2), - ok. - -%% Test that enabling feature flag works when bindings are imported concurrently. -enable_feature_flag_during_definition_import(Config) -> - Nodes = [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Path = filename:join([?config(data_dir, Config), "definition.json"]), - - {Pid, Ref} = spawn_monitor( - fun() -> - ct:pal("importing definitions..."), - rabbit_ct_broker_helpers:rabbitmqctl( - Config, Server1, ["import_definitions", Path] - ), - ct:pal("imported definitions") - end), - - timer:sleep(rand:uniform(400)), - ct:pal("enabling feature flag..."), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG), - ct:pal("enabled feature flag"), - - receive {'DOWN', Ref, process, Pid, normal} -> - ok - after 10_000 -> - ct:fail(timeout) - end, - - ?assertEqual(lists:sort(Nodes), index_table_ram_copies(Config, 0)), - ?assertEqual(?NUM_BINDINGS_TO_DIRECT_ECHANGE, - table_size(Config, ?INDEX_TABLE_NAME)), - - %% cleanup - delete_queue(Ch, <<"durable-q">>), - delete_queue(Ch, <<"transient-q">>), - ok. - -%% Test that enabling feature flag works when clients concurrently -%% create and delete bindings and send messages. -enable_feature_flag_during_binding_churn(Config) -> - Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - {_Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - {_Conn2, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1), - - DirectX = <<"amq.direct">>, - FanoutX = <<"amq.fanout">>, - Q = <<"q">>, - - NumMessages = 500, - BindingsDirectX = 1000, - BindingsFanoutX = 10, - - %% setup - declare_queue(Ch1, Q, true), - lists:foreach(fun(N) -> - bind_queue(Ch1, Q, DirectX, integer_to_binary(N)) - end, lists:seq(1, trunc(0.4 * BindingsDirectX))), - lists:foreach(fun(N) -> - bind_queue(Ch1, Q, FanoutX, integer_to_binary(N)) - end, lists:seq(1, BindingsFanoutX)), - lists:foreach(fun(N) -> - bind_queue(Ch1, Q, DirectX, integer_to_binary(N)) - end, lists:seq(trunc(0.4 * BindingsDirectX) + 1, trunc(0.8 * BindingsDirectX))), - - {_, Ref1} = spawn_monitor( - fun() -> - ct:pal("sending ~b messages...", [NumMessages]), - lists:foreach( - fun(_) -> - publish(Ch1, DirectX, integer_to_binary(trunc(0.8 * BindingsDirectX))), - timer:sleep(1) - end, lists:seq(1, NumMessages)), - ct:pal("sent ~b messages", [NumMessages]) - end), - {_, Ref2} = spawn_monitor( - fun() -> - ct:pal("creating bindings..."), - lists:foreach( - fun(N) -> - bind_queue(Ch1, Q, DirectX, integer_to_binary(N)), - timer:sleep(1) - end, lists:seq(trunc(0.8 * BindingsDirectX) + 1, BindingsDirectX)), - ct:pal("created bindings") - end), - {_, Ref3} = spawn_monitor( - fun() -> - ct:pal("deleting bindings..."), - lists:foreach( - fun(N) -> - unbind_queue(Ch2, Q, DirectX, integer_to_binary(N)), - timer:sleep(1) - end, lists:seq(1, trunc(0.2 * BindingsDirectX))), - ct:pal("deleted bindings") - end), - - timer:sleep(rand:uniform(300)), - ct:pal("enabling feature flag..."), - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FEATURE_FLAG), - ct:pal("enabled feature flag"), - - lists:foreach( - fun(Ref) -> - receive {'DOWN', Ref, process, _Pid, normal} -> - ok - after 300_000 -> - ct:fail(timeout) - end - end, [Ref1, Ref2, Ref3]), - - NumMessagesBin = integer_to_binary(NumMessages), - quorum_queue_utils:wait_for_messages(Config, [[Q, NumMessagesBin, NumMessagesBin, <<"0">>]]), - - ?assertEqual(lists:sort(Nodes), index_table_ram_copies(Config, 0)), - - ExpectedKeys = lists:map( - fun(N) -> - {rabbit_misc:r(<<"/">>, exchange, DirectX), integer_to_binary(N)} - end, lists:seq(trunc(0.2 * BindingsDirectX) + 1, BindingsDirectX)), - ActualKeys = rabbit_ct_broker_helpers:rpc(Config, 0, mnesia, dirty_all_keys, [?INDEX_TABLE_NAME]), - ?assertEqual(lists:sort(ExpectedKeys), lists:sort(ActualKeys)), - - %% cleanup - delete_queue(Ch1, Q), - ok. - reset(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -646,10 +432,6 @@ assert_index_table_empty(Config) -> assert_index_table_non_empty(Config) -> ?assertNotEqual(0, table_size(Config, ?INDEX_TABLE_NAME)). -assert_no_index_table(Config) -> - Tids = rabbit_ct_broker_helpers:rpc_all(Config, ets, whereis, [?INDEX_TABLE_NAME]), - ?assert(lists:all(fun(Tid) -> Tid =:= undefined end, Tids)). - index_table_ram_copies(Config, Node) -> RamCopies = rabbit_ct_broker_helpers:rpc(Config, Node, mnesia, table_info, [?INDEX_TABLE_NAME, ram_copies]), diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 5fc44a6be16f..55f7fa6e7696 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -1,9 +1,6 @@ -module(queue_type_SUITE). --compile(export_all). - --export([ - ]). +-compile([export_all, nowarn_export_all]). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl index 9f0edee30f43..4bc47db143b5 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_ff.erl @@ -10,11 +10,13 @@ -rabbit_feature_flag( {empty_basic_get_metric, #{desc => "Count AMQP `basic.get` on empty queues in stats", - stability => stable + %%TODO remove compatibility code + stability => required }}). -rabbit_feature_flag( {drop_unroutable_metric, #{desc => "Count unroutable publishes to be dropped in stats", - stability => stable + %%TODO remove compatibility code + stability => required }}). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 7a249227cb6e..1dc11fb8c553 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -33,8 +33,7 @@ all() -> groups() -> [{single_node, [], - [sac_ff, %% must stay at the top, stream sac feature flag disabled for this one - test_stream, + [test_stream, test_stream_tls, test_gc_consumers, test_gc_publishers, @@ -130,19 +129,6 @@ end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(_TestCase, Config) -> - Config. - -end_per_testcase(sac_ff, Config) -> - rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_feature_flags, - enable, - [stream_single_active_consumer]), - ok; -end_per_testcase(_Test, _Config) -> - ok. - test_global_counters(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(gen_tcp, Stream, Config), @@ -360,41 +346,6 @@ timeout_close_sent(Config) -> % Now, rabbit_stream_reader is in state close_sent. ?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)). -sac_ff(Config) -> - Port = get_stream_port(Config), - {ok, S} = - gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]), - C = rabbit_stream_core:init(0), - test_peer_properties(gen_tcp, S, C), - test_authenticate(gen_tcp, S, C), - Stream = atom_to_binary(?FUNCTION_NAME, utf8), - test_create_stream(gen_tcp, S, Stream, C), - test_declare_publisher(gen_tcp, S, 1, Stream, C), - ?awaitMatch(#{publishers := 1}, get_global_counters(Config), ?WAIT), - Body = <<"hello">>, - test_publish_confirm(gen_tcp, S, 1, Body, C), - - SubscriptionId = 42, - SubCmd = - {request, 1, - {subscribe, - SubscriptionId, - Stream, - 0, - 10, - #{<<"single-active-consumer">> => <<"true">>, - <<"name">> => <<"foo">>}}}, - SubscribeFrame = rabbit_stream_core:frame(SubCmd), - ok = gen_tcp:send(S, SubscribeFrame), - {Cmd, C} = receive_commands(gen_tcp, S, C), - ?assertMatch({response, 1, - {subscribe, ?RESPONSE_CODE_PRECONDITION_FAILED}}, - Cmd), - test_delete_stream(gen_tcp, S, Stream, C), - test_close(gen_tcp, S, C), - closed = wait_for_socket_close(gen_tcp, S, 10), - ok. - max_segment_size_bytes_validation(Config) -> Transport = gen_tcp, Port = get_stream_port(Config), diff --git a/rabbitmq.bzl b/rabbitmq.bzl index 9c0eded22383..d6c96804462d 100644 --- a/rabbitmq.bzl +++ b/rabbitmq.bzl @@ -267,22 +267,16 @@ def rabbitmq_integration_suite( data = data, test_env = dict({ "SKIP_MAKE_TEST_DIST": "true", - # The feature flags listed below are required. This means they must - # be enabled in mixed-version testing before even starting cluster - # because newer node don't have the corresponding - # compatibility/migration code. - # - # Starting from 3.11.0: - # quorum_queue - # implicit_default_bindings - # virtual_host_metadata - # maintenance_mode_status - # user_limits - # Starting from 3.12.0: - # feature_flags_v2 - # stream_queue - # classic_queue_type_delivery_support - "RABBITMQ_FEATURE_FLAGS": "quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits,feature_flags_v2,stream_queue,classic_queue_type_delivery_support", + # The feature flags listed below are required. This means they must be enabled in mixed-version testing + # before even starting the cluster because newer nodes don't have the corresponding compatibility/migration code. + "RABBITMQ_FEATURE_FLAGS": + # required starting from 3.11.0 in rabbit: + "quorum_queue,implicit_default_bindings,virtual_host_metadata,maintenance_mode_status,user_limits," + + # required starting from 3.12.0 in rabbit: + "feature_flags_v2,stream_queue,classic_queue_type_delivery_support,classic_mirrored_queue_version," + + "stream_single_active_consumer,direct_exchange_routing_v2,listener_records_in_ets,tracking_records_in_ets", + # required starting from 3.12.0 in rabbitmq_management_agent: + # empty_basic_get_metric, drop_unroutable_metric "RABBITMQ_RUN": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/rabbitmq-for-tests-run".format(package), "RABBITMQCTL": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmqctl".format(package), "RABBITMQ_PLUGINS": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmq-plugins".format(package),