diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index e17f34d3223411..e76de0e8c8ac49 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -257,11 +257,13 @@ impl PrioGraphScheduler { pub fn receive_completed( &mut self, container: &mut TransactionStateContainer, + lamports_per_signature: u64, ) -> Result<(usize, usize), SchedulerError> { let mut total_num_transactions: usize = 0; let mut total_num_retryable: usize = 0; loop { - let (num_transactions, num_retryable) = self.try_receive_completed(container)?; + let (num_transactions, num_retryable) = + self.try_receive_completed(container, lamports_per_signature)?; if num_transactions == 0 { break; } @@ -276,6 +278,7 @@ impl PrioGraphScheduler { fn try_receive_completed( &mut self, container: &mut TransactionStateContainer, + lamports_per_signature: u64, ) -> Result<(usize, usize), SchedulerError> { match self.finished_consume_work_receiver.try_recv() { Ok(FinishedConsumeWork { @@ -307,6 +310,7 @@ impl PrioGraphScheduler { transaction, max_age_slot, }, + lamports_per_signature, ); retryable_iter.next(); continue; @@ -500,6 +504,8 @@ mod tests { std::borrow::Borrow, }; + const TEST_LAMPORTS_PER_SIGNATURE: u64 = 5_000; + macro_rules! txid { ($value:expr) => { TransactionId::new($value) @@ -581,6 +587,7 @@ mod tests { compute_unit_limit: 1, }, transaction_cost, + TEST_LAMPORTS_PER_SIGNATURE, ); } @@ -753,7 +760,9 @@ mod tests { retryable_indexes: vec![], }) .unwrap(); - scheduler.receive_completed(&mut container).unwrap(); + scheduler + .receive_completed(&mut container, TEST_LAMPORTS_PER_SIGNATURE) + .unwrap(); let scheduling_summary = scheduler .schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter) .unwrap(); diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 225ff6a53e18c5..a4d0d60c2b1d9d 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -237,8 +237,11 @@ impl SchedulerController { /// Receives completed transactions from the workers and updates metrics. fn receive_completed(&mut self) -> Result<(), SchedulerError> { - let ((num_transactions, num_retryable), receive_completed_time_us) = - measure_us!(self.scheduler.receive_completed(&mut self.container)?); + let bank = self.bank_forks.read().unwrap().working_bank(); + let lamports_per_signature = bank.get_lamports_per_signature(); + let ((num_transactions, num_retryable), receive_completed_time_us) = measure_us!(self + .scheduler + .receive_completed(&mut self.container, lamports_per_signature)?); saturating_add_assign!(self.count_metrics.num_finished, num_transactions); saturating_add_assign!(self.count_metrics.num_retryable, num_retryable); saturating_add_assign!( @@ -299,6 +302,7 @@ impl SchedulerController { fn buffer_packets(&mut self, packets: Vec) { // Sanitize packets, generate IDs, and insert into the container. let bank = self.bank_forks.read().unwrap().working_bank(); + let lamports_per_signature = bank.get_lamports_per_signature(); let last_slot_in_epoch = bank.epoch_schedule().get_last_slot_in_epoch(bank.epoch()); let transaction_account_lock_limit = bank.get_transaction_account_lock_limit(); let feature_set = &bank.feature_set; @@ -355,6 +359,7 @@ impl SchedulerController { transaction_ttl, priority_details, transaction_cost, + lamports_per_signature, ) { saturating_add_assign!(self.count_metrics.num_dropped_on_capacity, 1); } diff --git a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs index 7c95f843537934..e56659252eb02c 100644 --- a/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs +++ b/core/src/banking_stage/transaction_scheduler/transaction_state_container.rs @@ -127,17 +127,17 @@ impl TransactionStateContainer { transaction_ttl: SanitizedTransactionTTL, transaction_priority_details: TransactionPriorityDetails, transaction_cost: TransactionCost, + lamports_per_signature: u64, ) -> bool { - let priority_id = - TransactionPriorityId::new(transaction_priority_details.priority, transaction_id); - self.id_to_transaction_state.insert( - transaction_id, - TransactionState::new( - transaction_ttl, - transaction_priority_details, - transaction_cost, - ), + let state = TransactionState::new( + transaction_ttl, + transaction_priority_details, + transaction_cost, ); + let priority = Self::calculate_prioritization(lamports_per_signature, &state); + self.id_to_transaction_state.insert(transaction_id, state); + + let priority_id = TransactionPriorityId::new(priority, transaction_id); self.push_id_into_queue(priority_id) } @@ -147,11 +147,13 @@ impl TransactionStateContainer { &mut self, transaction_id: TransactionId, transaction_ttl: SanitizedTransactionTTL, + lamports_per_signature: u64, ) { let transaction_state = self .get_mut_transaction_state(&transaction_id) .expect("transaction must exist"); - let priority_id = TransactionPriorityId::new(transaction_state.priority(), transaction_id); + let priority = Self::calculate_prioritization(lamports_per_signature, transaction_state); + let priority_id = TransactionPriorityId::new(priority, transaction_id); transaction_state.transition_to_unprocessed(transaction_ttl); self.push_id_into_queue(priority_id); } @@ -176,6 +178,16 @@ impl TransactionStateContainer { .remove(id) .expect("transaction must exist"); } + + /// Calculate prioritization for a transaction. + fn calculate_prioritization(lamports_per_signature: u64, state: &TransactionState) -> u64 { + let compute_unit_price = state.transaction_priority_details().priority; + let signature_fees = + lamports_per_signature * state.transaction_ttl().transaction.signatures().len() as u64; + let cost = state.transaction_cost().sum(); + + signature_fees / cost + compute_unit_price + } } #[cfg(test)] @@ -196,6 +208,8 @@ mod tests { }, }; + const TEST_LAMPORTS_PER_SIGNATURE: u64 = 5_000; + fn test_transaction( priority: u64, ) -> ( @@ -243,6 +257,7 @@ mod tests { transaction_ttl, transaction_priority_details, transaction_cost, + TEST_LAMPORTS_PER_SIGNATURE, ); } }