From bf75d42ac8f747e96cc483b4c232baa1ebf69395 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 4 Apr 2024 08:44:54 -0400 Subject: [PATCH 01/11] WIP: initial ideas for reloading messages Signed-off-by: Jacinta Ferrant --- stacks-signer/src/client/stackerdb.rs | 79 ++++++++++++++++++++------- stacks-signer/src/runloop.rs | 10 +++- stacks-signer/src/signer.rs | 31 ++++++++++- 3 files changed, 98 insertions(+), 22 deletions(-) diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index 4b6e993bcf..1c4a2454d3 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -23,6 +23,7 @@ use slog::{slog_debug, slog_warn}; use stacks_common::codec::{read_next, StacksMessageCodec}; use stacks_common::types::chainstate::StacksPrivateKey; use stacks_common::{debug, warn}; +use wsts::net::Packet; use super::ClientError; use crate::client::retry_with_exponential_backoff; @@ -179,45 +180,83 @@ impl StackerDB { } } - /// Get the transactions from stackerdb for the signers - fn get_transactions( - transactions_session: &mut StackerDBSession, - signer_ids: &[SignerSlotID], - ) -> Result, ClientError> { + /// Get all signer messages from stackerdb for the given slot IDs + fn get_messages( + session: &mut StackerDBSession, + slot_ids: &[u32], + ) -> Result, ClientError> { + let mut messages = vec![]; let send_request = || { - transactions_session - .get_latest_chunks(&signer_ids.iter().map(|id| id.0).collect::>()) + session + .get_latest_chunks(slot_ids) .map_err(backoff::Error::transient) }; let chunk_ack = retry_with_exponential_backoff(send_request)?; - let mut transactions = Vec::new(); for (i, chunk) in chunk_ack.iter().enumerate() { - let signer_id = *signer_ids - .get(i) - .expect("BUG: retrieved an unequal amount of chunks to requested chunks"); let Some(data) = chunk else { continue; }; let Ok(message) = read_next::(&mut &data[..]) else { if !data.is_empty() { warn!("Failed to deserialize chunk data into a SignerMessage"); - debug!( - "signer #{signer_id}: Failed chunk ({}): {data:?}", - &data.len(), - ); + debug!("slot #{i}: Failed chunk ({}): {data:?}", &data.len(),); } continue; }; + messages.push(message); + } + Ok(messages) + } + + /// Get all wsts packets from stackerdb for each of the given signer IDs + pub fn get_all_packets( + &mut self, + signer_ids: &[SignerSlotID], + ) -> Result, ClientError> { + let slot_ids = signer_ids.iter().map(|id| id.0).collect::>(); + let mut packets = Vec::new(); + let packet_slots = &[ + MessageSlotID::DkgBegin, + MessageSlotID::DkgPrivateBegin, + MessageSlotID::DkgEndBegin, + MessageSlotID::DkgEnd, + MessageSlotID::DkgPublicShares, + MessageSlotID::DkgPrivateShares, + MessageSlotID::NonceRequest, + MessageSlotID::NonceResponse, + MessageSlotID::SignatureShareRequest, + MessageSlotID::SignatureShareResponse, + ]; + for packet_slot in packet_slots { + let session = self + .signers_message_stackerdb_sessions + .get_mut(packet_slot) + .ok_or(ClientError::NotConnected)?; + let messages = Self::get_messages(session, &slot_ids)?; + for message in messages { + let SignerMessage::Packet(packet) = message else { + warn!("Found an unexpected type in a packet slot"); + continue; + }; + packets.push(packet); + } + } + Ok(packets) + } + /// Get the transactions from stackerdb for the signers + fn get_transactions( + transactions_session: &mut StackerDBSession, + signer_ids: &[SignerSlotID], + ) -> Result, ClientError> { + let slot_ids = signer_ids.iter().map(|id| id.0).collect::>(); + let messages = Self::get_messages(transactions_session, &slot_ids)?; + let mut transactions = vec![]; + for message in messages { let SignerMessage::Transactions(chunk_transactions) = message else { warn!("Signer wrote an unexpected type to the transactions slot"); continue; }; - debug!( - "Retrieved {} transactions from signer ID {}.", - chunk_transactions.len(), - signer_id - ); transactions.extend(chunk_transactions); } Ok(transactions) diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 17b74c2fc9..97d9178c42 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -367,6 +367,14 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { return None; } for signer in self.stacks_signers.values_mut() { + // First check if we missed any messages due to a restart or being late to the party + if let Err(e) = signer.read_stackerdb_messages( + &self.stacks_client, + res.clone(), + current_reward_cycle, + ) { + error!("{signer}: failed to read stackerdb messages: {e}"); + } let event_parity = match event { Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2), // Block proposal events do have reward cycles, but each proposal has its own cycle, @@ -383,7 +391,6 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { if event_parity == Some(other_signer_parity) { continue; } - if signer.approved_aggregate_public_key.is_none() { if let Err(e) = signer.update_dkg(&self.stacks_client) { error!("{signer}: failed to update DKG: {e}"); @@ -417,6 +424,7 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { } } // After processing event, run the next command for each signer + signer.process_next_command(&self.stacks_client, current_reward_cycle); } None diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 3da523eeaf..7caed0bd09 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -126,6 +126,8 @@ pub enum Command { /// The Signer state #[derive(PartialEq, Eq, Debug, Clone)] pub enum State { + /// The signer is uninitialized and should read stackerdb to restore state + Uninitialized, /// The signer is idle, waiting for messages and commands Idle, /// The signer is executing a DKG or Sign round @@ -215,6 +217,29 @@ impl Signer { fn get_coordinator_dkg(&self) -> (u32, PublicKey) { self.coordinator_selector.get_coordinator() } + /// Read stackerdb messages in case the signer was started late or restarted and missed incoming messages + pub fn read_stackerdb_messages( + &mut self, + stacks_client: &StacksClient, + res: Sender>, + current_reward_cycle: u64, + ) -> Result<(), ClientError> { + // TODO: should load DKG shares first and potentially check dkg results before attempting to load any other state. + // This should be done on initialization and not on ever read right when a signer is first created. This call will then + // be called when state is equal to some sort of "in between" state where it has loaded its DKG shares if they exist + // but not yet if it missed messages and needs to respond to them. + // See https://github.com/stacks-network/stacks-core/issues/4595 + if self.state != State::Uninitialized { + // We already have state. Do not load it again. + return Ok(()); + } + let packets = self.stackerdb.get_all_packets(&self.signer_slot_ids)?; + self.handle_packets(stacks_client, res, &packets, current_reward_cycle); + if self.state == State::Uninitialized { + self.state = State::Idle; + } + Ok(()) + } } impl From for Signer { @@ -290,7 +315,7 @@ impl From for Signer { Self { coordinator, state_machine, - state: State::Idle, + state: State::Uninitialized, commands: VecDeque::new(), stackerdb, mainnet: signer_config.mainnet, @@ -437,6 +462,10 @@ impl Signer { current_reward_cycle: u64, ) { match &self.state { + State::Uninitialized => { + // We cannot process any commands until we have restored our state + warn!("{self}: Cannot process commands until state is restored. Waiting..."); + } State::Idle => { let Some(command) = self.commands.front() else { debug!("{self}: Nothing to process. Waiting for command..."); From 8681bffd87d1b4e1e113d01082dd5a39723a7632 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 4 Apr 2024 10:21:51 -0400 Subject: [PATCH 02/11] WIP: initial testing framework Signed-off-by: Jacinta Ferrant --- .../src/tests/nakamoto_integrations.rs | 40 +++++- testnet/stacks-node/src/tests/signer.rs | 117 +++++++++++++++++- 2 files changed, 146 insertions(+), 11 deletions(-) diff --git a/testnet/stacks-node/src/tests/nakamoto_integrations.rs b/testnet/stacks-node/src/tests/nakamoto_integrations.rs index aa545514f0..a50278b3af 100644 --- a/testnet/stacks-node/src/tests/nakamoto_integrations.rs +++ b/testnet/stacks-node/src/tests/nakamoto_integrations.rs @@ -807,7 +807,7 @@ fn signer_vote_if_needed( /// * `stacker_sks` - must be a private key for sending a large `stack-stx` transaction in order /// for pox-4 to activate /// * `signer_pks` - must be the same size as `stacker_sks` -pub fn boot_to_epoch_3_reward_set( +pub fn boot_to_epoch_3_reward_set_calculation_boundary( naka_conf: &Config, blocks_processed: &Arc, stacker_sks: &[StacksPrivateKey], @@ -828,9 +828,9 @@ pub fn boot_to_epoch_3_reward_set( ); let epoch_3_reward_cycle_boundary = epoch_3_start_height.saturating_sub(epoch_3_start_height % reward_cycle_len); - let epoch_3_reward_set_calculation_boundary = - epoch_3_reward_cycle_boundary.saturating_sub(prepare_phase_len); - let epoch_3_reward_set_calculation = epoch_3_reward_set_calculation_boundary.wrapping_add(2); // +2 to ensure we are at the second block of the prepare phase + let epoch_3_reward_set_calculation_boundary = epoch_3_reward_cycle_boundary + .saturating_sub(prepare_phase_len) + .wrapping_add(1); let http_origin = format!("http://{}", &naka_conf.node.rpc_bind); next_block_and_wait(btc_regtest_controller, &blocks_processed); next_block_and_wait(btc_regtest_controller, &blocks_processed); @@ -850,7 +850,6 @@ pub fn boot_to_epoch_3_reward_set( "block_height" => {block_height}, "reward_cycle" => {reward_cycle}, "epoch_3_reward_cycle_boundary" => {epoch_3_reward_cycle_boundary}, - "epoch_3_reward_set_calculation" => {epoch_3_reward_set_calculation}, "epoch_3_start_height" => {epoch_3_start_height}, ); for (stacker_sk, signer_sk) in stacker_sks.iter().zip(signer_sks.iter()) { @@ -899,10 +898,39 @@ pub fn boot_to_epoch_3_reward_set( run_until_burnchain_height( btc_regtest_controller, &blocks_processed, - epoch_3_reward_set_calculation, + epoch_3_reward_set_calculation_boundary, &naka_conf, ); + info!("Bootstrapped to Epoch 3.0 reward set calculation boundary height: {epoch_3_reward_set_calculation_boundary}."); +} + +/// +/// * `stacker_sks` - must be a private key for sending a large `stack-stx` transaction in order +/// for pox-4 to activate +/// * `signer_pks` - must be the same size as `stacker_sks` +pub fn boot_to_epoch_3_reward_set( + naka_conf: &Config, + blocks_processed: &Arc, + stacker_sks: &[StacksPrivateKey], + signer_sks: &[StacksPrivateKey], + btc_regtest_controller: &mut BitcoinRegtestController, +) { + boot_to_epoch_3_reward_set_calculation_boundary( + naka_conf, + blocks_processed, + stacker_sks, + signer_sks, + btc_regtest_controller, + ); + let epoch_3_reward_set_calculation = + btc_regtest_controller.get_headers_height().wrapping_add(1); + run_until_burnchain_height( + btc_regtest_controller, + &blocks_processed, + epoch_3_reward_set_calculation, + &naka_conf, + ); info!("Bootstrapped to Epoch 3.0 reward set calculation height: {epoch_3_reward_set_calculation}."); } diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 867421b5c0..8ddb5833b9 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -9,8 +9,8 @@ use std::{env, thread}; use clarity::boot_util::boot_code_id; use clarity::vm::Value; use libsigner::{ - BlockResponse, MessageSlotID, RejectCode, RunningSigner, Signer, SignerEventReceiver, - SignerMessage, + BlockResponse, MessageSlotID, RejectCode, RunningSigner, Signer, SignerEntries, + SignerEventReceiver, SignerMessage, }; use rand::thread_rng; use rand_core::RngCore; @@ -42,13 +42,14 @@ use stacks_common::util::hash::{hex_bytes, MerkleTree, Sha512Trunc256Sum}; use stacks_common::util::secp256k1::MessageSignature; use stacks_signer::client::{StackerDB, StacksClient}; use stacks_signer::config::{build_signer_config_tomls, GlobalConfig as SignerConfig, Network}; +use stacks_signer::coordinator::CoordinatorSelector; use stacks_signer::runloop::RunLoopCommand; use stacks_signer::signer::{Command as SignerCommand, SignerSlotID}; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; use wsts::curve::point::Point; use wsts::curve::scalar::Scalar; -use wsts::state_machine::OperationResult; +use wsts::state_machine::{OperationResult, PublicKeys}; use crate::config::{Config as NeonConfig, EventKeyType, EventObserverConfig, InitialBalance}; use crate::event_dispatcher::MinedNakamotoBlockEvent; @@ -56,8 +57,9 @@ use crate::neon::Counters; use crate::run_loop::boot_nakamoto; use crate::tests::bitcoin_regtest::BitcoinCoreController; use crate::tests::nakamoto_integrations::{ - boot_to_epoch_3_reward_set, naka_neon_integration_conf, next_block_and, - next_block_and_mine_commit, POX_4_DEFAULT_STACKER_BALANCE, + boot_to_epoch_3_reward_set, boot_to_epoch_3_reward_set_calculation_boundary, + naka_neon_integration_conf, next_block_and, next_block_and_mine_commit, + POX_4_DEFAULT_STACKER_BALANCE, }; use crate::tests::neon_integrations::{ next_block_and_wait, run_until_burnchain_height, test_observer, wait_for_runloop, @@ -498,6 +500,16 @@ impl SignerTest { .expect("FATAL: signer not registered") } + fn get_signer_public_keys(&self, reward_cycle: u64) -> PublicKeys { + let entries = self + .stacks_client + .get_reward_set_signers_with_retry(reward_cycle) + .unwrap() + .unwrap(); + let entries = SignerEntries::parse(false, &entries).unwrap(); + entries.public_keys + } + fn generate_invalid_transactions(&self) -> Vec { let host = self .running_nodes @@ -961,6 +973,101 @@ fn stackerdb_dkg() { info!("DKG Time Elapsed: {:.2?}", dkg_elapsed); } +#[test] +#[ignore] +/// Test the signer can handle delayed start and still see the DKG round has commenced +fn stackerdb_delayed_start_dkg() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("------------------------- Test Setup -------------------------"); + let timeout = Duration::from_secs(200); + let num_signers = 5; + let mut signer_test = SignerTest::new(num_signers); + boot_to_epoch_3_reward_set_calculation_boundary( + &signer_test.running_nodes.conf, + &signer_test.running_nodes.blocks_processed, + &signer_test.signer_stacks_private_keys, + &signer_test.signer_stacks_private_keys, + &mut signer_test.running_nodes.btc_regtest_controller, + ); + + info!("------------------------- Stop Signers -------------------------"); + let reward_cycle = signer_test.get_current_reward_cycle().saturating_add(1); + let public_keys = signer_test.get_signer_public_keys(reward_cycle); + let coordinator_selector = CoordinatorSelector::from(public_keys); + let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); + let coordinator_public_key = + StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); + let mut to_stop = vec![]; + for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { + let public_key = StacksPublicKey::from_private(key); + if public_key == coordinator_public_key { + // Do not stop the coordinator. We want coordinator to start a DKG round + continue; + } + to_stop.push(idx); + if to_stop.len() == num_signers.saturating_sub(2) { + // Keep one signer alive to test scenario where restartings signers can read both coordinator and signer messages to catch up + break; + } + } + let mut stopped_signers = Vec::with_capacity(to_stop.len()); + for idx in to_stop.into_iter().rev() { + let signer_key = signer_test.stop_signer(idx); + debug!( + "Removed signer {idx} with key: {:?}, {}", + signer_key, + signer_key.to_hex() + ); + stopped_signers.push((idx, signer_key)); + } + + info!("------------------------- Start DKG -------------------------"); + let height = signer_test + .running_nodes + .btc_regtest_controller + .get_headers_height(); + // Advance one more to trigger DKG + run_until_burnchain_height( + &mut signer_test.running_nodes.btc_regtest_controller, + &signer_test.running_nodes.blocks_processed, + height.wrapping_add(1), + &signer_test.running_nodes.conf, + ); + // Wait one second so DKG is actually triggered and signers are not available to respond + std::thread::sleep(Duration::from_secs(1)); + // Make sure DKG did not get set + assert!(signer_test + .stacks_client + .get_approved_aggregate_key(reward_cycle) + .expect("Failed to get approved aggregate key") + .is_none()); + info!("------------------------- Restart Stopped Signers -------------------------"); + + for (idx, signer_key) in stopped_signers { + signer_test.restart_signer(idx, signer_key); + } + + info!("------------------------- Wait for DKG -------------------------"); + let key = signer_test.wait_for_dkg(timeout); + // Make sure DKG did not get set + assert_eq!( + key, + signer_test + .stacks_client + .get_approved_aggregate_key(reward_cycle) + .expect("Failed to get approved aggregate key") + .expect("No approved aggregate key found") + ); +} + #[test] #[ignore] /// Test the signer can respond to external commands to perform DKG From 18f3c0344c6038961f927169eb7778f6aacb55c7 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 4 Apr 2024 22:28:51 -0400 Subject: [PATCH 03/11] WIP: issues when tries to start DKG before initialized Signed-off-by: Jacinta Ferrant --- .github/workflows/bitcoin-tests.yml | 4 +- stacks-signer/src/client/stackerdb.rs | 12 +- stacks-signer/src/runloop.rs | 17 +- stacks-signer/src/signer.rs | 46 +++- testnet/stacks-node/src/tests/signer.rs | 278 +++++++++++++++--------- 5 files changed, 245 insertions(+), 112 deletions(-) diff --git a/.github/workflows/bitcoin-tests.yml b/.github/workflows/bitcoin-tests.yml index a832bc2234..2b367d20d6 100644 --- a/.github/workflows/bitcoin-tests.yml +++ b/.github/workflows/bitcoin-tests.yml @@ -82,12 +82,14 @@ jobs: - tests::nakamoto_integrations::vote_for_aggregate_key_burn_op - tests::nakamoto_integrations::follower_bootup - tests::signer::stackerdb_dkg - - tests::signer::stackerdb_sign + - tests::signer::stackerdb_sign_request_rejected - tests::signer::stackerdb_block_proposal - tests::signer::stackerdb_filter_bad_transactions - tests::signer::stackerdb_mine_2_nakamoto_reward_cycles - tests::signer::stackerdb_sign_after_signer_reboot - tests::nakamoto_integrations::stack_stx_burn_op_integration_test + - tests::signer::stackerdb_delayed_dkg + - tests::signer::stackerdb_delayed_sign # Do not run this one until we figure out why it fails in CI # - tests::neon_integrations::bitcoin_reorg_flap steps: diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index 1c4a2454d3..002ca41b58 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -212,9 +212,9 @@ impl StackerDB { pub fn get_all_packets( &mut self, signer_ids: &[SignerSlotID], - ) -> Result, ClientError> { + ) -> Result>, ClientError> { let slot_ids = signer_ids.iter().map(|id| id.0).collect::>(); - let mut packets = Vec::new(); + let mut packets = HashMap::new(); let packet_slots = &[ MessageSlotID::DkgBegin, MessageSlotID::DkgPrivateBegin, @@ -227,6 +227,7 @@ impl StackerDB { MessageSlotID::SignatureShareRequest, MessageSlotID::SignatureShareResponse, ]; + for packet_slot in packet_slots { let session = self .signers_message_stackerdb_sessions @@ -235,10 +236,13 @@ impl StackerDB { let messages = Self::get_messages(session, &slot_ids)?; for message in messages { let SignerMessage::Packet(packet) = message else { - warn!("Found an unexpected type in a packet slot"); + warn!("Found an unexpected type in a packet slot {packet_slot}"); continue; }; - packets.push(packet); + packets + .entry(*packet_slot) + .or_insert_with(|| vec![]) + .push(packet); } } Ok(packets) diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 97d9178c42..1f9ede1a24 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -367,14 +367,6 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { return None; } for signer in self.stacks_signers.values_mut() { - // First check if we missed any messages due to a restart or being late to the party - if let Err(e) = signer.read_stackerdb_messages( - &self.stacks_client, - res.clone(), - current_reward_cycle, - ) { - error!("{signer}: failed to read stackerdb messages: {e}"); - } let event_parity = match event { Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2), // Block proposal events do have reward cycles, but each proposal has its own cycle, @@ -423,8 +415,15 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { signer.commands.push_back(command.command); } } + // Check if we missed any messages due to a restart or being late to the party + if let Err(e) = signer.read_stackerdb_messages( + &self.stacks_client, + res.clone(), + current_reward_cycle, + ) { + error!("{signer}: failed to read stackerdb messages: {e}"); + } // After processing event, run the next command for each signer - signer.process_next_command(&self.stacks_client, current_reward_cycle); } None diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 7caed0bd09..e5d6361ea5 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -234,7 +234,51 @@ impl Signer { return Ok(()); } let packets = self.stackerdb.get_all_packets(&self.signer_slot_ids)?; - self.handle_packets(stacks_client, res, &packets, current_reward_cycle); + let mut ordered_packets = vec![]; + if self.approved_aggregate_public_key.is_none() { + // we should read the DKG messages in order to see if we are in the middle of a DKG round + // TODO: we should check if we already computed some party shares. In which case, we should start LATER in this list + debug!("{self}: Checking stackerdb for missed DKG messages."); + let dkg_order = &[ + MessageSlotID::DkgBegin, + MessageSlotID::DkgEndBegin, + MessageSlotID::DkgPublicShares, + MessageSlotID::DkgPrivateBegin, + MessageSlotID::DkgPrivateShares, + MessageSlotID::DkgEndBegin, + MessageSlotID::DkgEnd, + ]; + for slot_id in dkg_order { + let packets = packets.get(slot_id).cloned().unwrap_or_default(); + ordered_packets.extend(packets); + } + } else { + debug!("{self}: Checking stackerdb for missed Sign messages."); + let sign_order = &[ + MessageSlotID::NonceRequest, + MessageSlotID::NonceResponse, + MessageSlotID::SignatureShareRequest, + MessageSlotID::SignatureShareResponse, + ]; + for slot_id in sign_order { + let packets = packets.get(slot_id).cloned().unwrap_or_default(); + ordered_packets.extend(packets); + } + } + if !ordered_packets.is_empty() { + debug!( + "{self}: Processing {} messages from stackerdb: {ordered_packets:?}", + ordered_packets.len() + ); + self.handle_packets( + stacks_client, + res.clone(), + &ordered_packets, + current_reward_cycle, + ); + } + debug!("{:?}", self.state); + debug!("{self}: Finished checking stackerdb for missed messages."); if self.state == State::Uninitialized { self.state = State::Idle; } diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 8ddb5833b9..995da0d680 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -975,103 +975,8 @@ fn stackerdb_dkg() { #[test] #[ignore] -/// Test the signer can handle delayed start and still see the DKG round has commenced -fn stackerdb_delayed_start_dkg() { - if env::var("BITCOIND_TEST") != Ok("1".into()) { - return; - } - - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .init(); - - info!("------------------------- Test Setup -------------------------"); - let timeout = Duration::from_secs(200); - let num_signers = 5; - let mut signer_test = SignerTest::new(num_signers); - boot_to_epoch_3_reward_set_calculation_boundary( - &signer_test.running_nodes.conf, - &signer_test.running_nodes.blocks_processed, - &signer_test.signer_stacks_private_keys, - &signer_test.signer_stacks_private_keys, - &mut signer_test.running_nodes.btc_regtest_controller, - ); - - info!("------------------------- Stop Signers -------------------------"); - let reward_cycle = signer_test.get_current_reward_cycle().saturating_add(1); - let public_keys = signer_test.get_signer_public_keys(reward_cycle); - let coordinator_selector = CoordinatorSelector::from(public_keys); - let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); - let coordinator_public_key = - StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); - let mut to_stop = vec![]; - for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { - let public_key = StacksPublicKey::from_private(key); - if public_key == coordinator_public_key { - // Do not stop the coordinator. We want coordinator to start a DKG round - continue; - } - to_stop.push(idx); - if to_stop.len() == num_signers.saturating_sub(2) { - // Keep one signer alive to test scenario where restartings signers can read both coordinator and signer messages to catch up - break; - } - } - let mut stopped_signers = Vec::with_capacity(to_stop.len()); - for idx in to_stop.into_iter().rev() { - let signer_key = signer_test.stop_signer(idx); - debug!( - "Removed signer {idx} with key: {:?}, {}", - signer_key, - signer_key.to_hex() - ); - stopped_signers.push((idx, signer_key)); - } - - info!("------------------------- Start DKG -------------------------"); - let height = signer_test - .running_nodes - .btc_regtest_controller - .get_headers_height(); - // Advance one more to trigger DKG - run_until_burnchain_height( - &mut signer_test.running_nodes.btc_regtest_controller, - &signer_test.running_nodes.blocks_processed, - height.wrapping_add(1), - &signer_test.running_nodes.conf, - ); - // Wait one second so DKG is actually triggered and signers are not available to respond - std::thread::sleep(Duration::from_secs(1)); - // Make sure DKG did not get set - assert!(signer_test - .stacks_client - .get_approved_aggregate_key(reward_cycle) - .expect("Failed to get approved aggregate key") - .is_none()); - info!("------------------------- Restart Stopped Signers -------------------------"); - - for (idx, signer_key) in stopped_signers { - signer_test.restart_signer(idx, signer_key); - } - - info!("------------------------- Wait for DKG -------------------------"); - let key = signer_test.wait_for_dkg(timeout); - // Make sure DKG did not get set - assert_eq!( - key, - signer_test - .stacks_client - .get_approved_aggregate_key(reward_cycle) - .expect("Failed to get approved aggregate key") - .expect("No approved aggregate key found") - ); -} - -#[test] -#[ignore] -/// Test the signer can respond to external commands to perform DKG -fn stackerdb_sign() { +/// Test the signer rejects requests to sign that do not come from a miner +fn stackerdb_sign_request_rejected() { if env::var("BITCOIND_TEST") != Ok("1".into()) { return; } @@ -1205,6 +1110,185 @@ fn stackerdb_sign() { info!("Sign Time Elapsed: {:.2?}", sign_elapsed); } +#[test] +#[ignore] +/// Test that a signer can be offline when a DKG round has commenced and +/// can rejoin the DKG round after it has restarted +fn stackerdb_delayed_dkg() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("------------------------- Test Setup -------------------------"); + let timeout = Duration::from_secs(200); + let mut signer_test = SignerTest::new(3); + boot_to_epoch_3_reward_set_calculation_boundary( + &signer_test.running_nodes.conf, + &signer_test.running_nodes.blocks_processed, + &signer_test.signer_stacks_private_keys, + &signer_test.signer_stacks_private_keys, + &mut signer_test.running_nodes.btc_regtest_controller, + ); + let reward_cycle = signer_test.get_current_reward_cycle().saturating_add(1); + let public_keys = signer_test.get_signer_public_keys(reward_cycle); + let coordinator_selector = CoordinatorSelector::from(public_keys); + let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); + let coordinator_public_key = + StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); + + info!("------------------------- Stop Signers -------------------------"); + let mut to_stop = None; + for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { + let public_key = StacksPublicKey::from_private(key); + if public_key == coordinator_public_key { + // Do not stop the coordinator. We want coordinator to start a DKG round + continue; + } + // Only stop one signer + to_stop = Some(idx); + break; + } + let signer_idx = to_stop.expect("Failed to find a signer to stop"); + let signer_key = signer_test.stop_signer(signer_idx); + debug!( + "Removed signer {signer_idx} with key: {:?}, {}", + signer_key, + signer_key.to_hex() + ); + + info!("------------------------- Start DKG -------------------------"); + let height = signer_test + .running_nodes + .btc_regtest_controller + .get_headers_height(); + // Advance one more to trigger DKG + run_until_burnchain_height( + &mut signer_test.running_nodes.btc_regtest_controller, + &signer_test.running_nodes.blocks_processed, + height.wrapping_add(1), + &signer_test.running_nodes.conf, + ); + // Wait a bit so DKG is actually triggered and signers are not available to respond + std::thread::sleep(Duration::from_secs(5)); + + // Make sure DKG did not get set + assert!(signer_test + .stacks_client + .get_approved_aggregate_key(reward_cycle) + .expect("Failed to get approved aggregate key") + .is_none()); + + info!("------------------------- Restart Stopped Signer -------------------------"); + + signer_test.restart_signer(signer_idx, signer_key); + + info!("------------------------- Wait for DKG -------------------------"); + let key = signer_test.wait_for_dkg(timeout); + let height = signer_test + .running_nodes + .btc_regtest_controller + .get_headers_height(); + // Advance one more to mine dkg transactions + run_until_burnchain_height( + &mut signer_test.running_nodes.btc_regtest_controller, + &signer_test.running_nodes.blocks_processed, + height.wrapping_add(1), + &signer_test.running_nodes.conf, + ); + // Make sure DKG did get set + assert_eq!( + key, + signer_test + .stacks_client + .get_approved_aggregate_key(reward_cycle) + .expect("Failed to get approved aggregate key") + .expect("No approved aggregate key found") + ); +} + +#[test] +#[ignore] +/// Test that a signer can be offline when a sign round has commenced +/// and can rejoin the sign round after it has restarted +fn stackerdb_delayed_sign() { + if env::var("BITCOIND_TEST") != Ok("1".into()) { + return; + } + + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + info!("------------------------- Test Setup -------------------------"); + let timeout = Duration::from_secs(200); + let mut signer_test = SignerTest::new(3); + let _key = signer_test.boot_to_epoch_3(timeout); + let rpc_bind = signer_test.running_nodes.conf.node.rpc_bind.clone(); + let run_stamp = signer_test.run_stamp; + let reward_cycle = signer_test.get_current_reward_cycle(); + let public_keys = signer_test.get_signer_public_keys(reward_cycle); + let coordinator_selector = CoordinatorSelector::from(public_keys); + let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); + let coordinator_public_key = + StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); + + info!("------------------------- Stop Signer -------------------------"); + let mut to_stop = None; + for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { + let public_key = StacksPublicKey::from_private(key); + if public_key == coordinator_public_key { + // Do not stop the coordinator. We want coordinator to start a sign round + continue; + } + // Only stop one signer + to_stop = Some(idx); + break; + } + let signer_idx = to_stop.expect("Failed to find a signer to stop"); + let signer_key = signer_test.stop_signer(signer_idx); + debug!( + "Removed signer {signer_idx} with key: {:?}, {}", + signer_key, + signer_key.to_hex() + ); + + info!("------------------------- Start Sign -------------------------"); + let sign_now = Instant::now(); + let h = std::thread::spawn(move || signer_test.mine_nakamoto_block(timeout)); + + // Sleep a bit to wait for a miner to propose a block + std::thread::sleep(Duration::from_secs(5)); + + info!("------------------------- Restart Stopped Signer -------------------------"); + let signer_config = build_signer_config_tomls( + &[signer_key], + &rpc_bind, + Some(Duration::from_millis(128)), // Timeout defaults to 5 seconds. Let's override it to 128 milliseconds. + &Network::Testnet, + "12345", + run_stamp, + 3000 + signer_idx, + ) + .pop() + .unwrap(); + + let (_cmd_send, cmd_recv) = channel(); + let (res_send, _res_recv) = channel(); + + let _signer = spawn_signer(&signer_config, cmd_recv, res_send); + info!("------------------------- Wait for Signed Block -------------------------"); + // Now the signer has come back online, it should sign the block + h.join().unwrap(); + let sign_elapsed = sign_now.elapsed(); + info!("Sign Time Elapsed: {:.2?}", sign_elapsed); +} + pub fn find_block_response(chunk_events: Vec) -> Option { for event in chunk_events.into_iter() { if event.contract_id.name.as_str() From 31af777edd9c56a735c052aea4775a9b368eb490 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 4 Apr 2024 23:37:43 -0400 Subject: [PATCH 04/11] WIP: can't seem to end dkg Signed-off-by: Jacinta Ferrant --- stacks-signer/src/client/stackerdb.rs | 25 +++++------------------- stacks-signer/src/runloop.rs | 1 + stacks-signer/src/signer.rs | 28 +++++++++------------------ 3 files changed, 15 insertions(+), 39 deletions(-) diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index 002ca41b58..20ae01d273 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -209,25 +209,13 @@ impl StackerDB { } /// Get all wsts packets from stackerdb for each of the given signer IDs - pub fn get_all_packets( + pub fn get_packets( &mut self, signer_ids: &[SignerSlotID], - ) -> Result>, ClientError> { + packet_slots: &[MessageSlotID], + ) -> Result, ClientError> { let slot_ids = signer_ids.iter().map(|id| id.0).collect::>(); - let mut packets = HashMap::new(); - let packet_slots = &[ - MessageSlotID::DkgBegin, - MessageSlotID::DkgPrivateBegin, - MessageSlotID::DkgEndBegin, - MessageSlotID::DkgEnd, - MessageSlotID::DkgPublicShares, - MessageSlotID::DkgPrivateShares, - MessageSlotID::NonceRequest, - MessageSlotID::NonceResponse, - MessageSlotID::SignatureShareRequest, - MessageSlotID::SignatureShareResponse, - ]; - + let mut packets = vec![]; for packet_slot in packet_slots { let session = self .signers_message_stackerdb_sessions @@ -239,10 +227,7 @@ impl StackerDB { warn!("Found an unexpected type in a packet slot {packet_slot}"); continue; }; - packets - .entry(*packet_slot) - .or_insert_with(|| vec![]) - .push(packet); + packets.push(packet); } } Ok(packets) diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 1f9ede1a24..1fecb18c91 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -429,6 +429,7 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { None } } + #[cfg(test)] mod tests { use blockstack_lib::chainstate::stacks::boot::NakamotoSignerEntry; diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index e5d6361ea5..966911dc92 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -233,38 +233,30 @@ impl Signer { // We already have state. Do not load it again. return Ok(()); } - let packets = self.stackerdb.get_all_packets(&self.signer_slot_ids)?; - let mut ordered_packets = vec![]; - if self.approved_aggregate_public_key.is_none() { + let packet_slots = if self.approved_aggregate_public_key.is_none() { // we should read the DKG messages in order to see if we are in the middle of a DKG round // TODO: we should check if we already computed some party shares. In which case, we should start LATER in this list debug!("{self}: Checking stackerdb for missed DKG messages."); - let dkg_order = &[ + vec![ MessageSlotID::DkgBegin, - MessageSlotID::DkgEndBegin, MessageSlotID::DkgPublicShares, MessageSlotID::DkgPrivateBegin, MessageSlotID::DkgPrivateShares, MessageSlotID::DkgEndBegin, MessageSlotID::DkgEnd, - ]; - for slot_id in dkg_order { - let packets = packets.get(slot_id).cloned().unwrap_or_default(); - ordered_packets.extend(packets); - } + ] } else { debug!("{self}: Checking stackerdb for missed Sign messages."); - let sign_order = &[ + vec![ MessageSlotID::NonceRequest, MessageSlotID::NonceResponse, MessageSlotID::SignatureShareRequest, MessageSlotID::SignatureShareResponse, - ]; - for slot_id in sign_order { - let packets = packets.get(slot_id).cloned().unwrap_or_default(); - ordered_packets.extend(packets); - } - } + ] + }; + let ordered_packets = self + .stackerdb + .get_packets(&self.signer_slot_ids, &packet_slots)?; if !ordered_packets.is_empty() { debug!( "{self}: Processing {} messages from stackerdb: {ordered_packets:?}", @@ -277,8 +269,6 @@ impl Signer { current_reward_cycle, ); } - debug!("{:?}", self.state); - debug!("{self}: Finished checking stackerdb for missed messages."); if self.state == State::Uninitialized { self.state = State::Idle; } From ebfe075420e9d4427fdbb663a82e0523b8e4240b Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Fri, 5 Apr 2024 08:39:04 -0400 Subject: [PATCH 05/11] WIP: Verify retrieved packets from stackerdb before ingesting them Signed-off-by: Jacinta Ferrant --- stacks-signer/src/signer.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 966911dc92..759c8417d0 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -217,6 +217,7 @@ impl Signer { fn get_coordinator_dkg(&self) -> (u32, PublicKey) { self.coordinator_selector.get_coordinator() } + /// Read stackerdb messages in case the signer was started late or restarted and missed incoming messages pub fn read_stackerdb_messages( &mut self, @@ -256,7 +257,17 @@ impl Signer { }; let ordered_packets = self .stackerdb - .get_packets(&self.signer_slot_ids, &packet_slots)?; + .get_packets(&self.signer_slot_ids, &packet_slots)? + .iter() + .filter_map(|packet| { + let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) { + self.get_coordinator_dkg().1 + } else { + self.get_coordinator_sign(current_reward_cycle).1 + }; + self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey) + }) + .collect::>(); if !ordered_packets.is_empty() { debug!( "{self}: Processing {} messages from stackerdb: {ordered_packets:?}", From 2aaa669c47b343d502d0b14694ae9e9b842f733f Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Mon, 8 Apr 2024 11:52:19 -0700 Subject: [PATCH 06/11] Do not attempt to process sign messages as miner messages are always overwritten so its impossible to do properly Signed-off-by: Jacinta Ferrant --- stacks-signer/src/client/stackerdb.rs | 13 ++- stacks-signer/src/runloop.rs | 22 ++-- stacks-signer/src/signer.rs | 134 ++++++++++++++---------- testnet/stacks-node/src/tests/signer.rs | 80 +------------- 4 files changed, 96 insertions(+), 153 deletions(-) diff --git a/stacks-signer/src/client/stackerdb.rs b/stacks-signer/src/client/stackerdb.rs index 20ae01d273..35082bcca6 100644 --- a/stacks-signer/src/client/stackerdb.rs +++ b/stacks-signer/src/client/stackerdb.rs @@ -208,12 +208,19 @@ impl StackerDB { Ok(messages) } - /// Get all wsts packets from stackerdb for each of the given signer IDs - pub fn get_packets( + /// Get the ordered DKG packets from stackerdb for the signer slot IDs. + pub fn get_dkg_packets( &mut self, signer_ids: &[SignerSlotID], - packet_slots: &[MessageSlotID], ) -> Result, ClientError> { + let packet_slots = &[ + MessageSlotID::DkgBegin, + MessageSlotID::DkgPublicShares, + MessageSlotID::DkgPrivateBegin, + MessageSlotID::DkgPrivateShares, + MessageSlotID::DkgEndBegin, + MessageSlotID::DkgEnd, + ]; let slot_ids = signer_ids.iter().map(|id| id.0).collect::>(); let mut packets = vec![]; for packet_slot in packet_slots { diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index 1fecb18c91..8078acab5e 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -367,6 +367,14 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { return None; } for signer in self.stacks_signers.values_mut() { + signer.refresh_coordinator(); + if signer.approved_aggregate_public_key.is_none() { + if let Err(e) = + signer.update_dkg(&self.stacks_client, res.clone(), current_reward_cycle) + { + error!("{signer}: failed to update DKG: {e}"); + } + } let event_parity = match event { Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2), // Block proposal events do have reward cycles, but each proposal has its own cycle, @@ -383,12 +391,6 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { if event_parity == Some(other_signer_parity) { continue; } - if signer.approved_aggregate_public_key.is_none() { - if let Err(e) = signer.update_dkg(&self.stacks_client) { - error!("{signer}: failed to update DKG: {e}"); - } - } - signer.refresh_coordinator(); if let Err(e) = signer.process_event( &self.stacks_client, event.as_ref(), @@ -415,14 +417,6 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { signer.commands.push_back(command.command); } } - // Check if we missed any messages due to a restart or being late to the party - if let Err(e) = signer.read_stackerdb_messages( - &self.stacks_client, - res.clone(), - current_reward_cycle, - ) { - error!("{signer}: failed to read stackerdb messages: {e}"); - } // After processing event, run the next command for each signer signer.process_next_command(&self.stacks_client, current_reward_cycle); } diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 759c8417d0..87376eccba 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -123,6 +123,15 @@ pub enum Command { }, } +/// The signer operation types that can be performed +#[derive(PartialEq, Eq, Debug, Clone)] +pub enum Operation { + /// RUnning a DKG round + Dkg, + /// Running a sign round + Sign, +} + /// The Signer state #[derive(PartialEq, Eq, Debug, Clone)] pub enum State { @@ -131,7 +140,7 @@ pub enum State { /// The signer is idle, waiting for messages and commands Idle, /// The signer is executing a DKG or Sign round - OperationInProgress, + OperationInProgress(Operation), } /// The stacks signer registered for the reward cycle @@ -218,71 +227,40 @@ impl Signer { self.coordinator_selector.get_coordinator() } - /// Read stackerdb messages in case the signer was started late or restarted and missed incoming messages - pub fn read_stackerdb_messages( + /// Read stackerdb messages in case the signer was started late or restarted and missed incoming DKG messages + pub fn read_dkg_stackerdb_messages( &mut self, stacks_client: &StacksClient, res: Sender>, current_reward_cycle: u64, ) -> Result<(), ClientError> { - // TODO: should load DKG shares first and potentially check dkg results before attempting to load any other state. - // This should be done on initialization and not on ever read right when a signer is first created. This call will then - // be called when state is equal to some sort of "in between" state where it has loaded its DKG shares if they exist - // but not yet if it missed messages and needs to respond to them. - // See https://github.com/stacks-network/stacks-core/issues/4595 if self.state != State::Uninitialized { - // We already have state. Do not load it again. + // We should only read stackerdb if we are uninitialized return Ok(()); - } - let packet_slots = if self.approved_aggregate_public_key.is_none() { - // we should read the DKG messages in order to see if we are in the middle of a DKG round - // TODO: we should check if we already computed some party shares. In which case, we should start LATER in this list - debug!("{self}: Checking stackerdb for missed DKG messages."); - vec![ - MessageSlotID::DkgBegin, - MessageSlotID::DkgPublicShares, - MessageSlotID::DkgPrivateBegin, - MessageSlotID::DkgPrivateShares, - MessageSlotID::DkgEndBegin, - MessageSlotID::DkgEnd, - ] - } else { - debug!("{self}: Checking stackerdb for missed Sign messages."); - vec![ - MessageSlotID::NonceRequest, - MessageSlotID::NonceResponse, - MessageSlotID::SignatureShareRequest, - MessageSlotID::SignatureShareResponse, - ] }; let ordered_packets = self .stackerdb - .get_packets(&self.signer_slot_ids, &packet_slots)? + .get_dkg_packets(&self.signer_slot_ids)? .iter() .filter_map(|packet| { let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) { self.get_coordinator_dkg().1 } else { - self.get_coordinator_sign(current_reward_cycle).1 + debug!( + "{self}: Received a non-DKG message in the DKG message queue. Ignoring it." + ); + return None; }; self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey) }) .collect::>(); - if !ordered_packets.is_empty() { - debug!( - "{self}: Processing {} messages from stackerdb: {ordered_packets:?}", - ordered_packets.len() - ); - self.handle_packets( - stacks_client, - res.clone(), - &ordered_packets, - current_reward_cycle, - ); - } - if self.state == State::Uninitialized { - self.state = State::Idle; - } + // We successfully read stackerdb so we are no longer uninitialized + self.state = State::Idle; + debug!( + "{self}: Processing {} DKG messages from stackerdb: {ordered_packets:?}", + ordered_packets.len() + ); + self.handle_packets(stacks_client, res, &ordered_packets, current_reward_cycle); Ok(()) } } @@ -413,8 +391,8 @@ impl Signer { } /// Update operation - fn update_operation(&mut self) { - self.state = State::OperationInProgress; + fn update_operation(&mut self, operation: Operation) { + self.state = State::OperationInProgress(operation); self.coordinator_selector.last_message_time = Some(Instant::now()); } @@ -450,6 +428,7 @@ impl Signer { return; } } + self.update_operation(Operation::Dkg); } Command::Sign { block, @@ -495,9 +474,9 @@ impl Signer { return; } } + self.update_operation(Operation::Sign); } } - self.update_operation(); } /// Attempt to process the next command in the queue, and update state accordingly @@ -534,10 +513,10 @@ impl Signer { .expect("BUG: Already asserted that the command queue was not empty"); self.execute_command(stacks_client, &command); } - State::OperationInProgress => { + State::OperationInProgress(op) => { // We cannot execute the next command until the current one is finished... debug!( - "{self}: Waiting for operation to finish. Coordinator state = {:?}", + "{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}", self.coordinator.state ); } @@ -770,9 +749,26 @@ impl Signer { self.process_operation_results(stacks_client, &operation_results); self.send_operation_results(res, operation_results); self.finish_operation(); - } else if !packets.is_empty() && self.coordinator.state != CoordinatorState::Idle { - // We have received a message and are in the middle of an operation. Update our state accordingly - self.update_operation(); + } else if !packets.is_empty() { + // We have received a message. Update our state accordingly + // Let us be extra explicit in case a new state type gets added to wsts' state machine + match &self.coordinator.state { + CoordinatorState::Idle => {} + CoordinatorState::DkgPublicDistribute + | CoordinatorState::DkgPublicGather + | CoordinatorState::DkgPrivateDistribute + | CoordinatorState::DkgPrivateGather + | CoordinatorState::DkgEndDistribute + | CoordinatorState::DkgEndGather => { + self.update_operation(Operation::Dkg); + } + CoordinatorState::NonceRequest(_, _) + | CoordinatorState::NonceGather(_, _) + | CoordinatorState::SigShareRequest(_, _) + | CoordinatorState::SigShareGather(_, _) => { + self.update_operation(Operation::Sign); + } + } } debug!("{self}: Saving signer state"); @@ -1438,7 +1434,12 @@ impl Signer { } /// Update the DKG for the provided signer info, triggering it if required - pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> { + pub fn update_dkg( + &mut self, + stacks_client: &StacksClient, + res: Sender>, + current_reward_cycle: u64, + ) -> Result<(), ClientError> { let old_dkg = self.approved_aggregate_public_key; self.approved_aggregate_public_key = stacks_client.get_approved_aggregate_key(self.reward_cycle)?; @@ -1454,8 +1455,27 @@ impl Signer { self.approved_aggregate_public_key ); } + if matches!(self.state, State::OperationInProgress(Operation::Dkg)) { + // We already have DKG, abort operation and reset state. + debug!( + "{self}: DKG has already been set. Aborting DKG round {}.", + self.coordinator.current_dkg_id + ); + self.finish_operation(); + } + if self.state == State::Uninitialized { + // If we successfully load the DKG value, we are fully initialized + self.state = State::Idle; + } return Ok(()); - }; + } + // Check if we missed any DKG messages due to a restart or being late to the party + // Note: We currently only check for DKG specific messages as we cannot rejoin a sign + // round due to a miner overwriting its own message slots (impossible to recover without every message) + if let Err(e) = self.read_dkg_stackerdb_messages(&stacks_client, res, current_reward_cycle) + { + error!("{self}: failed to read stackerdb messages: {e}"); + } if self.should_queue_dkg(stacks_client)? { info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command..."); self.commands.push_front(Command::Dkg); diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 995da0d680..29aebb0254 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -503,7 +503,7 @@ impl SignerTest { fn get_signer_public_keys(&self, reward_cycle: u64) -> PublicKeys { let entries = self .stacks_client - .get_reward_set_signers_with_retry(reward_cycle) + .get_reward_set_signers(reward_cycle) .unwrap() .unwrap(); let entries = SignerEntries::parse(false, &entries).unwrap(); @@ -1211,84 +1211,6 @@ fn stackerdb_delayed_dkg() { ); } -#[test] -#[ignore] -/// Test that a signer can be offline when a sign round has commenced -/// and can rejoin the sign round after it has restarted -fn stackerdb_delayed_sign() { - if env::var("BITCOIND_TEST") != Ok("1".into()) { - return; - } - - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .init(); - - info!("------------------------- Test Setup -------------------------"); - let timeout = Duration::from_secs(200); - let mut signer_test = SignerTest::new(3); - let _key = signer_test.boot_to_epoch_3(timeout); - let rpc_bind = signer_test.running_nodes.conf.node.rpc_bind.clone(); - let run_stamp = signer_test.run_stamp; - let reward_cycle = signer_test.get_current_reward_cycle(); - let public_keys = signer_test.get_signer_public_keys(reward_cycle); - let coordinator_selector = CoordinatorSelector::from(public_keys); - let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); - let coordinator_public_key = - StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); - - info!("------------------------- Stop Signer -------------------------"); - let mut to_stop = None; - for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { - let public_key = StacksPublicKey::from_private(key); - if public_key == coordinator_public_key { - // Do not stop the coordinator. We want coordinator to start a sign round - continue; - } - // Only stop one signer - to_stop = Some(idx); - break; - } - let signer_idx = to_stop.expect("Failed to find a signer to stop"); - let signer_key = signer_test.stop_signer(signer_idx); - debug!( - "Removed signer {signer_idx} with key: {:?}, {}", - signer_key, - signer_key.to_hex() - ); - - info!("------------------------- Start Sign -------------------------"); - let sign_now = Instant::now(); - let h = std::thread::spawn(move || signer_test.mine_nakamoto_block(timeout)); - - // Sleep a bit to wait for a miner to propose a block - std::thread::sleep(Duration::from_secs(5)); - - info!("------------------------- Restart Stopped Signer -------------------------"); - let signer_config = build_signer_config_tomls( - &[signer_key], - &rpc_bind, - Some(Duration::from_millis(128)), // Timeout defaults to 5 seconds. Let's override it to 128 milliseconds. - &Network::Testnet, - "12345", - run_stamp, - 3000 + signer_idx, - ) - .pop() - .unwrap(); - - let (_cmd_send, cmd_recv) = channel(); - let (res_send, _res_recv) = channel(); - - let _signer = spawn_signer(&signer_config, cmd_recv, res_send); - info!("------------------------- Wait for Signed Block -------------------------"); - // Now the signer has come back online, it should sign the block - h.join().unwrap(); - let sign_elapsed = sign_now.elapsed(); - info!("Sign Time Elapsed: {:.2?}", sign_elapsed); -} - pub fn find_block_response(chunk_events: Vec) -> Option { for event in chunk_events.into_iter() { if event.contract_id.name.as_str() From 9f26d4afee08997b2ce9cac168c6aa2b247dac8a Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Mon, 8 Apr 2024 11:56:29 -0700 Subject: [PATCH 07/11] Remove non existent stackerdb_delayed_sign test Signed-off-by: Jacinta Ferrant --- .github/workflows/bitcoin-tests.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/bitcoin-tests.yml b/.github/workflows/bitcoin-tests.yml index 2b367d20d6..93dafc57ab 100644 --- a/.github/workflows/bitcoin-tests.yml +++ b/.github/workflows/bitcoin-tests.yml @@ -89,7 +89,6 @@ jobs: - tests::signer::stackerdb_sign_after_signer_reboot - tests::nakamoto_integrations::stack_stx_burn_op_integration_test - tests::signer::stackerdb_delayed_dkg - - tests::signer::stackerdb_delayed_sign # Do not run this one until we figure out why it fails in CI # - tests::neon_integrations::bitcoin_reorg_flap steps: From d0c887125bf3d39670f915099a07838d8ef96875 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 18 Apr 2024 08:37:53 -0700 Subject: [PATCH 08/11] Fix test Signed-off-by: Jacinta Ferrant --- stacks-signer/src/runloop.rs | 10 ++++++ stacks-signer/src/signer.rs | 9 +++++- testnet/stacks-node/src/tests/signer.rs | 43 +++++++++---------------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index c16de3309a..db2e7d90dd 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -415,6 +415,16 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { signer.commands.push_back(command.command); } } + // Check if we missed any DKG messages due to a restart or being late to the party + // Note: We currently only check for DKG specific messages as we cannot rejoin a sign + // round due to a miner overwriting its own message slots (impossible to recover without every message) + if let Err(e) = signer.read_dkg_stackerdb_messages( + &self.stacks_client, + res.clone(), + current_reward_cycle, + ) { + error!("{signer}: failed to read stackerdb messages: {e}"); + } // After processing event, run the next command for each signer signer.process_next_command(&self.stacks_client, current_reward_cycle); } diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 47e3057a24..0597ede7a1 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -1418,11 +1418,15 @@ impl Signer { // TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key. // Need to update state to store the necessary info, check against it to see if we have participated in the winning round and // then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate. + let internal_dkg = self.coordinator.aggregate_public_key; + if internal_dkg != self.approved_aggregate_public_key { + warn!("{self}: we do not support changing the internal DKG key yet. Expected {internal_dkg:?} got {:?}", self.approved_aggregate_public_key); + } self.coordinator .set_aggregate_public_key(self.approved_aggregate_public_key); if old_dkg != self.approved_aggregate_public_key { warn!( - "{self}: updated DKG value to {:?}.", + "{self}: updated DKG value from {old_dkg:?} to {:?}.", self.approved_aggregate_public_key ); } @@ -1432,6 +1436,9 @@ impl Signer { self.coordinator.current_dkg_id ); self.finish_operation(); + } else if self.state == State::Uninitialized { + // If we successfully load the DKG value, we are fully initialized + self.state = State::Idle; } } else if should_queue { if self.commands.front() != Some(&Command::Dkg) { diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 29aebb0254..87df9b2dfd 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -59,7 +59,7 @@ use crate::tests::bitcoin_regtest::BitcoinCoreController; use crate::tests::nakamoto_integrations::{ boot_to_epoch_3_reward_set, boot_to_epoch_3_reward_set_calculation_boundary, naka_neon_integration_conf, next_block_and, next_block_and_mine_commit, - POX_4_DEFAULT_STACKER_BALANCE, + next_block_and_process_new_stacks_block, POX_4_DEFAULT_STACKER_BALANCE, }; use crate::tests::neon_integrations::{ next_block_and_wait, run_until_burnchain_height, test_observer, wait_for_runloop, @@ -1162,44 +1162,33 @@ fn stackerdb_delayed_dkg() { ); info!("------------------------- Start DKG -------------------------"); - let height = signer_test - .running_nodes - .btc_regtest_controller - .get_headers_height(); + info!("Waiting for DKG to start..."); // Advance one more to trigger DKG - run_until_burnchain_height( + next_block_and( &mut signer_test.running_nodes.btc_regtest_controller, - &signer_test.running_nodes.blocks_processed, - height.wrapping_add(1), - &signer_test.running_nodes.conf, - ); + timeout.as_secs(), + || Ok(true), + ) + .expect("Failed to mine bitcoin block"); // Wait a bit so DKG is actually triggered and signers are not available to respond std::thread::sleep(Duration::from_secs(5)); - // Make sure DKG did not get set - assert!(signer_test - .stacks_client - .get_approved_aggregate_key(reward_cycle) - .expect("Failed to get approved aggregate key") - .is_none()); - info!("------------------------- Restart Stopped Signer -------------------------"); signer_test.restart_signer(signer_idx, signer_key); info!("------------------------- Wait for DKG -------------------------"); let key = signer_test.wait_for_dkg(timeout); - let height = signer_test - .running_nodes - .btc_regtest_controller - .get_headers_height(); - // Advance one more to mine dkg transactions - run_until_burnchain_height( + // Sleep a bit to make sure the transactions are broadcast. + std::thread::sleep(Duration::from_secs(1)); + // Mine a block and make sure the votes were mined + next_block_and_process_new_stacks_block( &mut signer_test.running_nodes.btc_regtest_controller, - &signer_test.running_nodes.blocks_processed, - height.wrapping_add(1), - &signer_test.running_nodes.conf, - ); + timeout.as_secs(), + &signer_test.running_nodes.coord_channel, + ) + .unwrap(); + // Make sure DKG did get set assert_eq!( key, From 286ea1d737dcfac6f760abdd281e7c77c111d42e Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 18 Apr 2024 09:33:09 -0700 Subject: [PATCH 09/11] Move reading of stackerdb to refresh dkg call to make logic easier to follow Signed-off-by: Jacinta Ferrant --- stacks-signer/src/runloop.rs | 14 ++----- stacks-signer/src/signer.rs | 79 ++++++++++++++++++++++++------------ 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/stacks-signer/src/runloop.rs b/stacks-signer/src/runloop.rs index db2e7d90dd..3b550e20dd 100644 --- a/stacks-signer/src/runloop.rs +++ b/stacks-signer/src/runloop.rs @@ -384,7 +384,9 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { continue; } if signer.approved_aggregate_public_key.is_none() { - if let Err(e) = signer.refresh_dkg(&self.stacks_client) { + if let Err(e) = + signer.refresh_dkg(&self.stacks_client, res.clone(), current_reward_cycle) + { error!("{signer}: failed to refresh DKG: {e}"); } } @@ -415,16 +417,6 @@ impl SignerRunLoop, RunLoopCommand> for RunLoop { signer.commands.push_back(command.command); } } - // Check if we missed any DKG messages due to a restart or being late to the party - // Note: We currently only check for DKG specific messages as we cannot rejoin a sign - // round due to a miner overwriting its own message slots (impossible to recover without every message) - if let Err(e) = signer.read_dkg_stackerdb_messages( - &self.stacks_client, - res.clone(), - current_reward_cycle, - ) { - error!("{signer}: failed to read stackerdb messages: {e}"); - } // After processing event, run the next command for each signer signer.process_next_command(&self.stacks_client, current_reward_cycle); } diff --git a/stacks-signer/src/signer.rs b/stacks-signer/src/signer.rs index 0597ede7a1..8108c1d50b 100644 --- a/stacks-signer/src/signer.rs +++ b/stacks-signer/src/signer.rs @@ -241,7 +241,7 @@ impl Signer { if self.state != State::Uninitialized { // We should only read stackerdb if we are uninitialized return Ok(()); - }; + } let ordered_packets = self .stackerdb .get_dkg_packets(&self.signer_slot_ids)? @@ -1403,14 +1403,44 @@ impl Signer { } } - /// Refresh DKG value and queue DKG command if necessary - pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> { - // First check if we should queue DKG based on contract vote state and stackerdb transactions - let should_queue = self.should_queue_dkg(stacks_client)?; - // Before queueing the command, check one last time if DKG has been - // approved. It could have happened after the last call to - // `get_approved_aggregate_key` but before the theshold check in - // `should_queue_dkg`. + /// Refresh DKG and queue it if required + pub fn refresh_dkg( + &mut self, + stacks_client: &StacksClient, + res: Sender>, + current_reward_cycle: u64, + ) -> Result<(), ClientError> { + // First attempt to retrieve the aggregate key from the contract. + self.update_approved_aggregate_key(stacks_client)?; + if self.approved_aggregate_public_key.is_some() { + return Ok(()); + } + // Check stackerdb for any missed DKG messages to catch up our state. + self.read_dkg_stackerdb_messages(&stacks_client, res, current_reward_cycle)?; + // Check if we should still queue DKG + if !self.should_queue_dkg(stacks_client)? { + return Ok(()); + } + // Because there could be a slight delay in reading pending transactions and a key being approved by the contract, + // check one last time if the approved key was set since we finished the should queue dkg call + self.update_approved_aggregate_key(stacks_client)?; + if self.approved_aggregate_public_key.is_some() { + return Ok(()); + } + if self.commands.front() != Some(&Command::Dkg) { + info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command..."); + self.commands.push_front(Command::Dkg); + } else { + debug!("{self}: DKG command already queued..."); + } + Ok(()) + } + + /// Overwrites the approved aggregate key to the value in the contract, updating state accordingly + pub fn update_approved_aggregate_key( + &mut self, + stacks_client: &StacksClient, + ) -> Result<(), ClientError> { let old_dkg = self.approved_aggregate_public_key; self.approved_aggregate_public_key = stacks_client.get_approved_aggregate_key(self.reward_cycle)?; @@ -1430,22 +1460,21 @@ impl Signer { self.approved_aggregate_public_key ); } - if let State::OperationInProgress(Operation::Dkg) = self.state { - debug!( - "{self}: DKG has already been set. Aborting DKG operation {}.", - self.coordinator.current_dkg_id - ); - self.finish_operation(); - } else if self.state == State::Uninitialized { - // If we successfully load the DKG value, we are fully initialized - self.state = State::Idle; - } - } else if should_queue { - if self.commands.front() != Some(&Command::Dkg) { - info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command..."); - self.commands.push_front(Command::Dkg); - } else { - debug!("{self}: DKG command already queued..."); + match self.state { + State::OperationInProgress(Operation::Dkg) => { + debug!( + "{self}: DKG has already been set. Aborting DKG operation {}.", + self.coordinator.current_dkg_id + ); + self.finish_operation(); + } + State::Uninitialized => { + // If we successfully load the DKG value, we are fully initialized + self.state = State::Idle; + } + _ => { + // do nothing + } } } Ok(()) From 80057807f5810b214b396a477e219ca4ad7f91af Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 25 Apr 2024 08:08:53 -0700 Subject: [PATCH 10/11] Increase timeouts so test doesn't flake Signed-off-by: Jacinta Ferrant --- testnet/stacks-node/src/tests/signer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index e8787863af..f0ae3b6702 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -1180,7 +1180,7 @@ fn stackerdb_delayed_dkg() { info!("------------------------- Wait for DKG -------------------------"); let key = signer_test.wait_for_dkg(timeout); // Sleep a bit to make sure the transactions are broadcast. - std::thread::sleep(Duration::from_secs(1)); + std::thread::sleep(Duration::from_secs(10)); // Mine a block and make sure the votes were mined next_block_and_process_new_stacks_block( &mut signer_test.running_nodes.btc_regtest_controller, @@ -1188,7 +1188,8 @@ fn stackerdb_delayed_dkg() { &signer_test.running_nodes.coord_channel, ) .unwrap(); - + // Sleep a bit to make sure the contract gets updated + std::thread::sleep(Duration::from_secs(5)); // Make sure DKG did get set assert_eq!( key, From ff96f8c31ef8e0a5ec2596e5d15f05ad8f71d2a1 Mon Sep 17 00:00:00 2001 From: Jacinta Ferrant Date: Thu, 25 Apr 2024 11:44:18 -0700 Subject: [PATCH 11/11] Do it right or don't do it at all. Use stackerdb to know when to trigger next steps Signed-off-by: Jacinta Ferrant --- testnet/stacks-node/src/tests/signer.rs | 105 ++++++++++++++++++++---- 1 file changed, 88 insertions(+), 17 deletions(-) diff --git a/testnet/stacks-node/src/tests/signer.rs b/testnet/stacks-node/src/tests/signer.rs index 877471a930..68515d130c 100644 --- a/testnet/stacks-node/src/tests/signer.rs +++ b/testnet/stacks-node/src/tests/signer.rs @@ -49,6 +49,7 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; use wsts::curve::point::Point; use wsts::curve::scalar::Scalar; +use wsts::net::Message; use wsts::state_machine::{OperationResult, PublicKeys}; use crate::config::{Config as NeonConfig, EventKeyType, EventObserverConfig, InitialBalance}; @@ -59,7 +60,7 @@ use crate::tests::bitcoin_regtest::BitcoinCoreController; use crate::tests::nakamoto_integrations::{ boot_to_epoch_3_reward_set, boot_to_epoch_3_reward_set_calculation_boundary, naka_neon_integration_conf, next_block_and, next_block_and_mine_commit, - next_block_and_process_new_stacks_block, POX_4_DEFAULT_STACKER_BALANCE, + POX_4_DEFAULT_STACKER_BALANCE, }; use crate::tests::neon_integrations::{ next_block_and_wait, run_until_burnchain_height, test_observer, wait_for_runloop, @@ -1136,7 +1137,8 @@ fn stackerdb_delayed_dkg() { info!("------------------------- Test Setup -------------------------"); let timeout = Duration::from_secs(200); - let mut signer_test = SignerTest::new(3); + let num_signers = 3; + let mut signer_test = SignerTest::new(num_signers); boot_to_epoch_3_reward_set_calculation_boundary( &signer_test.running_nodes.conf, &signer_test.running_nodes.blocks_processed, @@ -1150,7 +1152,22 @@ fn stackerdb_delayed_dkg() { let (_, coordinator_public_key) = coordinator_selector.get_coordinator(); let coordinator_public_key = StacksPublicKey::from_slice(coordinator_public_key.to_bytes().as_slice()).unwrap(); - + let signer_slot_ids: Vec<_> = (0..num_signers) + .into_iter() + .map(|i| SignerSlotID(i as u32)) + .collect(); + let mut stackerdbs: Vec<_> = signer_slot_ids + .iter() + .map(|i| { + StackerDB::new( + &signer_test.running_nodes.conf.node.rpc_bind, + StacksPrivateKey::new(), // Doesn't matter what key we use. We are just reading, not writing + false, + reward_cycle, + *i, + ) + }) + .collect(); info!("------------------------- Stop Signers -------------------------"); let mut to_stop = None; for (idx, key) in signer_test.signer_stacks_private_keys.iter().enumerate() { @@ -1170,7 +1187,6 @@ fn stackerdb_delayed_dkg() { signer_key, signer_key.to_hex() ); - info!("------------------------- Start DKG -------------------------"); info!("Waiting for DKG to start..."); // Advance one more to trigger DKG @@ -1180,8 +1196,31 @@ fn stackerdb_delayed_dkg() { || Ok(true), ) .expect("Failed to mine bitcoin block"); - // Wait a bit so DKG is actually triggered and signers are not available to respond - std::thread::sleep(Duration::from_secs(5)); + // Do not proceed until we guarantee that DKG was triggered + let start_time = Instant::now(); + loop { + let stackerdb = stackerdbs.first_mut().unwrap(); + let dkg_packets: Vec<_> = stackerdb + .get_dkg_packets(&signer_slot_ids) + .expect("Failed to get dkg packets"); + let begin_packets: Vec<_> = dkg_packets + .iter() + .filter_map(|packet| { + if matches!(packet.msg, Message::DkgBegin(_)) { + Some(packet) + } else { + None + } + }) + .collect(); + if !begin_packets.is_empty() { + break; + } + assert!( + start_time.elapsed() < Duration::from_secs(30), + "Timed out waiting for DKG to be triggered" + ); + } info!("------------------------- Restart Stopped Signer -------------------------"); @@ -1189,17 +1228,49 @@ fn stackerdb_delayed_dkg() { info!("------------------------- Wait for DKG -------------------------"); let key = signer_test.wait_for_dkg(timeout); - // Sleep a bit to make sure the transactions are broadcast. - std::thread::sleep(Duration::from_secs(10)); - // Mine a block and make sure the votes were mined - next_block_and_process_new_stacks_block( - &mut signer_test.running_nodes.btc_regtest_controller, - timeout.as_secs(), - &signer_test.running_nodes.coord_channel, - ) - .unwrap(); - // Sleep a bit to make sure the contract gets updated - std::thread::sleep(Duration::from_secs(5)); + let mut transactions = HashSet::with_capacity(num_signers); + let start_time = Instant::now(); + while transactions.len() < num_signers { + for stackerdb in stackerdbs.iter_mut() { + let current_transactions = stackerdb + .get_current_transactions() + .expect("Failed getting current transactions for signer slot id"); + for tx in current_transactions { + transactions.insert(tx.txid()); + } + } + assert!( + start_time.elapsed() < Duration::from_secs(30), + "Failed to retrieve pending vote transactions within timeout" + ); + } + + // Make sure transactions get mined + let start_time = Instant::now(); + while !transactions.is_empty() { + assert!( + start_time.elapsed() < Duration::from_secs(30), + "Failed to mine transactions within timeout" + ); + next_block_and_wait( + &mut signer_test.running_nodes.btc_regtest_controller, + &signer_test.running_nodes.blocks_processed, + ); + let blocks = test_observer::get_blocks(); + for block in blocks.iter() { + let txs = block.get("transactions").unwrap().as_array().unwrap(); + for tx in txs.iter() { + let raw_tx = tx.get("raw_tx").unwrap().as_str().unwrap(); + if raw_tx == "0x00" { + continue; + } + let tx_bytes = hex_bytes(&raw_tx[2..]).unwrap(); + let parsed = StacksTransaction::consensus_deserialize(&mut &tx_bytes[..]).unwrap(); + transactions.remove(&parsed.txid()); + } + } + } + // Make sure DKG did get set assert_eq!( key,