Skip to content

Commit

Permalink
Merge AckBlock with Burns
Browse files Browse the repository at this point in the history
Offers greater efficiency while reducing concerns re: atomicity.
  • Loading branch information
kayabaNerve committed Apr 15, 2023
1 parent eafd054 commit e21fc5f
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 66 deletions.
11 changes: 2 additions & 9 deletions processor/messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,13 @@ pub mod substrate {

#[derive(Clone, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)]
pub enum CoordinatorMessage {
// Substrate acknwoledged the block, meaning it should be acted upon.
//
// This still needs to come from Substrate, not from the validator-chain, due to it mutating
// the scheduler, which the Substrate chain primarily does. To have two causes of mutation
// requires a definitive ordering, which isn't achievable when we have distinct consensus.
BlockAcknowledged { context: SubstrateContext, key: Vec<u8>, block: BlockHash },
Burns { context: SubstrateContext, burns: Vec<OutInstructionWithBalance> },
SubstrateBlock { context: SubstrateContext, key: Vec<u8>, burns: Vec<OutInstructionWithBalance> },
}

impl CoordinatorMessage {
pub fn required_block(&self) -> Option<BlockHash> {
let context = match self {
CoordinatorMessage::BlockAcknowledged { context, .. } => context,
CoordinatorMessage::Burns { context, .. } => context,
CoordinatorMessage::SubstrateBlock { context, .. } => context,
};
Some(context.coin_latest_finalized_block)
}
Expand Down
40 changes: 13 additions & 27 deletions processor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,40 +394,22 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato

CoordinatorMessage::Substrate(msg) => {
match msg {
// TODO: Merge this with Burns so we don't have two distinct scheduling actions
messages::substrate::CoordinatorMessage::BlockAcknowledged {
messages::substrate::CoordinatorMessage::SubstrateBlock {
context,
key: key_vec,
block
burns,
} => {
let key =
<C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
let mut block_id = <C::Block as Block<C>>::Id::default();
block_id.as_mut().copy_from_slice(&block.0);

let plans = schedulers
.get_mut(&key_vec)
.expect("key we don't have a scheduler for acknowledged a block")
.add_outputs(scanner.ack_block(key, block_id).await);
block_id.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0);

sign_plans(
&mut main_db,
&coin,
&scanner,
&mut schedulers,
&signers,
context,
plans
).await;
}
let key =
<C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();

messages::substrate::CoordinatorMessage::Burns { context, burns } => {
// TODO2: Rewrite rotation documentation
let schedule_key = active_keys.last().expect("burn event despite no keys");
let scheduler = schedulers.get_mut(schedule_key.to_bytes().as_ref()).unwrap();
// We now have to acknowledge every block for this key up to the acknowledged block
let outputs = scanner.ack_up_to_block(key, block_id).await;

let mut payments = vec![];
for out in burns.clone() {
for out in burns {
let OutInstructionWithBalance {
instruction: OutInstruction { address, data },
balance,
Expand All @@ -441,7 +423,11 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
}
}

let plans = scheduler.schedule(payments);
let plans = schedulers
.get_mut(&key_vec)
.expect("key we don't have a scheduler for acknowledged a block")
.schedule(outputs, payments);

sign_plans(
&mut main_db,
&coin,
Expand Down
23 changes: 15 additions & 8 deletions processor/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,7 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
key: &<C::Curve as Ciphersuite>::G,
block: usize,
) -> Vec<C::Output> {
let new_key = txn.get(Self::scanned_block_key(key)).is_none();
let outputs = Self::block(txn, block).and_then(|id| Self::outputs(txn, key, &id));
// Either this is a new key, with no outputs, or we're acknowledging this block
// If we're acknowledging it, we should have outputs available
assert_eq!(new_key, outputs.is_none());
let outputs = outputs.unwrap_or(vec![]);

// Mark all the outputs from this block as seen
Expand All @@ -199,7 +195,10 @@ impl<C: Coin, D: Db> ScannerDb<C, D> {
outputs
}
fn latest_scanned_block(&self, key: <C::Curve as Ciphersuite>::G) -> usize {
let bytes = self.0.get(Self::scanned_block_key(&key)).unwrap_or(vec![0; 8]);
let bytes = self
.0
.get(Self::scanned_block_key(&key))
.expect("asking for latest scanned block of key which wasn't rotated to");
u64::from_le_bytes(bytes.try_into().unwrap()).try_into().unwrap()
}
}
Expand Down Expand Up @@ -282,22 +281,30 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
}

/// Acknowledge having handled a block for a key.
pub async fn ack_block(
pub async fn ack_up_to_block(
&self,
key: <C::Curve as Ciphersuite>::G,
id: <C::Block as Block<C>>::Id,
) -> Vec<C::Output> {
let mut scanner = self.scanner.write().await;
debug!("Block {} acknowledged", hex::encode(&id));

// Get the number for this block
let number =
scanner.db.block_number(&id).expect("main loop trying to operate on data we haven't scanned");
// Get the number of the last block we acknowledged
let prior = scanner.db.latest_scanned_block(key);

let mut outputs = vec![];
let mut txn = scanner.db.0.txn();
let outputs = ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number);
for number in (prior + 1) ..= number {
outputs.extend(ScannerDb::<C, D>::save_scanned_block(&mut txn, &key, number));
}
// TODO: This likely needs to be atomic with the scheduler?
txn.commit();

for output in &outputs {
scanner.ram_outputs.remove(output.id().as_ref());
assert!(scanner.ram_outputs.remove(output.id().as_ref()));
}

outputs
Expand Down
27 changes: 12 additions & 15 deletions processor/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ impl<C: Coin> Scheduler<C> {
Plan { key: self.key, inputs, payments, change: Some(self.key).filter(|_| change) }
}

// When Substrate emits `Updates` for a coin, all outputs should be added up to the
// acknowledged block.
pub fn add_outputs(&mut self, mut utxos: Vec<C::Output>) -> Vec<Plan<C>> {
fn add_outputs(&mut self, mut utxos: Vec<C::Output>) -> Vec<Plan<C>> {
log::info!("adding {} outputs", utxos.len());

let mut txs = vec![];
Expand Down Expand Up @@ -139,14 +137,13 @@ impl<C: Coin> Scheduler<C> {
}

log::info!("{} planned TXs have had their required inputs confirmed", txs.len());

// Additionally call schedule in case these outputs enable fulfilling scheduled payments
txs.extend(self.schedule(vec![]));
txs
}

// Schedule a series of payments. This should be called after `add_outputs`.
pub fn schedule(&mut self, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
// Schedule a series of outputs/payments.
pub fn schedule(&mut self, utxos: Vec<C::Output>, payments: Vec<Payment<C>>) -> Vec<Plan<C>> {
let mut plans = self.add_outputs(utxos);

log::info!("scheduling {} new payments", payments.len());

// Add all new payments to the list of pending payments
Expand All @@ -157,7 +154,7 @@ impl<C: Coin> Scheduler<C> {
// If we don't have UTXOs available, don't try to continue
if self.utxos.is_empty() {
log::info!("no utxos currently avilable");
return vec![];
return plans;
}

// Sort UTXOs so the highest valued ones are first
Expand Down Expand Up @@ -185,13 +182,12 @@ impl<C: Coin> Scheduler<C> {
}
}

let mut txs = vec![];
for chunk in utxo_chunks.drain(..) {
// TODO: While payments have their TXs' fees deducted from themselves, that doesn't hold here
// We need to charge a fee before reporting incoming UTXOs to Substrate to cover aggregation
// TXs
log::debug!("aggregating a chunk of {} inputs", C::MAX_INPUTS);
txs.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) })
plans.push(Plan { key: self.key, inputs: chunk, payments: vec![], change: Some(self.key) })
}

// We want to use all possible UTXOs for all possible payments
Expand Down Expand Up @@ -220,17 +216,18 @@ impl<C: Coin> Scheduler<C> {
// Now that we have the list of payments we can successfully handle right now, create the TX
// for them
if !executing.is_empty() {
txs.push(self.execute(utxos, executing));
plans.push(self.execute(utxos, executing));
} else {
// If we don't have any payments to execute, save these UTXOs for later
self.utxos.extend(utxos);
}

log::info!(
"created {} TXs containing {} payments to sign",
txs.len(),
"created {} plans containing {} payments to sign",
plans.len(),
payments_at_start - self.payments.len(),
);
txs
plans
}

// Note a branch output as having been created, with the amount it was actually created with,
Expand Down
2 changes: 1 addition & 1 deletion processor/src/tests/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn test_scanner<C: Coin>(coin: C) {
verify_event(new_scanner().await).await;

// Acknowledge the block
assert_eq!(scanner.ack_block(keys.group_key(), block_id.clone()).await, outputs);
assert_eq!(scanner.ack_up_to_block(keys.group_key(), block_id.clone()).await, outputs);

// There should be no more events
assert!(timeout(Duration::from_secs(30), scanner.events.recv()).await.is_err());
Expand Down
14 changes: 8 additions & 6 deletions processor/src/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ pub async fn test_wallet<C: Coin>(coin: C) {
};

let mut scheduler = Scheduler::new(key);
// Add these outputs, which should return no plans
assert!(scheduler.add_outputs(outputs.clone()).is_empty());

let amount = 2 * C::DUST;
let plans = scheduler.schedule(vec![Payment { address: C::address(key), data: None, amount }]);
let plans = scheduler
.schedule(outputs.clone(), vec![Payment { address: C::address(key), data: None, amount }]);
assert_eq!(
plans,
vec![Plan {
key,
inputs: outputs,
inputs: outputs.clone(),
payments: vec![Payment { address: C::address(key), data: None, amount }],
change: Some(key),
}]
Expand Down Expand Up @@ -91,6 +89,7 @@ pub async fn test_wallet<C: Coin>(coin: C) {
coin.mine_block().await;
let block_number = coin.get_latest_block_number().await.unwrap();
let block = coin.get_block(block_number).await.unwrap();
let first_outputs = outputs;
let outputs = coin.get_outputs(&block, key).await.unwrap();
assert_eq!(outputs.len(), 2);
let amount = amount - tx.fee(&coin).await;
Expand Down Expand Up @@ -118,5 +117,8 @@ pub async fn test_wallet<C: Coin>(coin: C) {
}

// Check the Scanner DB can reload the outputs
assert_eq!(scanner.ack_block(key, block.id()).await, outputs);
assert_eq!(
scanner.ack_up_to_block(key, block.id()).await,
[first_outputs, outputs].concat().to_vec()
);
}

0 comments on commit e21fc5f

Please sign in to comment.