From 81c8ed73bcdd632988fdba5043bb112cc45c858f Mon Sep 17 00:00:00 2001 From: Trent Nelson <490004+t-nelson@users.noreply.github.com> Date: Wed, 20 Mar 2024 11:13:50 -0600 Subject: [PATCH] rpc-sts: add config options for stake-weighted qos (#197) * rpc-sts: plumb options for swqos config * rpc-sts: send to specific tpu peers when configured --- .../src/send_transaction_service.rs | 22 ++++++++++++--- validator/src/cli.rs | 16 +++++++++++ validator/src/main.rs | 28 +++++++++++++++---- 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index dbdcda2f2ff905..abe53b236d2e75 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -115,6 +115,7 @@ pub struct Config { pub batch_send_rate_ms: u64, /// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt pub retry_pool_max_size: usize, + pub tpu_peers: Option>, } impl Default for Config { @@ -127,6 +128,7 @@ impl Default for Config { batch_size: DEFAULT_TRANSACTION_BATCH_SIZE, batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS, retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE, + tpu_peers: None, } } } @@ -566,12 +568,18 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses_with_slots( + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) + .unwrap_or_default(); + let leader_addresses = Self::get_tpu_addresses_with_slots( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); let wire_transactions = transactions .iter() @@ -584,8 +592,8 @@ impl SendTransactionService { }) .collect::>(); - for address in &addresses { - Self::send_transactions(address.0, &wire_transactions, connection_cache, stats); + for (address, _) in &addresses { + Self::send_transactions(address, &wire_transactions, connection_cache, stats); } } @@ -702,14 +710,20 @@ impl SendTransactionService { let iter = wire_transactions.chunks(config.batch_size); for chunk in iter { + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); let mut leader_info_provider = leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); - let addresses = Self::get_tpu_addresses( + let leader_addresses = Self::get_tpu_addresses( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); for address in &addresses { Self::send_transactions(address, chunk, connection_cache, stats); diff --git a/validator/src/cli.rs b/validator/src/cli.rs index e9298d9c02928e..f127273c8da2f3 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1163,6 +1163,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .default_value(&default_args.rpc_send_transaction_retry_pool_max_size) .help("The maximum size of transactions retry pool."), ) + .arg( + Arg::with_name("rpc_send_transaction_tpu_peer") + .long("rpc-send-transaction-tpu-peer") + .takes_value(true) + .number_of_values(1) + .multiple(true) + .value_name("HOST:PORT") + .validator(solana_net_utils::is_host_port) + .help("Peer(s) to broadcast transactions to instead of the current leader") + ) + .arg( + Arg::with_name("rpc_send_transaction_also_leader") + .long("rpc-send-transaction-also-leader") + .requires("rpc_send_transaction_tpu_peer") + .help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader") + ) .arg( Arg::with_name("rpc_scan_and_fix_roots") .long("rpc-scan-and-fix-roots") diff --git a/validator/src/main.rs b/validator/src/main.rs index fd55bf74d20dde..545ecfda481d35 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1308,6 +1308,27 @@ pub fn main() { ); exit(1); } + let rpc_send_transaction_tpu_peers = matches + .values_of("rpc_send_transaction_tpu_peer") + .map(|values| { + values + .map(solana_net_utils::parse_host_port) + .collect::, String>>() + }) + .transpose() + .unwrap_or_else(|e| { + eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}"); + exit(1); + }); + let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader"); + let leader_forward_count = + if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader { + // rpc-sts is configured to send only to specific tpu peers. disable leader forwards + 0 + } else { + value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64) + }; + let full_api = matches.is_present("full_rpc_api"); let mut validator_config = ValidatorConfig { @@ -1399,11 +1420,7 @@ pub fn main() { contact_debug_interval, send_transaction_service_config: send_transaction_service::Config { retry_rate_ms: rpc_send_retry_rate_ms, - leader_forward_count: value_t_or_exit!( - matches, - "rpc_send_transaction_leader_forward_count", - u64 - ), + leader_forward_count, default_max_retries: value_t!( matches, "rpc_send_transaction_default_max_retries", @@ -1422,6 +1439,7 @@ pub fn main() { "rpc_send_transaction_retry_pool_max_size", usize ), + tpu_peers: rpc_send_transaction_tpu_peers, }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),