From e21fc5ff3ced2f4ac950374d6ee12dac1f59e082 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 15 Apr 2023 18:38:40 -0400 Subject: [PATCH] Merge AckBlock with Burns Offers greater efficiency while reducing concerns re: atomicity. --- processor/messages/src/lib.rs | 11 ++-------- processor/src/main.rs | 40 +++++++++++----------------------- processor/src/scanner.rs | 23 ++++++++++++------- processor/src/scheduler.rs | 27 ++++++++++------------- processor/src/tests/scanner.rs | 2 +- processor/src/tests/wallet.rs | 14 +++++++----- 6 files changed, 51 insertions(+), 66 deletions(-) diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs index 9d3e8282d..c1aba10fe 100644 --- a/processor/messages/src/lib.rs +++ b/processor/messages/src/lib.rs @@ -173,20 +173,13 @@ pub mod substrate { #[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)] pub enum CoordinatorMessage { - // Substrate acknwoledged the block, meaning it should be acted upon. - // - // This still needs to come from Substrate, not from the validator-chain, due to it mutating - // the scheduler, which the Substrate chain primarily does. To have two causes of mutation - // requires a definitive ordering, which isn't achievable when we have distinct consensus. - BlockAcknowledged { context: SubstrateContext, key: Vec, block: BlockHash }, - Burns { context: SubstrateContext, burns: Vec }, + SubstrateBlock { context: SubstrateContext, key: Vec, burns: Vec }, } impl CoordinatorMessage { pub fn required_block(&self) -> Option { let context = match self { - CoordinatorMessage::BlockAcknowledged { context, .. } => context, - CoordinatorMessage::Burns { context, .. } => context, + CoordinatorMessage::SubstrateBlock { context, .. } => context, }; Some(context.coin_latest_finalized_block) } diff --git a/processor/src/main.rs b/processor/src/main.rs index 8cfc43821..4e5e373c0 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -394,40 +394,22 @@ async fn run(raw_db: D, coin: C, mut coordinato CoordinatorMessage::Substrate(msg) => { match msg { - // TODO: Merge this with Burns so we don't have two distinct scheduling actions - messages::substrate::CoordinatorMessage::BlockAcknowledged { + messages::substrate::CoordinatorMessage::SubstrateBlock { context, key: key_vec, - block + burns, } => { - let key = - ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); let mut block_id = >::Id::default(); - block_id.as_mut().copy_from_slice(&block.0); - - let plans = schedulers - .get_mut(&key_vec) - .expect("key we don't have a scheduler for acknowledged a block") - .add_outputs(scanner.ack_block(key, block_id).await); + block_id.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0); - sign_plans( - &mut main_db, - &coin, - &scanner, - &mut schedulers, - &signers, - context, - plans - ).await; - } + let key = + ::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap(); - messages::substrate::CoordinatorMessage::Burns { context, burns } => { - // TODO2: Rewrite rotation documentation - let schedule_key = active_keys.last().expect("burn event despite no keys"); - let scheduler = schedulers.get_mut(schedule_key.to_bytes().as_ref()).unwrap(); + // We now have to acknowledge every block for this key up to the acknowledged block + let outputs = scanner.ack_up_to_block(key, block_id).await; let mut payments = vec![]; - for out in burns.clone() { + for out in burns { let OutInstructionWithBalance { instruction: OutInstruction { address, data }, balance, @@ -441,7 +423,11 @@ async fn run(raw_db: D, coin: C, mut coordinato } } - let plans = scheduler.schedule(payments); + let plans = schedulers + .get_mut(&key_vec) + .expect("key we don't have a scheduler for acknowledged a block") + .schedule(outputs, payments); + sign_plans( &mut main_db, &coin, diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index 964ced40c..c7f1ea34d 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -181,11 +181,7 @@ impl ScannerDb { key: &::G, block: usize, ) -> Vec { - let new_key = txn.get(Self::scanned_block_key(key)).is_none(); let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id)); - // Either this is a new key, with no outputs, or we're acknowledging this block - // If we're acknowledging it, we should have outputs available - assert_eq!(new_key, outputs.is_none()); let outputs = outputs.unwrap_or(vec![]); // Mark all the outputs from this block as seen @@ -199,7 +195,10 @@ impl ScannerDb { outputs } fn latest_scanned_block(&self, key: ::G) -> usize { - let bytes = self.0.get(Self::scanned_block_key(&key)).unwrap_or(vec![0; 8]); + let bytes = self + .0 + .get(Self::scanned_block_key(&key)) + .expect("asking for latest scanned block of key which wasn't rotated to"); u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap() } } @@ -282,22 +281,30 @@ impl ScannerHandle { } /// Acknowledge having handled a block for a key. - pub async fn ack_block( + pub async fn ack_up_to_block( &self, key: ::G, id: >::Id, ) -> Vec { let mut scanner = self.scanner.write().await; debug!("Block {} acknowledged", hex::encode(&id)); + + // Get the number for this block let number = scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned"); + // Get the number of the last block we acknowledged + let prior = scanner.db.latest_scanned_block(key); + let mut outputs = vec![]; let mut txn = scanner.db.0.txn(); - let outputs = ScannerDb::::save_scanned_block(&mut txn, &key, number); + for number in (prior + 1) ..= number { + outputs.extend(ScannerDb::::save_scanned_block(&mut txn, &key, number)); + } + // TODO: This likely needs to be atomic with the scheduler? txn.commit(); for output in &outputs { - scanner.ram_outputs.remove(output.id().as_ref()); + assert!(scanner.ram_outputs.remove(output.id().as_ref())); } outputs diff --git a/processor/src/scheduler.rs b/processor/src/scheduler.rs index 7c49c61d1..c308ab436 100644 --- a/processor/src/scheduler.rs +++ b/processor/src/scheduler.rs @@ -108,9 +108,7 @@ impl Scheduler { Plan { key: self.key, inputs, payments, change: Some(self.key).filter(|_| change) } } - // When Substrate emits `Updates` for a coin, all outputs should be added up to the - // acknowledged block. - pub fn add_outputs(&mut self, mut utxos: Vec) -> Vec> { + fn add_outputs(&mut self, mut utxos: Vec) -> Vec> { log::info!("adding {} outputs", utxos.len()); let mut txs = vec![]; @@ -139,14 +137,13 @@ impl Scheduler { } log::info!("{} planned TXs have had their required inputs confirmed", txs.len()); - - // Additionally call schedule in case these outputs enable fulfilling scheduled payments - txs.extend(self.schedule(vec![])); txs } - // Schedule a series of payments. This should be called after `add_outputs`. - pub fn schedule(&mut self, payments: Vec>) -> Vec> { + // Schedule a series of outputs/payments. + pub fn schedule(&mut self, utxos: Vec, payments: Vec>) -> Vec> { + let mut plans = self.add_outputs(utxos); + log::info!("scheduling {} new payments", payments.len()); // Add all new payments to the list of pending payments @@ -157,7 +154,7 @@ impl Scheduler { // If we don't have UTXOs available, don't try to continue if self.utxos.is_empty() { log::info!("no utxos currently avilable"); - return vec![]; + return plans; } // Sort UTXOs so the highest valued ones are first @@ -185,13 +182,12 @@ impl Scheduler { } } - let mut txs = vec![]; for chunk in utxo_chunks.drain(..) { // TODO: While payments have their TXs' fees deducted from themselves, that doesn't hold here // We need to charge a fee before reporting incoming UTXOs to Substrate to cover aggregation // TXs log::debug!("aggregating a chunk of {} inputs", C::MAX_INPUTS); - txs.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) }) + plans.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) }) } // We want to use all possible UTXOs for all possible payments @@ -220,17 +216,18 @@ impl Scheduler { // Now that we have the list of payments we can successfully handle right now, create the TX // for them if !executing.is_empty() { - txs.push(self.execute(utxos, executing)); + plans.push(self.execute(utxos, executing)); } else { + // If we don't have any payments to execute, save these UTXOs for later self.utxos.extend(utxos); } log::info!( - "created {} TXs containing {} payments to sign", - txs.len(), + "created {} plans containing {} payments to sign", + plans.len(), payments_at_start - self.payments.len(), ); - txs + plans } // Note a branch output as having been created, with the amount it was actually created with, diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs index 25923adc0..926ce2422 100644 --- a/processor/src/tests/scanner.rs +++ b/processor/src/tests/scanner.rs @@ -70,7 +70,7 @@ pub async fn test_scanner(coin: C) { verify_event(new_scanner().await).await; // Acknowledge the block - assert_eq!(scanner.ack_block(keys.group_key(), block_id.clone()).await, outputs); + assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id.clone()).await, outputs); // There should be no more events assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err()); diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs index 3bfcacfcb..87c386b8d 100644 --- a/processor/src/tests/wallet.rs +++ b/processor/src/tests/wallet.rs @@ -49,16 +49,14 @@ pub async fn test_wallet(coin: C) { }; let mut scheduler = Scheduler::new(key); - // Add these outputs, which should return no plans - assert!(scheduler.add_outputs(outputs.clone()).is_empty()); - let amount = 2 * C::DUST; - let plans = scheduler.schedule(vec![Payment { address: C::address(key), data: None, amount }]); + let plans = scheduler + .schedule(outputs.clone(), vec![Payment { address: C::address(key), data: None, amount }]); assert_eq!( plans, vec![Plan { key, - inputs: outputs, + inputs: outputs.clone(), payments: vec![Payment { address: C::address(key), data: None, amount }], change: Some(key), }] @@ -91,6 +89,7 @@ pub async fn test_wallet(coin: C) { coin.mine_block().await; let block_number = coin.get_latest_block_number().await.unwrap(); let block = coin.get_block(block_number).await.unwrap(); + let first_outputs = outputs; let outputs = coin.get_outputs(&block, key).await.unwrap(); assert_eq!(outputs.len(), 2); let amount = amount - tx.fee(&coin).await; @@ -118,5 +117,8 @@ pub async fn test_wallet(coin: C) { } // Check the Scanner DB can reload the outputs - assert_eq!(scanner.ack_block(key, block.id()).await, outputs); + assert_eq!( + scanner.ack_up_to_block(key, block.id()).await, + [first_outputs, outputs].concat().to_vec() + ); }