From 59be93cfc050e9f70dc477fe76d9dcf1dec590d1 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 10 Jul 2024 11:40:24 +0200 Subject: [PATCH 1/3] Require all stable feature flags added up to 3.13.0 Since feature flag `message_containers` introduced in 3.13.0 is required in 4.0, we can also require all other feature flags introduced in or before 3.13.0 and remove their compatibility code for 4.0: * restart_streams * stream_sac_coordinator_unblock_group * stream_filtering * stream_update_config_command --- deps/rabbit/src/rabbit_core_ff.erl | 8 +- deps/rabbit/src/rabbit_stream_coordinator.erl | 48 +++++------ deps/rabbit/src/rabbit_stream_queue.erl | 77 ++++++----------- .../src/rabbit_stream_sac_coordinator.erl | 25 +----- deps/rabbit/test/amqp_client_SUITE.erl | 1 - .../rabbit/test/rabbit_stream_queue_SUITE.erl | 83 ++++++------------- deps/rabbitmq_stomp/test/system_SUITE.erl | 7 -- .../src/rabbit_stream_reader.erl | 67 +-------------- .../src/rabbit_stream_reader.hrl | 1 - .../src/rabbit_stream_utils.erl | 12 +-- .../test/rabbit_stream_SUITE.erl | 36 +------- rabbitmq.bzl | 2 +- 12 files changed, 79 insertions(+), 288 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index dd90aba3ca33..f8b149522be3 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -101,7 +101,7 @@ {restart_streams, #{desc => "Support for restarting streams with optional preferred next leader argument." "Used to implement stream leader rebalancing", - stability => stable, + stability => required, depends_on => [stream_queue] }}). @@ -109,14 +109,14 @@ {stream_sac_coordinator_unblock_group, #{desc => "Bug fix to unblock a group of consumers in a super stream partition", doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743", - stability => stable, + stability => required, depends_on => [stream_single_active_consumer] }}). -rabbit_feature_flag( {stream_filtering, #{desc => "Support for stream filtering.", - stability => stable, + stability => required, depends_on => [stream_queue] }}). @@ -153,7 +153,7 @@ {stream_update_config_command, #{desc => "A new internal command that is used to update streams as " "part of a policy.", - stability => stable, + stability => required, depends_on => [stream_queue] }}). diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 3859875df43e..954030b98581 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -174,19 +174,14 @@ restart_stream(QRes, Options) restart_stream(Q, Options) when ?is_amqqueue(Q) andalso ?amqqueue_is_stream(Q) -> - case rabbit_feature_flags:is_enabled(restart_streams) of - true -> - rabbit_log:info("restarting stream ~s in vhost ~s with options ~p", - [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), - #{name := StreamId} = amqqueue:get_type_state(Q), - case process_command({restart_stream, StreamId, Options}) of - {ok, {ok, LeaderPid}, _} -> - {ok, node(LeaderPid)}; - Err -> - Err - end; - false -> - {error, {feature_flag_not_enabled, restart_stream}} + rabbit_log:info("restarting stream ~s in vhost ~s with options ~p", + [maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]), + #{name := StreamId} = amqqueue:get_type_state(Q), + case process_command({restart_stream, StreamId, Options}) of + {ok, {ok, LeaderPid}, _} -> + {ok, node(LeaderPid)}; + Err -> + Err end. delete_stream(Q, ActingUser) @@ -254,22 +249,17 @@ policy_changed(Q) when ?is_amqqueue(Q) -> {ok, ok, ra:server_id()} | {error, not_supported | term()}. update_config(Q, Config) when ?is_amqqueue(Q) andalso is_map(Config) -> - case rabbit_feature_flags:is_enabled(stream_update_config_command) of - true -> - %% there are the only a few configuration keys that are safe to - %% update - StreamId = maps:get(name, amqqueue:get_type_state(Q)), - case maps:with([filter_size, - retention, - writer_mod, - replica_mod], Config) of - Conf when map_size(Conf) > 0 -> - process_command({update_config, StreamId, Conf}); - _ -> - {error, no_updatable_keys} - end; - false -> - {error, feature_not_enabled} + %% there are the only a few configuration keys that are safe to + %% update + StreamId = maps:get(name, amqqueue:get_type_state(Q)), + case maps:with([filter_size, + retention, + writer_mod, + replica_mod], Config) of + Conf when map_size(Conf) > 0 -> + process_command({update_config, StreamId, Conf}); + _ -> + {error, no_updatable_keys} end. sac_state(#?MODULE{single_active_consumer = SacState}) -> diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index e1ec09a502f0..2a3373004683 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -96,8 +96,7 @@ soft_limit :: non_neg_integer(), slow = false :: boolean(), readers = #{} :: #{rabbit_types:ctag() => #stream{}}, - writer_id :: binary(), - filtering_supported :: boolean() + writer_id :: binary() }). -import(rabbit_queue_type_util, [args_policy_lookup/3]). @@ -286,8 +285,7 @@ consume(Q, #{no_ack := true, consume(Q, #{limiter_active := true}, _State) when ?amqqueue_is_stream(Q) -> {error, global_qos_not_supported_for_queue_type}; -consume(Q, Spec, - #stream_client{filtering_supported = FilteringSupported} = QState0) +consume(Q, Spec, #stream_client{} = QState0) when ?amqqueue_is_stream(Q) -> %% Messages should include the offset as a custom header. case get_local_pid(QState0) of @@ -307,26 +305,19 @@ consume(Q, Spec, {error, _} = Err -> Err; {ok, OffsetSpec} -> - FilterSpec = filter_spec(Args), - case {FilterSpec, FilteringSupported} of - {#{filter_spec := _}, false} -> - {protocol_error, precondition_failed, - "Filtering is not supported", []}; - _ -> - ConsumerPrefetchCount = case Mode of - {simple_prefetch, C} -> C; - _ -> 0 - end, - AckRequired = not NoAck, - rabbit_core_metrics:consumer_created( - ChPid, ConsumerTag, ExclusiveConsume, AckRequired, - QName, ConsumerPrefetchCount, false, up, Args), - %% reply needs to be sent before the stream - %% begins sending - maybe_send_reply(ChPid, OkMsg), - _ = rabbit_stream_coordinator:register_local_member_listener(Q), - begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, FilterSpec) - end + ConsumerPrefetchCount = case Mode of + {simple_prefetch, C} -> C; + _ -> 0 + end, + AckRequired = not NoAck, + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, AckRequired, + QName, ConsumerPrefetchCount, false, up, Args), + %% reply needs to be sent before the stream + %% begins sending + maybe_send_reply(ChPid, OkMsg), + _ = rabbit_stream_coordinator:register_local_member_listener(Q), + begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, filter_spec(Args)) end; {undefined, _} -> {protocol_error, precondition_failed, @@ -510,8 +501,7 @@ deliver(QSs, Msg, Options) -> lists:foldl( fun({Q, stateless}, {Qs, Actions}) -> LeaderPid = amqqueue:get_pid(Q), - ok = osiris:write(LeaderPid, - stream_message(Msg, filtering_supported())), + ok = osiris:write(LeaderPid, stream_message(Msg)), {Qs, Actions}; ({Q, S0}, {Qs, Actions0}) -> {S, Actions} = deliver0(maps:get(correlation, Options, undefined), @@ -526,11 +516,9 @@ deliver0(MsgId, Msg, next_seq = Seq, correlation = Correlation0, soft_limit = SftLmt, - slow = Slow0, - filtering_supported = FilteringSupported} = State, + slow = Slow0} = State, Actions0) -> - ok = osiris:write(LeaderPid, WriterId, Seq, - stream_message(Msg, FilteringSupported)), + ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg)), Correlation = case MsgId of undefined -> Correlation0; @@ -547,19 +535,14 @@ deliver0(MsgId, Msg, correlation = Correlation, slow = Slow}, Actions}. -stream_message(Msg, FilteringSupported) -> +stream_message(Msg) -> McAmqp = mc:convert(mc_amqp, Msg), MsgData = mc:protocol_state(McAmqp), - case FilteringSupported of - true -> - case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of - undefined -> - MsgData; - {utf8, Value} -> - {Value, MsgData} - end; - false -> - MsgData + case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of + undefined -> + MsgData; + {utf8, Value} -> + {Value, MsgData} end. -spec dequeue(_, _, _, _, client()) -> no_return(). @@ -936,8 +919,7 @@ init(Q) when ?is_amqqueue(Q) -> name = amqqueue:get_name(Q), leader = Leader, writer_id = WriterId, - soft_limit = SoftLimit, - filtering_supported = filtering_supported()}}; + soft_limit = SoftLimit}}; {ok, stream_not_found, _} -> {error, stream_not_found}; {error, coordinator_unavailable} = E -> @@ -1294,8 +1276,7 @@ notify_decorators(Q) when ?is_amqqueue(Q) -> resend_all(#stream_client{leader = LeaderPid, writer_id = WriterId, - correlation = Corrs, - filtering_supported = FilteringSupported} = State) -> + correlation = Corrs} = State) -> Msgs = lists:sort(maps:values(Corrs)), case Msgs of [] -> ok; @@ -1304,8 +1285,7 @@ resend_all(#stream_client{leader = LeaderPid, [Seq, maps:size(Corrs)]) end, [begin - ok = osiris:write(LeaderPid, WriterId, Seq, - stream_message(Msg, FilteringSupported)) + ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg)) end || {Seq, Msg} <- Msgs], State. @@ -1340,9 +1320,6 @@ list_with_minimum_quorum() -> is_stateful() -> true. -filtering_supported() -> - rabbit_feature_flags:is_enabled(stream_filtering). - get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), Nodes. diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 84340ef77df6..6be7a32b93b9 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -629,32 +629,12 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) -> end end. -message_type() -> - case has_unblock_group_support() of - true -> - map; - false -> - tuple - end. - notify_consumer_effect(Pid, SubId, Stream, Name, Active) -> notify_consumer_effect(Pid, SubId, Stream, Name, Active, false). notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) -> - notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()). + notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, map). -notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) -> - mod_call_effect(Pid, - {sac, - {{subscription_id, SubId}, - {active, Active}, - {extra, []}}}); -notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) -> - mod_call_effect(Pid, - {sac, - {{subscription_id, SubId}, - {active, Active}, - {extra, [{stepping_down, true}]}}}); notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) -> mod_call_effect(Pid, {sac, #{subscription_id => SubId, @@ -776,6 +756,3 @@ mod_call_effect(Pid, Msg) -> send_message(ConnectionPid, Msg) -> ConnectionPid ! Msg, ok. - -has_unblock_group_support() -> - rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group). diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 806245f82b73..0ed9c9eb1110 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -3138,7 +3138,6 @@ global_counters(Config) -> ok = amqp10_client:close_connection(Connection). stream_filtering(Config) -> - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME), Stream = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(Stream), Ch = rabbit_ct_client_helpers:open_channel(Config), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 2103e700e66c..13ef45d712ea 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -239,27 +239,10 @@ init_per_group1(Group, Config) -> {skip, _} -> Ret; Config2 -> - EnableFF = rabbit_ct_broker_helpers:enable_feature_flag( - Config2, stream_queue), - case EnableFF of - ok -> - if Clustered -> - rabbit_ct_broker_helpers:enable_feature_flag( - Config2, stream_update_config_command); - true -> - ok - end, - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_tick_interval, 100]), - Config2; - {skip, _} = Skip -> - end_per_group(Group, Config2), - Skip; - Other -> - end_per_group(Group, Config2), - {skip, Other} - end + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_tick_interval, 100]), + Config2 end. end_per_group(_, Config) -> @@ -1437,34 +1420,27 @@ tracking_status(Config) -> rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]). restart_stream(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, restart_stream) of - ok -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - Q = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - publish_confirm(Ch, Q, [<<"msg">>]), - Vhost = ?config(rmq_vhost, Config), - QName = #resource{virtual_host = Vhost, - kind = queue, - name = Q}, - %% restart the stream - ?assertMatch({ok, _}, - rabbit_ct_broker_helpers:rpc(Config, Server, - rabbit_stream_coordinator, - ?FUNCTION_NAME, [QName])), - - publish_confirm(Ch, Q, [<<"msg2">>]), - rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]), - ok; - _ -> - ct:pal("skipping test ~s as feature flag `restart_stream` not supported", - [?FUNCTION_NAME]), - ok - end. + publish_confirm(Ch, Q, [<<"msg">>]), + Vhost = ?config(rmq_vhost, Config), + QName = #resource{virtual_host = Vhost, + kind = queue, + name = Q}, + %% restart the stream + ?assertMatch({ok, _}, + rabbit_ct_broker_helpers:rpc(Config, Server, + rabbit_stream_coordinator, + ?FUNCTION_NAME, [QName])), + + publish_confirm(Ch, Q, [<<"msg2">>]), + rabbit_ct_broker_helpers:rpc(Config, Server, ?MODULE, delete_testcase_queue, [Q]), + ok. format(Config) -> %% tests rabbit_stream_queue:format/2 @@ -2817,18 +2793,7 @@ ensure_retention_applied(Config, Server) -> rabbit_ct_broker_helpers:rpc(Config, Server, gen_server, call, [osiris_retention, test]). rebalance(Config) -> - case rabbit_ct_broker_helpers:enable_feature_flag(Config, restart_stream) of - ok -> - rebalance0(Config); - _ -> - ct:pal("skipping test ~s as feature flag `restart_stream` not supported", - [?FUNCTION_NAME]), - ok - end. - -rebalance0(Config) -> - [Server0 | _] = - rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server0 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), Q1 = <<"st1">>, diff --git a/deps/rabbitmq_stomp/test/system_SUITE.erl b/deps/rabbitmq_stomp/test/system_SUITE.erl index 55eea0e1c01f..caf6de6ddc93 100644 --- a/deps/rabbitmq_stomp/test/system_SUITE.erl +++ b/deps/rabbitmq_stomp/test/system_SUITE.erl @@ -103,13 +103,6 @@ init_per_testcase0(publish_unauthorized_error, Config) -> StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp), {ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort), rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]); -init_per_testcase0(stream_filtering, Config) -> - case rabbit_ct_helpers:is_mixed_versions() of - true -> - {skip, "mixed version clusters are not supported for stream filtering"}; - _ -> - Config - end; init_per_testcase0(_, Config) -> Config. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 4c744338ca45..7db46016ede8 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -182,8 +182,7 @@ init([KeepaliveSup, correlation_id_sequence = 0, outstanding_requests = #{}, request_timeout = RequestTimeout, - deliver_version = DeliverVersion, - filtering_supported = rabbit_stream_utils:filtering_supported()}, + deliver_version = DeliverVersion}, State = #stream_connection_state{consumers = #{}, blocked = false, @@ -549,9 +548,6 @@ increase_messages_confirmed(Counters, Count) -> rabbit_global_counters:messages_confirmed(stream, Count), atomics:add(Counters, 2, Count). -increase_messages_errored(Counters, Count) -> - atomics:add(Counters, 3, Count). - messages_consumed(Counters) -> atomics:get(Counters, 1). @@ -714,19 +710,6 @@ open(info, {OK, S, Data}, StatemData#statem_data{connection = Connection1, connection_state = State2}} end; -open(info, - {sac, {{subscription_id, SubId}, - {active, Active}, {extra, Extra}}}, - State) -> - Msg0 = #{subscription_id => SubId, - active => Active}, - Msg1 = case Extra of - [{stepping_down, true}] -> - Msg0#{stepping_down => true}; - _ -> - Msg0 - end, - open(info, {sac, Msg1}, State); open(info, {sac, #{subscription_id := SubId, active := Active} = Msg}, @@ -1783,31 +1766,6 @@ handle_frame_post_auth(Transport, {publish, PublisherId, MessageCount, Messages}) -> handle_frame_post_auth(Transport, Connection, State, {publish, ?VERSION_1, PublisherId, MessageCount, Messages}); -handle_frame_post_auth(Transport, - #stream_connection{filtering_supported = false, - publishers = Publishers, - socket = S} = Connection, - State, - {publish_v2, PublisherId, MessageCount, Messages}) -> - case Publishers of - #{PublisherId := #publisher{message_counters = Counters}} -> - increase_messages_received(Counters, MessageCount), - increase_messages_errored(Counters, MessageCount), - ok; - _ -> - ok - end, - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), - PublishingIds = publishing_ids_from_messages(?VERSION_2, Messages), - Command = {publish_error, - PublisherId, - ?RESPONSE_CODE_PRECONDITION_FAILED, - PublishingIds}, - Frame = rabbit_stream_core:frame(Command), - send(Transport, S, Frame), - {Connection, State}; handle_frame_post_auth(Transport, Connection, State, @@ -1932,29 +1890,6 @@ handle_frame_post_auth(Transport, 1), {Connection0, State} end; -handle_frame_post_auth(Transport, - #stream_connection{filtering_supported = false} = Connection, - State, - {request, CorrelationId, - {subscribe, - SubscriptionId, _, _, _, Properties}} = Request) -> - case rabbit_stream_utils:filter_defined(Properties) of - true -> - rabbit_log:warning("Cannot create subcription ~tp, it defines a filter " - "and filtering is not active", - [SubscriptionId]), - response(Transport, - Connection, - subscribe, - CorrelationId, - ?RESPONSE_CODE_PRECONDITION_FAILED), - rabbit_global_counters:increase_protocol_counter(stream, - ?PRECONDITION_FAILED, - 1), - {Connection, State}; - false -> - handle_frame_post_auth(Transport, {ok, Connection}, State, Request) - end; handle_frame_post_auth(Transport, #stream_connection{} = Connection, State, {request, _, {subscribe, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl index 6f44c63a7c81..0c1bc2dcc683 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.hrl @@ -93,7 +93,6 @@ deliver_version :: rabbit_stream_core:command_version(), request_timeout :: pos_integer(), outstanding_requests_timer :: undefined | erlang:reference(), - filtering_supported :: boolean(), %% internal sequence used for publishers internal_sequence = 0 :: integer(), token_expiry_timer = undefined :: undefined | erlang:reference()}). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index b746c036a5a6..6f9c2edef5d2 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -34,7 +34,6 @@ filter_defined/1, filter_spec/1, command_versions/0, - filtering_supported/0, check_super_stream_management_permitted/4, offset_lag/4, consumer_offset/3]). @@ -298,14 +297,8 @@ filter_spec(Properties) -> end. command_versions() -> - PublishMaxVersion = case filtering_supported() of - false -> - ?VERSION_1; - true -> - ?VERSION_2 - end, [{declare_publisher, ?VERSION_1, ?VERSION_1}, - {publish, ?VERSION_1, PublishMaxVersion}, + {publish, ?VERSION_1, ?VERSION_2}, {query_publisher_sequence, ?VERSION_1, ?VERSION_1}, {delete_publisher, ?VERSION_1, ?VERSION_1}, {subscribe, ?VERSION_1, ?VERSION_1}, @@ -324,9 +317,6 @@ command_versions() -> {create_super_stream, ?VERSION_1, ?VERSION_1}, {delete_super_stream, ?VERSION_1, ?VERSION_1}]. -filtering_supported() -> - rabbit_feature_flags:is_enabled(stream_filtering). - q(VirtualHost, Name) -> rabbit_misc:r(VirtualHost, queue, Name). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index d764f17a874b..5f7ee115a025 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -37,8 +37,7 @@ all() -> groups() -> [{single_node, [], - [filtering_ff, %% must stay at the top, feature flag disabled for this one - test_stream, + [test_stream, test_stream_tls, test_publish_v2, test_super_stream_creation_deletion, @@ -185,13 +184,6 @@ end_per_testcase(cannot_update_username_after_authenticated = TestCase, Config) ok = rabbit_ct_broker_helpers:delete_user(Config, <<"other">>), rabbit_ct_helpers:testcase_finished(Config, TestCase); -end_per_testcase(filtering_ff = TestCase, Config) -> - _ = rabbit_ct_broker_helpers:rpc(Config, - 0, - rabbit_feature_flags, - enable, - [stream_filtering]), - rabbit_ct_helpers:testcase_finished(Config, TestCase); end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -212,32 +204,6 @@ end_per_testcase(store_offset_requires_read_access = TestCase, Config) -> end_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_finished(Config, TestCase). -filtering_ff(Config) -> - Stream = atom_to_binary(?FUNCTION_NAME, utf8), - Transport = gen_tcp, - Port = get_stream_port(Config), - Opts = [{active, false}, {mode, binary}], - {ok, S} = Transport:connect("localhost", Port, Opts), - C0 = rabbit_stream_core:init(0), - C1 = test_peer_properties(Transport, S, C0), - C2 = test_authenticate(Transport, S, C1), - C3 = test_create_stream(Transport, S, Stream, C2), - PublisherId = 42, - C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3), - Body = <<"hello">>, - C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, - publish_error, C4), - SubscriptionId = 42, - C6 = test_subscribe(Transport, S, SubscriptionId, Stream, - #{<<"filter.0">> => <<"foo">>}, - ?RESPONSE_CODE_PRECONDITION_FAILED, - C5), - - C7 = test_delete_stream(Transport, S, Stream, C6), - _C8 = test_close(Transport, S, C7), - closed = wait_for_socket_close(Transport, S, 10), - ok. - test_global_counters(Config) -> Stream = atom_to_binary(?FUNCTION_NAME, utf8), test_server(gen_tcp, Stream, Config), diff --git a/rabbitmq.bzl b/rabbitmq.bzl index 9c2927b06234..8c51a2b16f71 100644 --- a/rabbitmq.bzl +++ b/rabbitmq.bzl @@ -281,7 +281,7 @@ def rabbitmq_integration_suite( # required starting from 3.12.0 in rabbitmq_management_agent: # empty_basic_get_metric, drop_unroutable_metric # required starting from 4.0 in rabbit: - "message_containers", + "message_containers,stream_update_config_command,stream_filtering,stream_sac_coordinator_unblock_group,restart_streams", "RABBITMQ_RUN": "$(location :rabbitmq-for-tests-run)", "RABBITMQCTL": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmqctl".format(package), "RABBITMQ_PLUGINS": "$TEST_SRCDIR/$TEST_WORKSPACE/{}/broker-for-tests-home/sbin/rabbitmq-plugins".format(package), From 6cdfb62a81c287f017a0b8907ee98ab759430583 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 10 Jul 2024 10:26:23 +0000 Subject: [PATCH 2/3] Fix test case rebalance This test case was wrongly skipped and therefore never ran. --- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 13ef45d712ea..3d09d901caf9 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -1239,7 +1239,7 @@ recover_after_leader_and_coordinator_kill(Config) -> ct:pal("sys state ~p", [CState]), - + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]), ok. keep_consuming_on_leader_restart(Config) -> @@ -2194,6 +2194,7 @@ leader_locator_balanced_maintenance(Config) -> end, 60000), true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[Q1, Q]]), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). select_nodes_with_least_replicas(Config) -> @@ -2803,20 +2804,20 @@ rebalance(Config) -> Q5 = <<"st5">>, ?assertEqual({'queue.declare_ok', Q1, 0, 0}, - declare(Ch, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-initial-cluster-size">>, long, 3}])), + declare(Config, Server0, Q1, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 3}])), ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Ch, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-initial-cluster-size">>, long, 3}])), + declare(Config, Server0, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 3}])), ?assertEqual({'queue.declare_ok', Q3, 0, 0}, - declare(Ch, Q3, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-initial-cluster-size">>, long, 3}])), + declare(Config, Server0, Q3, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 3}])), ?assertEqual({'queue.declare_ok', Q4, 0, 0}, - declare(Ch, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-initial-cluster-size">>, long, 3}])), + declare(Config, Server0, Q4, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 3}])), ?assertEqual({'queue.declare_ok', Q5, 0, 0}, - declare(Ch, Q5, [{<<"x-queue-type">>, longstr, <<"stream">>}, - {<<"x-initial-cluster-size">>, long, 3}])), + declare(Config, Server0, Q5, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 3}])), NumMsgs = 100, Data = crypto:strong_rand_bytes(100), From 620de9d4cb94c40a645ea66bca2de5421e269c1e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 11 Jul 2024 09:05:36 +0200 Subject: [PATCH 3/3] Delete unnecessary function as suggested by JSP in PR feedback --- deps/rabbit/src/rabbit_stream_sac_coordinator.erl | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 6be7a32b93b9..9e46085ed9d1 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -632,22 +632,19 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) -> notify_consumer_effect(Pid, SubId, Stream, Name, Active) -> notify_consumer_effect(Pid, SubId, Stream, Name, Active, false). -notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) -> - notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, map). - -notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) -> +notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown) -> mod_call_effect(Pid, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, active => Active}}); -notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = _SteppingDown, map) -> +notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = SteppingDown) -> mod_call_effect(Pid, {sac, #{subscription_id => SubId, stream => Stream, consumer_name => Name, active => Active, - stepping_down => true}}). + stepping_down => SteppingDown}}). maybe_create_group(VirtualHost, Stream,