diff --git a/Cargo.lock b/Cargo.lock
index e8c55722e..c6fe718ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6637,10 +6637,7 @@ name = "processor-messages"
 version = "0.1.0"
 dependencies = [
  "dkg",
- "flexible-transcript",
  "in-instructions-primitives",
- "rand_chacha 0.3.1",
- "rand_core 0.6.4",
  "serai-primitives",
  "serde",
  "tokens-primitives",
diff --git a/processor/messages/Cargo.toml b/processor/messages/Cargo.toml
index 90ac81e07..5908f77df 100644
--- a/processor/messages/Cargo.toml
+++ b/processor/messages/Cargo.toml
@@ -16,10 +16,6 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 zeroize = { version = "1", features = ["derive"] }

-rand_core = "0.6"
-rand_chacha = "0.3"
-transcript = { package = "flexible-transcript", path = "../../crypto/transcript" }
-
 serde = { version = "1", features = ["derive"] }

 dkg = { path = "../../crypto/dkg", features = ["serde"] }
diff --git a/processor/messages/src/lib.rs b/processor/messages/src/lib.rs
index c1aba10fe..b13efbf61 100644
--- a/processor/messages/src/lib.rs
+++ b/processor/messages/src/lib.rs
@@ -2,10 +2,6 @@
 use std::collections::HashMap;

 use zeroize::Zeroize;

-use rand_core::{RngCore, SeedableRng};
-use rand_chacha::ChaCha8Rng;
-use transcript::{Transcript, RecommendedTranscript};
-
 use serde::{Serialize, Deserialize};

 use dkg::{Participant, ThresholdParams};
@@ -17,7 +13,6 @@ use validator_sets_primitives::ValidatorSet;

 #[derive(Clone, Copy, PartialEq, Eq, Debug, Zeroize, Serialize, Deserialize)]
 pub struct SubstrateContext {
-  pub time: u64,
   pub coin_latest_finalized_block: BlockHash,
 }
@@ -73,35 +68,14 @@ pub mod sign {
     pub attempt: u32,
   }

-  impl SignId {
-    /// Determine a signing set for a given signing session.
-    // TODO: Replace with ROAST or the first available group of signers.
-    // https://github.com/serai-dex/serai/issues/163
-    pub fn signing_set(&self, params: &ThresholdParams) -> Vec<Participant> {
-      let mut transcript = RecommendedTranscript::new(b"SignId signing_set");
-      transcript.domain_separate(b"SignId");
-      transcript.append_message(b"key", &self.key);
-      transcript.append_message(b"id", self.id);
-      transcript.append_message(b"attempt", self.attempt.to_le_bytes());
-
-      let mut candidates =
-        (1 ..= params.n()).map(|i| Participant::new(i).unwrap()).collect::<Vec<_>>();
-      let mut rng = ChaCha8Rng::from_seed(transcript.rng_seed(b"signing_set"));
-      while candidates.len() > params.t().into() {
-        candidates.swap_remove(
-          usize::try_from(rng.next_u64() % u64::try_from(candidates.len()).unwrap()).unwrap(),
-        );
-      }
-      candidates
-    }
-  }
-
   #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
   pub enum CoordinatorMessage {
     // Received preprocesses for the specified signing protocol.
     Preprocesses { id: SignId, preprocesses: HashMap<Participant, Vec<u8>> },
     // Received shares for the specified signing protocol.
     Shares { id: SignId, shares: HashMap<Participant, Vec<u8>> },
+    // Re-attempt a signing protocol.
+    Reattempt { id: SignId },
     // Completed a signing protocol already.
     Completed { key: Vec<u8>, id: [u8; 32], tx: Vec<u8> },
   }
@@ -125,6 +99,7 @@ pub mod sign {
       match self {
         CoordinatorMessage::Preprocesses { id, .. } => &id.key,
         CoordinatorMessage::Shares { id, .. } => &id.key,
+        CoordinatorMessage::Reattempt { id } => &id.key,
         CoordinatorMessage::Completed { key, .. } => key,
       }
     }
@@ -139,6 +114,8 @@ pub mod coordinator {
     // Uses Vec<u8> instead of [u8; 64] since serde Deserialize isn't implemented for [u8; 64]
     BatchPreprocesses { id: SignId, preprocesses: HashMap<Participant, Vec<u8>> },
     BatchShares { id: SignId, shares: HashMap<Participant, [u8; 32]> },
+    // Re-attempt a batch signing protocol.
+    BatchReattempt { id: SignId },
     // Needed so a client which didn't participate in signing can still realize signing completed
     BatchSigned { key: Vec<u8>, block: BlockHash },
   }
@@ -148,6 +125,7 @@ pub mod coordinator {
       Some(match self {
         CoordinatorMessage::BatchPreprocesses { id, .. } => BlockHash(id.id),
         CoordinatorMessage::BatchShares { id, .. } => BlockHash(id.id),
+        CoordinatorMessage::BatchReattempt { id } => BlockHash(id.id),
         CoordinatorMessage::BatchSigned { block, .. } => *block,
       })
     }
@@ -156,6 +134,7 @@ pub mod coordinator {
       match self {
         CoordinatorMessage::BatchPreprocesses { id, .. } => &id.key,
         CoordinatorMessage::BatchShares { id, .. } => &id.key,
+        CoordinatorMessage::BatchReattempt { id } => &id.key,
         CoordinatorMessage::BatchSigned { key, .. } => key,
       }
     }
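With SignId::signing_set removed, nothing in processor-messages derives a deterministic signer subset any longer: the coordinator decides who participates and drives retries through the new Reattempt/BatchReattempt variants. For reference, the random t-of-n selection the updated tests (further down) now perform looks roughly like this — a sketch, where the helper name is illustrative rather than anything this commit adds:

    use rand_core::{RngCore, OsRng};
    use dkg::Participant;

    // Uniformly pick t distinct participants out of 1 ..= n.
    fn random_signing_set(t: u16, n: u16) -> Vec<Participant> {
      let mut signing_set = vec![];
      while signing_set.len() < usize::from(t) {
        let candidate =
          Participant::new(u16::try_from((OsRng.next_u64() % u64::from(n)) + 1).unwrap()).unwrap();
        // Reject duplicates until t distinct participants are chosen
        if !signing_set.contains(&candidate) {
          signing_set.push(candidate);
        }
      }
      signing_set
    }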
diff --git a/processor/src/coins/bitcoin.rs b/processor/src/coins/bitcoin.rs
index 52c5da4f2..2940833e1 100644
--- a/processor/src/coins/bitcoin.rs
+++ b/processor/src/coins/bitcoin.rs
@@ -1,8 +1,4 @@
-use std::{
-  time::{SystemTime, Duration},
-  io,
-  collections::HashMap,
-};
+use std::{time::Duration, io, collections::HashMap};

 use async_trait::async_trait;

@@ -201,9 +197,6 @@ impl BlockTrait<Bitcoin> for Block {
     hash.reverse();
     hash
   }
-  fn time(&self) -> SystemTime {
-    SystemTime::UNIX_EPOCH + Duration::from_secs(self.header.time.into())
-  }

   fn median_fee(&self) -> Fee {
     // TODO
     Fee(20)
diff --git a/processor/src/coins/mod.rs b/processor/src/coins/mod.rs
index c2de39a4b..f9ff6e433 100644
--- a/processor/src/coins/mod.rs
+++ b/processor/src/coins/mod.rs
@@ -1,5 +1,5 @@
 use core::fmt::Debug;
-use std::{time::SystemTime, io, collections::HashMap};
+use std::{io, collections::HashMap};

 use async_trait::async_trait;
 use thiserror::Error;
@@ -175,7 +175,6 @@ pub trait Block<C: Coin>: Send + Sync + Sized + Clone + Debug {
   // This is currently bounded to being 32-bytes.
   type Id: 'static + Id;
   fn id(&self) -> Self::Id;
-  fn time(&self) -> SystemTime;
   fn median_fee(&self) -> C::Fee;
 }
diff --git a/processor/src/coins/monero.rs b/processor/src/coins/monero.rs
index 853744372..9f48fcb67 100644
--- a/processor/src/coins/monero.rs
+++ b/processor/src/coins/monero.rs
@@ -1,8 +1,4 @@
-use std::{
-  time::{SystemTime, Duration},
-  collections::HashMap,
-  io,
-};
+use std::{time::Duration, collections::HashMap, io};

 use async_trait::async_trait;

@@ -146,10 +142,6 @@ impl BlockTrait<Monero> for Block {
     self.0
   }

-  fn time(&self) -> SystemTime {
-    SystemTime::UNIX_EPOCH + Duration::from_secs(self.1.header.timestamp)
-  }
-
   fn median_fee(&self) -> Fee {
     // TODO
     Fee { per_weight: 80000, mask: 10000 }
diff --git a/processor/src/db.rs b/processor/src/db.rs
index f6ef35895..923a7e84d 100644
--- a/processor/src/db.rs
+++ b/processor/src/db.rs
@@ -21,7 +21,7 @@ impl<C: Coin, D: Db> MainDb<C, D> {
   fn signing_key(key: &[u8]) -> Vec<u8> {
     Self::main_key(b"signing", key)
   }
-  pub fn save_signing(&mut self, key: &[u8], block_number: u64, time: u64, plan: &Plan<C>) {
+  pub fn save_signing(&mut self, key: &[u8], block_number: u64, plan: &Plan<C>) {
     let id = plan.id();
     // Creating a TXN here is arguably an anti-pattern, yet nothing here expects atomicity
     let mut txn = self.0.txn();
@@ -43,7 +43,6 @@
     {
       let mut buf = block_number.to_le_bytes().to_vec();
-      buf.extend(&time.to_le_bytes());
       plan.write(&mut buf).unwrap();
       txn.put(Self::plan_key(&id), &buf);
     }
@@ -51,7 +50,7 @@
     txn.commit();
   }

-  pub fn signing(&self, key: &[u8]) -> Vec<(u64, u64, Plan<C>)> {
+  pub fn signing(&self, key: &[u8]) -> Vec<(u64, Plan<C>)> {
     let signing = self.0.get(Self::signing_key(key)).unwrap_or(vec![]);
     let mut res = vec![];

@@ -61,10 +60,9 @@
       let buf = self.0.get(Self::plan_key(id)).unwrap();

       let block_number = u64::from_le_bytes(buf[.. 8].try_into().unwrap());
-      let time = u64::from_le_bytes(buf[8 .. 16].try_into().unwrap());
-      let plan = Plan::<C>::read::<&[u8]>(&mut &buf[16 ..]).unwrap();
+      let plan = Plan::<C>::read::<&[u8]>(&mut &buf[8 ..]).unwrap();
       assert_eq!(id, &plan.id());
-      res.push((block_number, time, plan));
+      res.push((block_number, plan));
     }

     res
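Since save_signing no longer appends the eight time bytes, the stored record is now block_number || plan, and the read side must start the plan at offset 8 instead of 16 (adjusted above, as the old offset would misparse every resumed plan). The resulting round-trip, restated with the same calls the code above uses:

    // Write: 8 bytes of little-endian block number, then the serialized plan
    let mut buf = block_number.to_le_bytes().to_vec();
    plan.write(&mut buf).unwrap();

    // Read: the plan now begins immediately after the block number
    let block_number = u64::from_le_bytes(buf[.. 8].try_into().unwrap());
    let plan = Plan::<C>::read::<&[u8]>(&mut &buf[8 ..]).unwrap();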
diff --git a/processor/src/main.rs b/processor/src/main.rs
index 4e5e373c0..b080c3796 100644
--- a/processor/src/main.rs
+++ b/processor/src/main.rs
@@ -1,9 +1,6 @@
 use std::{
   env,
-  pin::Pin,
-  task::{Poll, Context},
-  future::Future,
-  time::{Duration, SystemTime},
+  time::Duration,
   collections::{VecDeque, HashMap},
 };

@@ -48,10 +45,10 @@
 mod key_gen;
 use key_gen::{KeyGenEvent, KeyGen};

 mod signer;
-use signer::{SignerEvent, Signer, SignerHandle};
+use signer::{SignerEvent, Signer};

 mod substrate_signer;
-use substrate_signer::{SubstrateSignerEvent, SubstrateSigner, SubstrateSignerHandle};
+use substrate_signer::{SubstrateSignerEvent, SubstrateSigner};

 mod scanner;
 use scanner::{ScannerEvent, Scanner, ScannerHandle};
@@ -73,34 +70,6 @@ pub(crate) fn additional_key<C: Coin>(k: u64) -> <C::Curve as Ciphersuite>::F {
   )
 }

-struct SignerMessageFuture<'a, C: Coin, D: Db>(&'a mut HashMap<Vec<u8>, SignerHandle<C, D>>);
-impl<'a, C: Coin, D: Db> Future for SignerMessageFuture<'a, C, D> {
-  type Output = (Vec<u8>, SignerEvent<C>);
-  fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
-    for (key, signer) in self.0.iter_mut() {
-      match signer.events.poll_recv(ctx) {
-        Poll::Ready(event) => return Poll::Ready((key.clone(), event.unwrap())),
-        Poll::Pending => {}
-      }
-    }
-    Poll::Pending
-  }
-}
-
-struct SubstrateSignerMessageFuture<'a, D: Db>(&'a mut HashMap<Vec<u8>, SubstrateSignerHandle<D>>);
-impl<'a, D: Db> Future for SubstrateSignerMessageFuture<'a, D> {
-  type Output = (Vec<u8>, SubstrateSignerEvent);
-  fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
-    for (key, signer) in self.0.iter_mut() {
-      match signer.events.poll_recv(ctx) {
-        Poll::Ready(event) => return Poll::Ready((key.clone(), event.unwrap())),
-        Poll::Pending => {}
-      }
-    }
-    Poll::Pending
-  }
-}
-
 async fn get_fee<C: Coin>(coin: &C, block_number: usize) -> C::Fee {
   loop {
     // TODO2: Use a fee representative of several blocks
@@ -123,7 +92,7 @@
 async fn prepare_send<C: Coin, D: Db>(
   coin: &C,
-  signer: &SignerHandle<C, D>,
+  signer: &Signer<C, D>,
   block_number: usize,
   fee: C::Fee,
   plan: Plan<C>,
@@ -152,14 +121,12 @@ async fn sign_plans<C: Coin, D: Db>(
   coin: &C,
   scanner: &ScannerHandle<C, D>,
   schedulers: &mut HashMap<Vec<u8>, Scheduler<C>>,
-  signers: &HashMap<Vec<u8>, SignerHandle<C, D>>,
+  signers: &mut HashMap<Vec<u8>, Signer<C, D>>,
   context: SubstrateContext,
   plans: Vec<Plan<C>>,
 ) {
   let mut plans = VecDeque::from(plans);

-  let start = SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(context.time)).unwrap();
-
   let mut block_hash = <C::Block as Block<C>>::Id::default();
   block_hash.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0);
   let block_number = scanner
@@ -174,8 +141,9 @@
     info!("preparing plan {}: {:?}", hex::encode(id), plan);

     let key = plan.key.to_bytes();
-    db.save_signing(key.as_ref(), block_number.try_into().unwrap(), context.time, &plan);
-    let (tx, branches) = prepare_send(coin, &signers[key.as_ref()], block_number, fee, plan).await;
+    db.save_signing(key.as_ref(), block_number.try_into().unwrap(), &plan);
+    let (tx, branches) =
+      prepare_send(coin, signers.get_mut(key.as_ref()).unwrap(), block_number, fee, plan).await;

     // TODO: If we reboot mid-sign_plans, for a DB-backed scheduler, these may be partially
     // executed
@@ -193,7 +161,7 @@

     if let Some((tx, eventuality)) = tx {
       scanner.register_eventuality(block_number, id, eventuality.clone()).await;
-      signers[key.as_ref()].sign_transaction(id, start, tx, eventuality).await;
+      signers.get_mut(key.as_ref()).unwrap().sign_transaction(id, tx, eventuality).await;
     }
   }
 }
@@ -253,13 +221,12 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
     // necessary
     substrate_signers.insert(substrate_key.to_bytes().to_vec(), substrate_signer);

-    let signer = Signer::new(raw_db.clone(), coin.clone(), coin_keys);
+    let mut signer = Signer::new(raw_db.clone(), coin.clone(), coin_keys);

     // Load any TXs being actively signed
     let key = key.to_bytes();
-    for (block_number, start, plan) in main_db.signing(key.as_ref()) {
+    for (block_number, plan) in main_db.signing(key.as_ref()) {
       let block_number = block_number.try_into().unwrap();
-      let start = SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(start)).unwrap();

       let fee = get_fee(&coin, block_number).await;

@@ -274,7 +241,7 @@
       scanner.register_eventuality(block_number, id, eventuality.clone()).await;
       // TODO: Reconsider if the Signer should have the eventuality, or if just the coin/scanner
       // should
-      signer.sign_transaction(id, start, tx, eventuality).await;
+      signer.sign_transaction(id, tx, eventuality).await;
     }

     signers.insert(key.as_ref().to_vec(), signer);
@@ -284,6 +251,59 @@
   let mut last_coordinator_msg = None;

   loop {
+    // Check if the signers have events
+    // The signers will only have events after the following select executes, which will then
+    // trigger the loop again, hence why having the code here with no timer is fine
+    for (key, signer) in signers.iter_mut() {
+      while let Some(msg) = signer.events.pop_front() {
+        match msg {
+          SignerEvent::ProcessorMessage(msg) => {
+            coordinator.send(ProcessorMessage::Sign(msg)).await;
+          }
+
+          SignerEvent::SignedTransaction { id, tx } => {
+            // If we die after calling finish_signing, we'll never fire Completed
+            // TODO: Is that acceptable? Do we need to fire Completed before firing finish_signing?
+            main_db.finish_signing(key, id);
+            scanner.drop_eventuality(id).await;
+            coordinator
+              .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
+                key: key.clone(),
+                id,
+                tx: tx.as_ref().to_vec(),
+              }))
+              .await;
+
+            // TODO
+            // 1) We need to stop signing whenever a peer informs us or the chain has an
+            //    eventuality
+            // 2) If a peer informed us of an eventuality without an outbound payment, stop
+            //    scanning the chain for it (or at least ack it's solely for sanity purposes?)
+            // 3) When the chain has an eventuality, if it had an outbound payment, report it up to
+            //    Substrate for logging purposes
+          }
+        }
+      }
+    }
+
+    for (key, signer) in substrate_signers.iter_mut() {
+      while let Some(msg) = signer.events.pop_front() {
+        match msg {
+          SubstrateSignerEvent::ProcessorMessage(msg) => {
+            coordinator.send(ProcessorMessage::Coordinator(msg)).await;
+          }
+          SubstrateSignerEvent::SignedBatch(batch) => {
+            coordinator
+              .send(ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Update {
+                key: key.clone(),
+                batch,
+              }))
+              .await;
+          }
+        }
+      }
+    }
+
     tokio::select! {
       // This blocks the entire processor until it finishes handling this message
       // KeyGen specifically may take a notable amount of processing time
@@ -385,11 +405,11 @@
         },

         CoordinatorMessage::Sign(msg) => {
-          signers[msg.key()].handle(msg).await;
+          signers.get_mut(msg.key()).unwrap().handle(msg).await;
         },

         CoordinatorMessage::Coordinator(msg) => {
-          substrate_signers[msg.key()].handle(msg).await;
+          substrate_signers.get_mut(msg.key()).unwrap().handle(msg).await;
         },

         CoordinatorMessage::Substrate(msg) => {
@@ -433,7 +453,7 @@
               &coin,
               &scanner,
               &mut schedulers,
-              &signers,
+              &mut signers,
               context,
               plans
             ).await;
@@ -447,7 +467,7 @@

       msg = scanner.events.recv() => {
         match msg.unwrap() {
-          ScannerEvent::Block { key, block, time, batch, outputs } => {
+          ScannerEvent::Block { key, block, batch, outputs } => {
             let key = key.to_bytes().as_ref().to_vec();

             let mut block_hash = [0; 32];
@@ -484,7 +504,7 @@
             }).collect()
           };

-          substrate_signers[&key].sign(time, batch).await;
+          substrate_signers.get_mut(&key).unwrap().sign(batch).await;
         },

         ScannerEvent::Completed(id, tx) => {
@@ -495,52 +515,6 @@
         },
       }
     },
-
-      (key, msg) = SubstrateSignerMessageFuture(&mut substrate_signers) => {
-        match msg {
-          SubstrateSignerEvent::ProcessorMessage(msg) => {
-            coordinator.send(ProcessorMessage::Coordinator(msg)).await;
-          },
-          SubstrateSignerEvent::SignedBatch(batch) => {
-            coordinator
-              .send(ProcessorMessage::Substrate(messages::substrate::ProcessorMessage::Update {
-                key,
-                batch,
-              }))
-              .await;
-          },
-        }
-      },
-
-      (key, msg) = SignerMessageFuture(&mut signers) => {
-        match msg {
-          SignerEvent::ProcessorMessage(msg) => {
-            coordinator.send(ProcessorMessage::Sign(msg)).await;
-          },
-
-          SignerEvent::SignedTransaction { id, tx } => {
-            // If we die after calling finish_signing, we'll never fire Completed
-            // TODO: Is that acceptable? Do we need to fire Completed before firing finish_signing?
-            main_db.finish_signing(&key, id);
-            scanner.drop_eventuality(id).await;
-            coordinator
-              .send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
-                key: key.to_vec(),
-                id,
-                tx: tx.as_ref().to_vec()
-              }))
-              .await;
-
-            // TODO
-            // 1) We need to stop signing whenever a peer informs us or the chain has an
-            //    eventuality
-            // 2) If a peer informed us of an eventuality without an outbound payment, stop
-            //    scanning the chain for it (or at least ack it's solely for sanity purposes?)
- // 3) When the chain has an eventuality, if it had an outbound payment, report it up to - // Substrate for logging purposes - }, - } - }, } } } diff --git a/processor/src/scanner.rs b/processor/src/scanner.rs index c7f1ea34d..1d7662262 100644 --- a/processor/src/scanner.rs +++ b/processor/src/scanner.rs @@ -1,7 +1,7 @@ use core::marker::PhantomData; use std::{ sync::Arc, - time::{SystemTime, Duration}, + time::Duration, collections::{HashSet, HashMap}, }; @@ -25,7 +25,6 @@ pub enum ScannerEvent { Block { key: ::G, block: >::Id, - time: SystemTime, batch: u32, outputs: Vec, }, @@ -464,47 +463,8 @@ impl Scanner { let batch = ScannerDb::::save_outputs(&mut txn, &key, &block_id, &outputs); txn.commit(); - const TIME_TOLERANCE: u64 = 15; - - let now = SystemTime::now(); - let mut time = block.time(); - - // Block is older than the tolerance - // This isn't an issue, yet shows our daemon may have fallen behind/been disconnected - if now.duration_since(time).unwrap_or(Duration::ZERO) > - Duration::from_secs(TIME_TOLERANCE) - { - warn!( - "the time is {} and we only just received a block dated {}", - (now.duration_since(SystemTime::UNIX_EPOCH)).expect("now before epoch").as_secs(), - (time.duration_since(SystemTime::UNIX_EPOCH)) - .expect("block time before epoch") - .as_secs(), - ); - } - - // If this block is in the future, either this server's clock is wrong OR the block's - // miner's clock is wrong. The latter is the problem - // - // This time is used to schedule signing sessions over the content of this block - // If it's in the future, the first attempt won't time out until this block is no - // longer in the future - // - // Since we don't need consensus, if this time is more than 15s in the future, - // set it to the local time - // - // As long as a supermajority of nodes set a time within ~15s of each other, this - // should be fine - - // TODO2: Make more robust - if time.duration_since(now).unwrap_or(Duration::ZERO) > - Duration::from_secs(TIME_TOLERANCE) - { - time = now; - } - // Send all outputs - if !scanner.emit(ScannerEvent::Block { key, block: block_id, time, batch, outputs }) { + if !scanner.emit(ScannerEvent::Block { key, block: block_id, batch, outputs }) { return; } // Write this number as scanned so we won't re-fire these outputs diff --git a/processor/src/signer.rs b/processor/src/signer.rs index b2155685d..f1a359dfb 100644 --- a/processor/src/signer.rs +++ b/processor/src/signer.rs @@ -1,9 +1,5 @@ use core::{marker::PhantomData, fmt}; -use std::{ - sync::Arc, - time::{SystemTime, Duration}, - collections::HashMap, -}; +use std::collections::{VecDeque, HashMap}; use rand_core::OsRng; @@ -14,10 +10,6 @@ use frost::{ }; use log::{info, debug, warn, error}; -use tokio::{ - sync::{RwLock, mpsc}, - time::sleep, -}; use messages::sign::*; use crate::{ @@ -25,16 +17,12 @@ use crate::{ coins::{Transaction, Eventuality, Coin}, }; -const CHANNEL_MSG: &str = "Signer handler was dropped. 
diff --git a/processor/src/signer.rs b/processor/src/signer.rs
index b2155685d..f1a359dfb 100644
--- a/processor/src/signer.rs
+++ b/processor/src/signer.rs
@@ -1,9 +1,5 @@
 use core::{marker::PhantomData, fmt};
-use std::{
-  sync::Arc,
-  time::{SystemTime, Duration},
-  collections::HashMap,
-};
+use std::collections::{VecDeque, HashMap};

 use rand_core::OsRng;

@@ -14,10 +10,6 @@ use frost::{
 };

 use log::{info, debug, warn, error};
-use tokio::{
-  sync::{RwLock, mpsc},
-  time::sleep,
-};

 use messages::sign::*;
 use crate::{
@@ -25,16 +17,12 @@
   coins::{Transaction, Eventuality, Coin},
 };

-const CHANNEL_MSG: &str = "Signer handler was dropped. Shutting down?";
-
 #[derive(Debug)]
 pub enum SignerEvent<C: Coin> {
   SignedTransaction { id: [u8; 32], tx: <C::Transaction as Transaction<C>>::Id },
   ProcessorMessage(ProcessorMessage),
 }

-pub type SignerEventChannel<C> = mpsc::UnboundedReceiver<SignerEvent<C>>;
-
 #[derive(Debug)]
 struct SignerDb<C: Coin, D: Db>(D, PhantomData<C>);
 impl<C: Coin, D: Db> SignerDb<C, D> {
@@ -106,7 +94,7 @@ pub struct Signer<C: Coin, D: Db> {

   keys: ThresholdKeys<C::Curve>,

-  signable: HashMap<[u8; 32], (SystemTime, C::SignableTransaction)>,
+  signable: HashMap<[u8; 32], C::SignableTransaction>,
   attempt: HashMap<[u8; 32], u32>,
   preprocessing: HashMap<[u8; 32], <C::TransactionMachine as PreprocessMachine>::SignMachine>,
   #[allow(clippy::type_complexity)]
@@ -117,7 +105,7 @@
     >::SignatureMachine,
   >,

-  events: mpsc::UnboundedSender<SignerEvent<C>>,
+  pub events: VecDeque<SignerEvent<C>>,
 }

 impl<C: Coin, D: Db> fmt::Debug for Signer<C, D> {
@@ -131,18 +119,9 @@
   }
 }

-#[derive(Debug)]
-pub struct SignerHandle<C: Coin, D: Db> {
-  signer: Arc<RwLock<Signer<C, D>>>,
-  pub events: SignerEventChannel<C>,
-}
-
 impl<C: Coin, D: Db> Signer<C, D> {
-  #[allow(clippy::new_ret_no_self)]
-  pub fn new(db: D, coin: C, keys: ThresholdKeys<C::Curve>) -> SignerHandle<C, D> {
-    let (events_send, events_recv) = mpsc::unbounded_channel();
-
-    let signer = Arc::new(RwLock::new(Signer {
+  pub fn new(db: D, coin: C, keys: ThresholdKeys<C::Curve>) -> Signer<C, D> {
+    Signer {
       coin,
       db: SignerDb(db, PhantomData),

@@ -153,37 +132,35 @@
       preprocessing: HashMap::new(),
       signing: HashMap::new(),

-      events: events_send,
-    }));
-
-    tokio::spawn(Signer::run(signer.clone()));
+      events: VecDeque::new(),
+    }
+  }

-    SignerHandle { signer, events: events_recv }
+  pub async fn keys(&self) -> ThresholdKeys<C::Curve> {
+    self.keys.clone()
   }

   fn verify_id(&self, id: &SignId) -> Result<(), ()> {
-    if !id.signing_set(&self.keys.params()).contains(&self.keys.params().i()) {
-      panic!("coordinator sent us preprocesses for a signing attempt we're not participating in");
-    }
-
     // Check the attempt lines up
     match self.attempt.get(&id.id) {
-      // If we don't have an attempt logged, it's because the coordinator is faulty OR
-      // because we rebooted
+      // If we don't have an attempt logged, it's because the coordinator is faulty OR because we
+      // rebooted
       None => {
         warn!(
           "not attempting {} #{}. this is an error if we didn't reboot",
          hex::encode(id.id),
           id.attempt
         );
-        // Don't panic on the assumption we rebooted
         Err(())?;
       }
       Some(attempt) => {
-        // This could be an old attempt, or it may be a 'future' attempt if we rebooted and
-        // our SystemTime wasn't monotonic, as it may be
         if attempt != &id.attempt {
-          debug!("sent signing data for a distinct attempt");
+          warn!(
+            "sent signing data for {} #{} yet we have attempt #{}",
+            hex::encode(id.id),
+            id.attempt,
+            attempt
+          );
           Err(())?;
         }
       }
@@ -192,16 +169,7 @@
     Ok(())
   }

-  fn emit(&mut self, event: SignerEvent<C>) -> bool {
-    if self.events.send(event).is_err() {
-      info!("{}", CHANNEL_MSG);
-      false
-    } else {
-      true
-    }
-  }
-
-  async fn eventuality_completion(
+  pub async fn eventuality_completion(
     &mut self,
     id: [u8; 32],
     tx_id: &<C::Transaction as Transaction<C>>::Id,
@@ -234,7 +202,7 @@
         self.preprocessing.remove(&id);
         self.signing.remove(&id);

-        self.emit(SignerEvent::SignedTransaction { id, tx: tx.id() });
+        self.events.push_back(SignerEvent::SignedTransaction { id, tx: tx.id() });
       } else {
         warn!(
           "a validator claimed {} completed {} when it did not",
@@ -252,7 +220,140 @@
     }
   }

-  async fn handle(&mut self, msg: CoordinatorMessage) {
+  async fn check_completion(&mut self, id: [u8; 32]) -> bool {
+    if let Some(txs) = self.db.completed(id) {
+      debug!(
+        "SignTransaction/Reattempt order for {}, which we've already completed signing",
+        hex::encode(id)
+      );
+
+      // Find the first instance we noted as having completed *and can still get from our node*
+      let mut tx = None;
+      let mut buf = <C::Transaction as Transaction<C>>::Id::default();
+      let tx_id_len = buf.as_ref().len();
+      assert_eq!(txs.len() % tx_id_len, 0);
+      for id in 0 .. (txs.len() / tx_id_len) {
+        let start = id * tx_id_len;
+        buf.as_mut().copy_from_slice(&txs[start .. (start + tx_id_len)]);
+        if self.coin.get_transaction(&buf).await.is_ok() {
+          tx = Some(buf);
+          break;
+        }
+      }
+
+      // Fire the SignedTransaction event again
+      if let Some(tx) = tx {
+        self.events.push_back(SignerEvent::SignedTransaction { id, tx });
+      } else {
+        warn!("completed signing {} yet couldn't get any of the completing TXs", hex::encode(id));
+      }
+
+      true
+    } else {
+      false
+    }
+  }
+
+  async fn attempt(&mut self, id: [u8; 32], attempt: u32) {
+    if self.check_completion(id).await {
+      return;
+    }
+
+    // Check if we're already working on this attempt
+    if let Some(curr_attempt) = self.attempt.get(&id) {
+      if curr_attempt >= &attempt {
+        warn!(
+          "told to attempt {} #{} yet we're already working on {}",
+          hex::encode(id),
+          attempt,
+          curr_attempt
+        );
+        return;
+      }
+    }
+
+    // Start this attempt
+    // Clone the TX so we don't have an immutable borrow preventing the below mutable actions
+    // (also because we do need an owned tx anyways)
+    let Some(tx) = self.signable.get(&id).cloned() else {
+      warn!("told to attempt a TX we aren't currently signing for");
+      return;
+    };
+
+    // Delete any existing machines
+    self.preprocessing.remove(&id);
+    self.signing.remove(&id);
+
+    // Update the attempt number
+    self.attempt.insert(id, attempt);
+
+    let id = SignId { key: self.keys.group_key().to_bytes().as_ref().to_vec(), id, attempt };
+
+    info!("signing for {} #{}", hex::encode(id.id), id.attempt);
+
+    // If we reboot mid-sign, the current design has us abort all signs and wait for latter
+    // attempts/new signing protocols
+    // This is distinct from the DKG which will continue DKG sessions, even on reboot
+    // This is because signing is tolerant of failures of up to 1/3rd of the group
+    // The DKG requires 100% participation
+    // While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
+    // reboots, it's not worth the complexity when messing up here leaks our secret share
+    //
+    // Despite this, on reboot, we'll get told of active signing items, and may be in this
+    // branch again for something we've already attempted
+    //
+    // Only run if this hasn't already been attempted
+    if self.db.has_attempt(&id) {
+      warn!(
+        "already attempted {} #{}. this is an error if we didn't reboot",
+        hex::encode(id.id),
+        id.attempt
+      );
+      return;
+    }
+
+    let mut txn = self.db.0.txn();
+    SignerDb::<C, D>::attempt(&mut txn, &id);
+    txn.commit();
+
+    // Attempt to create the TX
+    let machine = match self.coin.attempt_send(tx).await {
+      Err(e) => {
+        error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e);
+        return;
+      }
+      Ok(machine) => machine,
+    };
+
+    let (machine, preprocess) = machine.preprocess(&mut OsRng);
+    self.preprocessing.insert(id.id, machine);
+
+    // Broadcast our preprocess
+    self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess {
+      id,
+      preprocess: preprocess.serialize(),
+    }));
+  }
+
+  pub async fn sign_transaction(
+    &mut self,
+    id: [u8; 32],
+    tx: C::SignableTransaction,
+    eventuality: C::Eventuality,
+  ) {
+    if self.check_completion(id).await {
+      return;
+    }
+
+    let mut txn = self.db.0.txn();
+    SignerDb::<C, D>::save_eventuality(&mut txn, id, eventuality);
+    txn.commit();
+
+    self.signable.insert(id, tx);
+    self.attempt(id, 0).await;
+  }
+
+  pub async fn handle(&mut self, msg: CoordinatorMessage) {
     match msg {
       CoordinatorMessage::Preprocesses { id, mut preprocesses } => {
         if self.verify_id(&id).is_err() {
@@ -292,7 +393,7 @@
         self.signing.insert(id.id, machine);

         // Broadcast our share
-        self.emit(SignerEvent::ProcessorMessage(ProcessorMessage::Share {
+        self.events.push_back(SignerEvent::ProcessorMessage(ProcessorMessage::Share {
           id,
           share: share.serialize(),
         }));
@@ -357,7 +458,11 @@
         assert!(self.preprocessing.remove(&id.id).is_none());
         assert!(self.signing.remove(&id.id).is_none());

-        self.emit(SignerEvent::SignedTransaction { id: id.id, tx: tx_id });
+        self.events.push_back(SignerEvent::SignedTransaction { id: id.id, tx: tx_id });
+      }
+
+      CoordinatorMessage::Reattempt { id } => {
+        self.attempt(id.id, id.attempt).await;
       }

       CoordinatorMessage::Completed { key: _, id, tx: mut tx_vec } => {
@@ -377,190 +482,4 @@
       }
     }
   }
-
-  // An async function, to be spawned on a task, to handle signing
-  async fn run(signer_arc: Arc<RwLock<Self>>) {
-    const SIGN_TIMEOUT: u64 = 30;
-
-    loop {
-      // Sleep until a timeout expires (or five seconds expire)
-      // Since this code start new sessions, it will delay any ordered signing sessions from
-      // starting for up to 5 seconds, hence why this number can't be too high (such as 30 seconds,
-      // the full timeout)
-      // This won't delay re-attempting any signing session however, nor will it block the
-      // sign_transaction function (since this doesn't hold any locks)
-      sleep({
-        let now = SystemTime::now();
-        let mut lowest = Duration::from_secs(5);
-        let signer = signer_arc.read().await;
-        for (id, (start, _)) in &signer.signable {
-          let until = if let Some(attempt) = signer.attempt.get(id) {
-            // Get when this attempt times out
-            (*start + Duration::from_secs(u64::from(attempt + 1) * SIGN_TIMEOUT))
-              .duration_since(now)
-              .unwrap_or(Duration::ZERO)
-          } else {
-            Duration::ZERO
-          };
-
-          if until < lowest {
-            lowest = until;
-          }
-        }
-        lowest
-      })
-      .await;
-
-      // Because a signing attempt has timed out (or five seconds has passed), check all
-      // sessions' timeouts
-      {
-        let mut signer = signer_arc.write().await;
-        let keys = signer.signable.keys().cloned().collect::<Vec<_>>();
-        for id in keys {
-          let (start, tx) = &signer.signable[&id];
-          let start = *start;
-
-          let attempt = u32::try_from(
-            SystemTime::now().duration_since(start).unwrap_or(Duration::ZERO).as_secs() /
-              SIGN_TIMEOUT,
-          )
-          .unwrap();
-
-          // Check if we're already working on this attempt
-          if let Some(curr_attempt) = signer.attempt.get(&id) {
-            if curr_attempt >= &attempt {
-              continue;
-            }
-          }
-
-          // Start this attempt
-          // Clone the TX so we don't have an immutable borrow preventing the below mutable actions
-          // (also because we do need an owned tx anyways)
-          let tx = tx.clone();
-
-          // Delete any existing machines
-          signer.preprocessing.remove(&id);
-          signer.signing.remove(&id);
-
-          // Update the attempt number so we don't re-enter this conditional
-          signer.attempt.insert(id, attempt);
-
-          let id =
-            SignId { key: signer.keys.group_key().to_bytes().as_ref().to_vec(), id, attempt };
-          // Only preprocess if we're a signer
-          if !id.signing_set(&signer.keys.params()).contains(&signer.keys.params().i()) {
-            continue;
-          }
-          info!("selected to sign {} #{}", hex::encode(id.id), id.attempt);
-
-          // If we reboot mid-sign, the current design has us abort all signs and wait for latter
-          // attempts/new signing protocols
-          // This is distinct from the DKG which will continue DKG sessions, even on reboot
-          // This is because signing is tolerant of failures of up to 1/3rd of the group
-          // The DKG requires 100% participation
-          // While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
-          // reboots, it's not worth the complexity when messing up here leaks our secret share
-          //
-          // Despite this, on reboot, we'll get told of active signing items, and may be in this
-          // branch again for something we've already attempted
-          //
-          // Only run if this hasn't already been attempted
-          if signer.db.has_attempt(&id) {
-            warn!(
-              "already attempted {} #{}. this is an error if we didn't reboot",
-              hex::encode(id.id),
-              id.attempt
-            );
-            continue;
-          }
-
-          let mut txn = signer.db.0.txn();
-          SignerDb::<C, D>::attempt(&mut txn, &id);
-          txn.commit();
-
-          // Attempt to create the TX
-          let machine = match signer.coin.attempt_send(tx).await {
-            Err(e) => {
-              error!("failed to attempt {}, #{}: {:?}", hex::encode(id.id), id.attempt, e);
-              continue;
-            }
-            Ok(machine) => machine,
-          };
-
-          let (machine, preprocess) = machine.preprocess(&mut OsRng);
-          signer.preprocessing.insert(id.id, machine);
-
-          // Broadcast our preprocess
-          if !signer.emit(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess {
-            id,
-            preprocess: preprocess.serialize(),
-          })) {
-            return;
-          }
-        }
-      }
-    }
-  }
-}
-
-impl<C: Coin, D: Db> SignerHandle<C, D> {
-  pub async fn keys(&self) -> ThresholdKeys<C::Curve> {
-    self.signer.read().await.keys.clone()
-  }
-
-  pub async fn sign_transaction(
-    &self,
-    id: [u8; 32],
-    start: SystemTime,
-    tx: C::SignableTransaction,
-    eventuality: C::Eventuality,
-  ) {
-    let mut signer = self.signer.write().await;
-
-    if let Some(txs) = signer.db.completed(id) {
-      debug!("SignTransaction order for ID we've already completed signing");
-
-      // Find the first instance we noted as having completed *and can still get from our node*
-      let mut tx = None;
-      let mut buf = <C::Transaction as Transaction<C>>::Id::default();
-      let tx_id_len = buf.as_ref().len();
-      assert_eq!(txs.len() % tx_id_len, 0);
-      for id in 0 .. (txs.len() / tx_id_len) {
-        let start = id * tx_id_len;
-        buf.as_mut().copy_from_slice(&txs[start .. (start + tx_id_len)]);
-        if signer.coin.get_transaction(&buf).await.is_ok() {
-          tx = Some(buf);
-          break;
-        }
-      }
-
-      // Fire the SignedTransaction event again
-      if let Some(tx) = tx {
-        if !signer.emit(SignerEvent::SignedTransaction { id, tx }) {
-          return;
-        }
-      } else {
-        warn!("completed signing {} yet couldn't get any of the completing TXs", hex::encode(id));
-      }
-      return;
-    }
-
-    let mut txn = signer.db.0.txn();
-    SignerDb::<C, D>::save_eventuality(&mut txn, id, eventuality);
-    txn.commit();
-
-    signer.signable.insert(id, (start, tx));
-  }
-
-  pub async fn eventuality_completion(
-    &self,
-    id: [u8; 32],
-    tx: &<C::Transaction as Transaction<C>>::Id,
-  ) {
-    self.signer.write().await.eventuality_completion(id, tx).await;
-  }
-
-  pub async fn handle(&self, msg: CoordinatorMessage) {
-    self.signer.write().await.handle(msg).await;
-  }
-}
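With the processor's local SIGN_TIMEOUT loop gone, retry cadence has to come from whoever sends Reattempt. One plausible coordinator-side counterpart — not part of this diff, and with the completion check and transport passed in as closures since their real forms live in the coordinator:

    use std::time::Duration;
    use tokio::time::sleep;
    use messages::sign::{SignId, CoordinatorMessage};

    // Hypothetical sketch: periodically escalate the attempt number until the
    // session is observed as complete.
    async fn schedule_reattempts(
      mut id: SignId,
      timeout: Duration,
      completed: impl Fn(&[u8; 32]) -> bool,
      send: impl Fn(CoordinatorMessage),
    ) {
      loop {
        sleep(timeout).await;
        if completed(&id.id) {
          break;
        }
        id.attempt += 1;
        send(CoordinatorMessage::Reattempt { id: id.clone() });
      }
    }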
diff --git a/processor/src/substrate_signer.rs b/processor/src/substrate_signer.rs
index ab668f6df..28e09b2cf 100644
--- a/processor/src/substrate_signer.rs
+++ b/processor/src/substrate_signer.rs
@@ -1,9 +1,5 @@
 use core::fmt;
-use std::{
-  sync::Arc,
-  time::{SystemTime, Duration},
-  collections::HashMap,
-};
+use std::collections::{VecDeque, HashMap};

 use rand_core::OsRng;

@@ -21,26 +17,18 @@
 use frost_schnorrkel::Schnorrkel;

 use log::{info, debug, warn};
-use tokio::{
-  sync::{RwLock, mpsc},
-  time::sleep,
-};

 use serai_client::in_instructions::primitives::{Batch, SignedBatch};

 use messages::{sign::SignId, coordinator::*};
 use crate::{DbTxn, Db};

-const CHANNEL_MSG: &str = "SubstrateSigner handler was dropped. Shutting down?";
-
 #[derive(Debug)]
 pub enum SubstrateSignerEvent {
   ProcessorMessage(ProcessorMessage),
   SignedBatch(SignedBatch),
 }

-pub type SubstrateSignerEventChannel = mpsc::UnboundedReceiver<SubstrateSignerEvent>;
-
 #[derive(Debug)]
 struct SubstrateSignerDb<D: Db>(D);
 impl<D: Db> SubstrateSignerDb<D> {
@@ -78,12 +66,12 @@ pub struct SubstrateSigner<D: Db> {

   keys: ThresholdKeys<Ristretto>,

-  signable: HashMap<[u8; 32], (SystemTime, Batch)>,
+  signable: HashMap<[u8; 32], Batch>,
   attempt: HashMap<[u8; 32], u32>,
   preprocessing: HashMap<[u8; 32], AlgorithmSignMachine<Ristretto, Schnorrkel>>,
   signing: HashMap<[u8; 32], AlgorithmSignatureMachine<Ristretto, Schnorrkel>>,

-  events: mpsc::UnboundedSender<SubstrateSignerEvent>,
+  pub events: VecDeque<SubstrateSignerEvent>,
 }

 impl<D: Db> fmt::Debug for SubstrateSigner<D> {
@@ -96,18 +84,9 @@
   }
 }

-#[derive(Debug)]
-pub struct SubstrateSignerHandle<D: Db> {
-  signer: Arc<RwLock<SubstrateSigner<D>>>,
-  pub events: SubstrateSignerEventChannel,
-}
-
 impl<D: Db> SubstrateSigner<D> {
-  #[allow(clippy::new_ret_no_self)]
-  pub fn new(db: D, keys: ThresholdKeys<Ristretto>) -> SubstrateSignerHandle<D> {
-    let (events_send, events_recv) = mpsc::unbounded_channel();
-
-    let signer = Arc::new(RwLock::new(SubstrateSigner {
+  pub fn new(db: D, keys: ThresholdKeys<Ristretto>) -> SubstrateSigner<D> {
+    SubstrateSigner {
       db: SubstrateSignerDb(db),

       keys,
@@ -117,33 +96,31 @@
       preprocessing: HashMap::new(),
       signing: HashMap::new(),

-      events: events_send,
-    }));
-
-    tokio::spawn(SubstrateSigner::run(signer.clone()));
-
-    SubstrateSignerHandle { signer, events: events_recv }
+      events: VecDeque::new(),
+    }
   }

   fn verify_id(&self, id: &SignId) -> Result<(), ()> {
-    if !id.signing_set(&self.keys.params()).contains(&self.keys.params().i()) {
-      panic!("coordinator sent us preprocesses for a signing attempt we're not participating in");
-    }
-
     // Check the attempt lines up
     match self.attempt.get(&id.id) {
-      // If we don't have an attempt logged, it's because the coordinator is faulty OR
-      // because we rebooted
+      // If we don't have an attempt logged, it's because the coordinator is faulty OR because we
+      // rebooted
       None => {
-        warn!("not attempting {}. this is an error if we didn't reboot", hex::encode(id.id));
-        // Don't panic on the assumption we rebooted
+        warn!(
+          "not attempting batch {} #{}. this is an error if we didn't reboot",
+          hex::encode(id.id),
+          id.attempt
+        );
         Err(())?;
       }
       Some(attempt) => {
-        // This could be an old attempt, or it may be a 'future' attempt if we rebooted and
-        // our SystemTime wasn't monotonic, as it may be
         if attempt != &id.attempt {
-          debug!("sent signing data for a distinct attempt");
+          warn!(
+            "sent signing data for batch {} #{} yet we have attempt #{}",
+            hex::encode(id.id),
+            id.attempt,
+            attempt
+          );
           Err(())?;
         }
       }
@@ -152,16 +129,91 @@
     Ok(())
   }

-  fn emit(&mut self, event: SubstrateSignerEvent) -> bool {
-    if self.events.send(event).is_err() {
-      info!("{}", CHANNEL_MSG);
-      false
-    } else {
-      true
+  async fn attempt(&mut self, id: [u8; 32], attempt: u32) {
+    // See above commentary for why this doesn't emit SignedBatch
+    if self.db.completed(id) {
+      return;
+    }
+
+    // Check if we're already working on this attempt
+    if let Some(curr_attempt) = self.attempt.get(&id) {
+      if curr_attempt >= &attempt {
+        warn!(
+          "told to attempt {} #{} yet we're already working on {}",
+          hex::encode(id),
+          attempt,
+          curr_attempt
+        );
+        return;
+      }
     }
+
+    // Start this attempt
+    if !self.signable.contains_key(&id) {
+      warn!("told to attempt signing a batch we aren't currently signing for");
+      return;
+    };
+
+    // Delete any existing machines
+    self.preprocessing.remove(&id);
+    self.signing.remove(&id);
+
+    // Update the attempt number
+    self.attempt.insert(id, attempt);
+
+    let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt };
+    info!("signing batch {} #{}", hex::encode(id.id), id.attempt);
+
+    // If we reboot mid-sign, the current design has us abort all signs and wait for latter
+    // attempts/new signing protocols
+    // This is distinct from the DKG which will continue DKG sessions, even on reboot
+    // This is because signing is tolerant of failures of up to 1/3rd of the group
+    // The DKG requires 100% participation
+    // While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
+    // reboots, it's not worth the complexity when messing up here leaks our secret share
+    //
+    // Despite this, on reboot, we'll get told of active signing items, and may be in this
+    // branch again for something we've already attempted
+    //
+    // Only run if this hasn't already been attempted
+    if self.db.has_attempt(&id) {
+      warn!(
+        "already attempted {} #{}. this is an error if we didn't reboot",
+        hex::encode(id.id),
+        id.attempt
+      );
+      return;
+    }
+
+    let mut txn = self.db.0.txn();
+    SubstrateSignerDb::<D>::attempt(&mut txn, &id);
+    txn.commit();
+
+    // b"substrate" is a literal from sp-core
+    let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), self.keys.clone());
+
+    let (machine, preprocess) = machine.preprocess(&mut OsRng);
+    self.preprocessing.insert(id.id, machine);
+
+    // Broadcast our preprocess
+    self.events.push_back(SubstrateSignerEvent::ProcessorMessage(
+      ProcessorMessage::BatchPreprocess { id, preprocess: preprocess.serialize() },
+    ));
   }

-  async fn handle(&mut self, msg: CoordinatorMessage) {
+  pub async fn sign(&mut self, batch: Batch) {
+    if self.db.completed(batch.block.0) {
+      debug!("Sign batch order for ID we've already completed signing");
+      // See BatchSigned for commentary on why this simply returns
+      return;
+    }
+
+    let id = batch.block.0;
+    self.signable.insert(id, batch);
+    self.attempt(id, 0).await;
+  }
+
+  pub async fn handle(&mut self, msg: CoordinatorMessage) {
     match msg {
       CoordinatorMessage::BatchPreprocesses { id, mut preprocesses } => {
         if self.verify_id(&id).is_err() {
@@ -193,7 +245,7 @@
           Err(e) => todo!("malicious signer: {:?}", e),
         };

-        let (machine, share) = match machine.sign(preprocesses, &self.signable[&id.id].1.encode()) {
+        let (machine, share) = match machine.sign(preprocesses, &self.signable[&id.id].encode()) {
           Ok(res) => res,
           Err(e) => todo!("malicious signer: {:?}", e),
         };
@@ -202,10 +254,9 @@
         // Broadcast our share
         let mut share_bytes = [0; 32];
         share_bytes.copy_from_slice(&share.serialize());
-        self.emit(SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare {
-          id,
-          share: share_bytes,
-        }));
+        self.events.push_back(SubstrateSignerEvent::ProcessorMessage(
+          ProcessorMessage::BatchShare { id, share: share_bytes },
+        ));
       }

       CoordinatorMessage::BatchShares { id, mut shares } => {
@@ -248,7 +299,7 @@
         };

         let batch =
-          SignedBatch { batch: self.signable.remove(&id.id).unwrap().1, signature: sig.into() };
+          SignedBatch { batch: self.signable.remove(&id.id).unwrap(), signature: sig.into() };

         // Save the batch in case it's needed for recovery
         let mut txn = self.db.0.txn();
@@ -261,7 +312,11 @@
         assert!(self.preprocessing.remove(&id.id).is_none());
         assert!(self.signing.remove(&id.id).is_none());

-        self.emit(SubstrateSignerEvent::SignedBatch(batch));
+        self.events.push_back(SubstrateSignerEvent::SignedBatch(batch));
+      }
+
+      CoordinatorMessage::BatchReattempt { id } => {
+        self.attempt(id.id, id.attempt).await;
       }

       CoordinatorMessage::BatchSigned { key: _, block } => {
@@ -280,136 +335,9 @@
         // chain, hence why it's unnecessary to check it/back it up here

         // This also doesn't emit any further events since all mutations happen on the
-        // substrate::CoordinatorMessage::BlockAcknowledged message (which SignedBatch is meant to
+        // substrate::CoordinatorMessage::SubstrateBlock message (which SignedBatch is meant to
         //   end up triggering)
       }
     }
   }
-
-  // An async function, to be spawned on a task, to handle signing
-  async fn run(signer_arc: Arc<RwLock<Self>>) {
-    const SIGN_TIMEOUT: u64 = 30;
-
-    loop {
-      // Sleep until a timeout expires (or five seconds expire)
-      // Since this code start new sessions, it will delay any ordered signing sessions from
-      // starting for up to 5 seconds, hence why this number can't be too high (such as 30 seconds,
-      // the full timeout)
-      // This won't delay re-attempting any signing session however, nor will it block the
-      // sign_transaction function (since this doesn't hold any locks)
-      sleep({
-        let now = SystemTime::now();
-        let mut lowest = Duration::from_secs(5);
-        let signer = signer_arc.read().await;
-        for (id, (start, _)) in &signer.signable {
-          let until = if let Some(attempt) = signer.attempt.get(id) {
-            // Get when this attempt times out
-            (*start + Duration::from_secs(u64::from(attempt + 1) * SIGN_TIMEOUT))
-              .duration_since(now)
-              .unwrap_or(Duration::ZERO)
-          } else {
-            Duration::ZERO
-          };
-
-          if until < lowest {
-            lowest = until;
-          }
-        }
-        lowest
-      })
-      .await;
-
-      // Because a signing attempt has timed out (or five seconds has passed), check all
-      // sessions' timeouts
-      {
-        let mut signer = signer_arc.write().await;
-        let keys = signer.signable.keys().cloned().collect::<Vec<_>>();
-        for id in keys {
-          let (start, _) = &signer.signable[&id];
-          let start = *start;
-
-          let attempt = u32::try_from(
-            SystemTime::now().duration_since(start).unwrap_or(Duration::ZERO).as_secs() /
-              SIGN_TIMEOUT,
-          )
-          .unwrap();
-
-          // Check if we're already working on this attempt
-          if let Some(curr_attempt) = signer.attempt.get(&id) {
-            if curr_attempt >= &attempt {
-              continue;
-            }
-          }
-
-          // Delete any existing machines
-          signer.preprocessing.remove(&id);
-          signer.signing.remove(&id);
-
-          // Update the attempt number so we don't re-enter this conditional
-          signer.attempt.insert(id, attempt);
-
-          let id = SignId { key: signer.keys.group_key().to_bytes().to_vec(), id, attempt };
-          // Only preprocess if we're a signer
-          if !id.signing_set(&signer.keys.params()).contains(&signer.keys.params().i()) {
-            continue;
-          }
-          info!("selected to sign {} #{}", hex::encode(id.id), id.attempt);
-
-          // If we reboot mid-sign, the current design has us abort all signs and wait for latter
-          // attempts/new signing protocols
-          // This is distinct from the DKG which will continue DKG sessions, even on reboot
-          // This is because signing is tolerant of failures of up to 1/3rd of the group
-          // The DKG requires 100% participation
-          // While we could apply similar tricks as the DKG (a seeded RNG) to achieve support for
-          // reboots, it's not worth the complexity when messing up here leaks our secret share
-          //
-          // Despite this, on reboot, we'll get told of active signing items, and may be in this
-          // branch again for something we've already attempted
-          //
-          // Only run if this hasn't already been attempted
-          if signer.db.has_attempt(&id) {
-            warn!(
-              "already attempted {} #{}. this is an error if we didn't reboot",
-              hex::encode(id.id),
-              id.attempt
-            );
-            continue;
-          }
-
-          let mut txn = signer.db.0.txn();
-          SubstrateSignerDb::<D>::attempt(&mut txn, &id);
-          txn.commit();
-
-          // b"substrate" is a literal from sp-core
-          let machine = AlgorithmMachine::new(Schnorrkel::new(b"substrate"), signer.keys.clone());
-
-          let (machine, preprocess) = machine.preprocess(&mut OsRng);
-          signer.preprocessing.insert(id.id, machine);
-
-          // Broadcast our preprocess
-          if !signer.emit(SubstrateSignerEvent::ProcessorMessage(
-            ProcessorMessage::BatchPreprocess { id, preprocess: preprocess.serialize() },
-          )) {
-            return;
-          }
-        }
-      }
-    }
-  }
-}
-
-impl<D: Db> SubstrateSignerHandle<D> {
-  pub async fn sign(&self, start: SystemTime, batch: Batch) {
-    let mut signer = self.signer.write().await;
-    if signer.db.completed(batch.block.0) {
-      debug!("Sign batch order for ID we've already completed signing");
-      // See BatchSigned for commentary on why this simply returns
-      return;
-    }
-    signer.signable.insert(batch.block.0, (start, batch));
-  }
-
-  pub async fn handle(&self, msg: CoordinatorMessage) {
-    self.signer.write().await.handle(msg).await;
-  }
-}
diff --git a/processor/src/tests/addresses.rs b/processor/src/tests/addresses.rs
index 62ef3b387..ad5fb5420 100644
--- a/processor/src/tests/addresses.rs
+++ b/processor/src/tests/addresses.rs
@@ -52,7 +52,7 @@ async fn spend<C: Coin, D: Db>(
     coin.mine_block().await;
   }
   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block { key: this_key, block: _, time: _, batch: this_batch, outputs } => {
+    ScannerEvent::Block { key: this_key, block: _, batch: this_batch, outputs } => {
       assert_eq!(this_key, key);
       assert_eq!(this_batch, batch);
       assert_eq!(outputs.len(), 1);
@@ -89,7 +89,7 @@ pub async fn test_addresses<C: Coin>(coin: C) {
   // Verify the Scanner picked them up
   let outputs =
     match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-      ScannerEvent::Block { key: this_key, block, time: _, batch, outputs } => {
+      ScannerEvent::Block { key: this_key, block, batch, outputs } => {
         assert_eq!(this_key, key);
         assert_eq!(block, block_id);
         assert_eq!(batch, 0);
diff --git a/processor/src/tests/key_gen.rs b/processor/src/tests/key_gen.rs
index 54ec961f4..1d2423075 100644
--- a/processor/src/tests/key_gen.rs
+++ b/processor/src/tests/key_gen.rs
@@ -122,7 +122,7 @@ pub async fn test_key_gen<C: Coin>() {
     let key_gen = key_gens.get_mut(&i).unwrap();
     if let KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } = key_gen
       .handle(CoordinatorMessage::ConfirmKeyPair {
-        context: SubstrateContext { time: 0, coin_latest_finalized_block: BlockHash([0x11; 32]) },
+        context: SubstrateContext { coin_latest_finalized_block: BlockHash([0x11; 32]) },
         id: ID,
       })
       .await
diff --git a/processor/src/tests/scanner.rs b/processor/src/tests/scanner.rs
index 926ce2422..69cd3e8d3 100644
--- a/processor/src/tests/scanner.rs
+++ b/processor/src/tests/scanner.rs
@@ -43,16 +43,14 @@ pub async fn test_scanner<C: Coin>(coin: C) {
   // Receive funds
   let block = coin.test_send(C::address(keys.group_key())).await;
   let block_id = block.id();
-  let block_time = block.time();

   // Verify the Scanner picked them up
   let verify_event = |mut scanner: ScannerHandle<C, MemDb>| async {
     let outputs =
       match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-        ScannerEvent::Block { key, block, time, batch, outputs } => {
+        ScannerEvent::Block { key, block, batch, outputs } => {
           assert_eq!(key, keys.group_key());
           assert_eq!(block, block_id);
-          assert_eq!(time, block_time);
           assert_eq!(batch, 0);
           assert_eq!(outputs.len(), 1);
           assert_eq!(outputs[0].kind(), OutputType::External);
diff --git a/processor/src/tests/signer.rs b/processor/src/tests/signer.rs
index b39cc5e3e..91694ff8c 100644
--- a/processor/src/tests/signer.rs
+++ b/processor/src/tests/signer.rs
@@ -1,9 +1,6 @@
-use std::{
-  time::{Duration, SystemTime},
-  collections::HashMap,
-};
+use std::collections::HashMap;

-use rand_core::OsRng;
+use rand_core::{RngCore, OsRng};

 use group::GroupEncoding;
 use frost::{
@@ -11,8 +8,6 @@
   dkg::tests::{key_gen, clone_without},
 };

-use tokio::time::timeout;
-
 use serai_db::MemDb;

 use messages::sign::*;
@@ -36,35 +31,52 @@ pub async fn sign<C: Coin>(
     attempt: 0,
   };

-  let signing_set = actual_id.signing_set(&keys_txs[&Participant::new(1).unwrap()].0.params());
   let mut keys = HashMap::new();
   let mut txs = HashMap::new();
   for (i, (these_keys, this_tx)) in keys_txs.drain() {
-    assert_eq!(actual_id.signing_set(&these_keys.params()), signing_set);
     keys.insert(i, these_keys);
     txs.insert(i, this_tx);
   }

   let mut signers = HashMap::new();
+  let mut t = 0;
   for i in 1 ..= keys.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
-    signers.insert(i, Signer::new(MemDb::new(), coin.clone(), keys.remove(&i).unwrap()));
+    let keys = keys.remove(&i).unwrap();
+    t = keys.params().t();
+    signers.insert(i, Signer::new(MemDb::new(), coin.clone(), keys));
   }
+  drop(keys);

-  let start = SystemTime::now();
   for i in 1 ..= signers.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
     let (tx, eventuality) = txs.remove(&i).unwrap();
-    signers[&i].sign_transaction(actual_id.id, start, tx, eventuality).await;
+    signers.get_mut(&i).unwrap().sign_transaction(actual_id.id, tx, eventuality).await;
+  }
+
+  let mut signing_set = vec![];
+  while signing_set.len() < usize::from(t) {
+    let candidate = Participant::new(
+      u16::try_from((OsRng.next_u64() % u64::try_from(signers.len()).unwrap()) + 1).unwrap(),
+    )
+    .unwrap();
+    if signing_set.contains(&candidate) {
+      continue;
+    }
+    signing_set.push(candidate);
   }

+  // All participants should emit a preprocess
   let mut preprocesses = HashMap::new();
-  for i in &signing_set {
-    if let Some(SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess { id, preprocess })) =
-      signers.get_mut(i).unwrap().events.recv().await
+  for i in 1 ..= signers.len() {
+    let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
+    if let SignerEvent::ProcessorMessage(ProcessorMessage::Preprocess { id, preprocess }) =
+      signers.get_mut(&i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(id, actual_id);
-      preprocesses.insert(*i, preprocess);
+      if signing_set.contains(&i) {
+        preprocesses.insert(i, preprocess);
+      }
     } else {
       panic!("didn't get preprocess back");
     }
@@ -72,14 +84,16 @@

   let mut shares = HashMap::new();
   for i in &signing_set {
-    signers[i]
+    signers
+      .get_mut(i)
+      .unwrap()
       .handle(CoordinatorMessage::Preprocesses {
         id: actual_id.clone(),
         preprocesses: clone_without(&preprocesses, i),
       })
       .await;
-    if let Some(SignerEvent::ProcessorMessage(ProcessorMessage::Share { id, share })) =
-      signers.get_mut(i).unwrap().events.recv().await
+    if let SignerEvent::ProcessorMessage(ProcessorMessage::Share { id, share }) =
+      signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(id, actual_id);
       shares.insert(*i, share);
@@ -90,14 +104,16 @@

   let mut tx_id = None;
   for i in &signing_set {
-    signers[i]
+    signers
+      .get_mut(i)
+      .unwrap()
       .handle(CoordinatorMessage::Shares {
         id: actual_id.clone(),
         shares: clone_without(&shares, i),
       })
       .await;
-    if let Some(SignerEvent::SignedTransaction { id, tx }) =
-      signers.get_mut(i).unwrap().events.recv().await
+    if let SignerEvent::SignedTransaction { id, tx } =
+      signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(id, actual_id.id);
       if tx_id.is_none() {
@@ -109,20 +125,9 @@
     }
   }

-  // Make sure the signers not included didn't do anything
-  let mut excluded = (1 ..= signers.len())
-    .map(|i| Participant::new(u16::try_from(i).unwrap()).unwrap())
-    .collect::<Vec<_>>();
-  for i in signing_set {
-    excluded.remove(excluded.binary_search(&i).unwrap());
-  }
-  for i in excluded {
-    assert!(timeout(
-      Duration::from_secs(5),
-      signers.get_mut(&Participant::new(u16::try_from(i).unwrap()).unwrap()).unwrap().events.recv()
-    )
-    .await
-    .is_err());
+  // Make sure there are no events left
+  for (_, mut signer) in signers.drain() {
+    assert!(signer.events.pop_front().is_none());
   }

   tx_id.unwrap()
diff --git a/processor/src/tests/substrate_signer.rs b/processor/src/tests/substrate_signer.rs
index c198e2b16..395d2b01d 100644
--- a/processor/src/tests/substrate_signer.rs
+++ b/processor/src/tests/substrate_signer.rs
@@ -1,9 +1,6 @@
-use std::{
-  time::{Duration, SystemTime},
-  collections::HashMap,
-};
+use std::collections::HashMap;

-use rand_core::OsRng;
+use rand_core::{RngCore, OsRng};

 use group::GroupEncoding;
 use frost::{
@@ -12,8 +9,6 @@
   dkg::tests::{key_gen, clone_without},
 };

-use tokio::time::timeout;
-
 use scale::Encode;
 use sp_application_crypto::{RuntimePublic, sr25519::Public};

@@ -53,29 +48,43 @@ async fn test_substrate_signer() {
     ],
   };

-  let signing_set = actual_id.signing_set(&keys[&participant_one].params());
-  for these_keys in keys.values() {
-    assert_eq!(actual_id.signing_set(&these_keys.params()), signing_set);
-  }
-
-  let start = SystemTime::now();
   let mut signers = HashMap::new();
+  let mut t = 0;
   for i in 1 ..= keys.len() {
     let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
-    let signer = SubstrateSigner::new(MemDb::new(), keys.remove(&i).unwrap());
-    signer.sign(start, batch.clone()).await;
+    let keys = keys.remove(&i).unwrap();
+    t = keys.params().t();
+    let mut signer = SubstrateSigner::new(MemDb::new(), keys);
+    signer.sign(batch.clone()).await;
     signers.insert(i, signer);
   }
+  drop(keys);

+  let mut signing_set = vec![];
+  while signing_set.len() < usize::from(t) {
+    let candidate = Participant::new(
+      u16::try_from((OsRng.next_u64() % u64::try_from(signers.len()).unwrap()) + 1).unwrap(),
+    )
+    .unwrap();
+    if signing_set.contains(&candidate) {
+      continue;
+    }
+    signing_set.push(candidate);
+  }
+
+  // All participants should emit a preprocess
   let mut preprocesses = HashMap::new();
-  for i in &signing_set {
-    if let Some(SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess {
+  for i in 1 ..= signers.len() {
+    let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
+    if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess {
       id,
       preprocess,
-    })) = signers.get_mut(i).unwrap().events.recv().await
+    }) = signers.get_mut(&i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(id, actual_id);
-      preprocesses.insert(*i, preprocess);
+      if signing_set.contains(&i) {
+        preprocesses.insert(i, preprocess);
+      }
     } else {
       panic!("didn't get preprocess back");
     }
@@ -83,16 +92,16 @@

   let mut shares = HashMap::new();
   for i in &signing_set {
-    signers[i]
+    signers
+      .get_mut(i)
+      .unwrap()
       .handle(CoordinatorMessage::BatchPreprocesses {
         id: actual_id.clone(),
         preprocesses: clone_without(&preprocesses, i),
       })
       .await;
-    if let Some(SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare {
-      id,
-      share,
-    })) = signers.get_mut(i).unwrap().events.recv().await
+    if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchShare { id, share }) =
+      signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(id, actual_id);
       shares.insert(*i, share);
@@ -102,15 +111,17 @@
   }

   for i in &signing_set {
-    signers[i]
+    signers
+      .get_mut(i)
+      .unwrap()
       .handle(CoordinatorMessage::BatchShares {
         id: actual_id.clone(),
         shares: clone_without(&shares, i),
       })
       .await;

-    if let Some(SubstrateSignerEvent::SignedBatch(signed_batch)) =
-      signers.get_mut(i).unwrap().events.recv().await
+    if let SubstrateSignerEvent::SignedBatch(signed_batch) =
+      signers.get_mut(i).unwrap().events.pop_front().unwrap()
     {
       assert_eq!(signed_batch.batch, batch);
       assert!(Public::from_raw(actual_id.key.clone().try_into().unwrap())
@@ -120,19 +131,8 @@
     }
   }

-  // Make sure the signers not included didn't do anything
-  let mut excluded = (1 ..= signers.len())
-    .map(|i| Participant::new(u16::try_from(i).unwrap()).unwrap())
-    .collect::<Vec<_>>();
-  for i in signing_set {
-    excluded.remove(excluded.binary_search(&i).unwrap());
-  }
-  for i in excluded {
-    assert!(timeout(
-      Duration::from_secs(5),
-      signers.get_mut(&Participant::new(u16::try_from(i).unwrap()).unwrap()).unwrap().events.recv()
-    )
-    .await
-    .is_err());
+  // Make sure there are no events left
+  for (_, mut signer) in signers.drain() {
+    assert!(signer.events.pop_front().is_none());
   }
 }
diff --git a/processor/src/tests/wallet.rs b/processor/src/tests/wallet.rs
index 87c386b8d..65fb3dae7 100644
--- a/processor/src/tests/wallet.rs
+++ b/processor/src/tests/wallet.rs
@@ -31,13 +31,11 @@ pub async fn test_wallet<C: Coin>(coin: C) {

   let block = coin.test_send(C::address(key)).await;
   let block_id = block.id();
-  let block_time = block.time();

   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block { key: this_key, block, time, batch, outputs } => {
+    ScannerEvent::Block { key: this_key, block, batch, outputs } => {
       assert_eq!(this_key, key);
       assert_eq!(block, block_id);
-      assert_eq!(time, block_time);
       assert_eq!(batch, 0);
       assert_eq!(outputs.len(), 1);
       (block_id, outputs)
@@ -104,10 +102,9 @@
   }

   match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
-    ScannerEvent::Block { key: this_key, block: block_id, time, batch, outputs: these_outputs } => {
+    ScannerEvent::Block { key: this_key, block: block_id, batch, outputs: these_outputs } => {
      assert_eq!(this_key, key);
      assert_eq!(block_id, block.id());
-      assert_eq!(time, block.time());
      assert_eq!(batch, 1);
      assert_eq!(these_outputs, outputs);
    }
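Taken together, the tests now drive signers synchronously: construct, order a sign, then pop events inline. The shape of that flow for the batch signer, condensed from tests/substrate_signer.rs above (with keys and batch as set up there):

    let mut signer = SubstrateSigner::new(MemDb::new(), keys);
    signer.sign(batch.clone()).await;
    // The first queued event should be our own BatchPreprocess for this block
    match signer.events.pop_front().unwrap() {
      SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess { id, .. }) => {
        assert_eq!(id.id, batch.block.0);
      }
      _ => panic!("first event wasn't a BatchPreprocess"),
    }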