diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 2f282740260d..6010c6d2b7d0 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -192,6 +192,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", @@ -448,6 +449,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", @@ -727,6 +729,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_prelaunch_logging.erl", "src/rabbit_priority_queue.erl", "src/rabbit_process.erl", + "src/rabbit_process_flag.erl", "src/rabbit_queue_consumers.erl", "src/rabbit_queue_decorator.erl", "src/rabbit_queue_index.erl", diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0c459b08a76b..0b228040f5f3 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -385,7 +385,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, outgoing_window = ?UINT(RemoteOutgoingWindow), handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), - process_flag(message_queue_data, off_heap), + rabbit_process_flag:adjust_for_message_handling_proc(), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 5272991f5d2c..287a4d8838b0 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -484,6 +484,8 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), + rabbit_process_flag:adjust_for_message_handling_proc(), + ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), diff --git a/deps/rabbit/src/rabbit_process_flag.erl b/deps/rabbit/src/rabbit_process_flag.erl new file mode 100644 index 000000000000..fc74c25f554e --- /dev/null +++ b/deps/rabbit/src/rabbit_process_flag.erl @@ -0,0 +1,31 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_process_flag). + + +-export([adjust_for_message_handling_proc/0 + ]). + +%% Adjust process flags for processes that handle RabbitMQ messages. +%% For example any process that uses the `rabbit_queue_type' module +%% may benefit from this tuning. +%% @returns `ok' +-spec adjust_for_message_handling_proc() -> ok. +adjust_for_message_handling_proc() -> + process_flag(message_queue_data, off_heap), + case code_version:get_otp_version() of + OtpMaj when OtpMaj >= 27 -> + %% 46422 is the default min_bin_vheap_size and for OTP 27 and above + %% we want to substantially increase it for processes that may buffer + %% messages. 32x has proven workable in testing whilst not being + %% ridiculously large + process_flag(min_bin_vheap_size, 46422 * 32), + ok; + _ -> + ok + end. diff --git a/deps/rabbit/src/rabbit_ra_systems.erl b/deps/rabbit/src/rabbit_ra_systems.erl index 08e15ecb53ba..033c76132522 100644 --- a/deps/rabbit/src/rabbit_ra_systems.erl +++ b/deps/rabbit/src/rabbit_ra_systems.erl @@ -24,6 +24,9 @@ -define(COORD_WAL_MAX_SIZE_B, 64_000_000). -define(QUORUM_AER_MAX_RPC_SIZE, 16). -define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000). +%% the default min bin vheap value in OTP 26 +-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422). +-define(MIN_BIN_VHEAP_SIZE_MULT, 64). -spec setup() -> ok | no_return(). @@ -107,7 +110,6 @@ ensure_ra_system_started(RaSystem) -> end. -spec get_config(ra_system_name()) -> ra_system:config(). - get_config(quorum_queues = RaSystem) -> DefaultConfig = get_default_config(), Checksums = application:get_env(rabbit, quorum_compute_checksums, true), @@ -124,7 +126,16 @@ get_config(quorum_queues = RaSystem) -> AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size, ?QUORUM_AER_MAX_RPC_SIZE), CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true), + MinBinVheapSize = case code_version:get_otp_version() of + OtpMaj when OtpMaj >= 27 -> + ?MIN_BIN_VHEAP_SIZE_DEFAULT * ?MIN_BIN_VHEAP_SIZE_MULT; + _ -> + ?MIN_BIN_VHEAP_SIZE_DEFAULT + end, + DefaultConfig#{name => RaSystem, + wal_min_bin_vheap_size => MinBinVheapSize, + server_min_bin_vheap_size => MinBinVheapSize, default_max_append_entries_rpc_batch_size => AERBatchSize, wal_compute_checksums => WalChecksums, wal_max_entries => WalMaxEntries, diff --git a/deps/rabbit_common/src/code_version.erl b/deps/rabbit_common/src/code_version.erl index 568a6e7c439a..af90f73d941f 100644 --- a/deps/rabbit_common/src/code_version.erl +++ b/deps/rabbit_common/src/code_version.erl @@ -116,6 +116,7 @@ get_forms(Code) -> throw({no_abstract_code, Reason}) end. +-spec get_otp_version() -> non_neg_integer(). get_otp_version() -> Version = erlang:system_info(otp_release), case re:run(Version, "^[0-9][0-9]", [{capture, first, list}]) of diff --git a/moduleindex.yaml b/moduleindex.yaml index 02f800fcd252..13d2c98adcc2 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -694,6 +694,7 @@ rabbit: - rabbit_prelaunch_logging - rabbit_priority_queue - rabbit_process +- rabbit_process_flag - rabbit_queue_consumers - rabbit_queue_decorator - rabbit_queue_index