Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug/dkg results #4660

Merged
merged 7 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions stacks-signer/src/runloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,9 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
if event_parity == Some(other_signer_parity) {
continue;
}

if signer.approved_aggregate_public_key.is_none() {
if let Err(e) = signer.update_dkg(&self.stacks_client) {
error!("{signer}: failed to update DKG: {e}");
if let Err(e) = signer.refresh_dkg(&self.stacks_client) {
error!("{signer}: failed to refresh DKG: {e}");
}
}
signer.refresh_coordinator();
Expand Down
143 changes: 102 additions & 41 deletions stacks-signer/src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::time::Instant;
use blockstack_lib::chainstate::burn::ConsensusHashExtensions;
use blockstack_lib::chainstate::nakamoto::signer_set::NakamotoSigners;
use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockVote};
use blockstack_lib::chainstate::stacks::boot::SIGNERS_VOTING_FUNCTION_NAME;
use blockstack_lib::chainstate::stacks::StacksTransaction;
use blockstack_lib::net::api::postblock_proposal::BlockValidateResponse;
use hashbrown::HashSet;
Expand Down Expand Up @@ -123,13 +124,22 @@ pub enum Command {
},
}

/// The specific operations that a signer can perform
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Operation {
/// A DKG operation
Dkg,
/// A Sign operation
Sign,
}

/// The Signer state
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum State {
/// The signer is idle, waiting for messages and commands
Idle,
/// The signer is executing a DKG or Sign round
OperationInProgress,
OperationInProgress(Operation),
}

/// The stacks signer registered for the reward cycle
Expand Down Expand Up @@ -343,8 +353,8 @@ impl Signer {
}

/// Update operation
fn update_operation(&mut self) {
self.state = State::OperationInProgress;
fn update_operation(&mut self, operation: Operation) {
self.state = State::OperationInProgress(operation);
self.coordinator_selector.last_message_time = Some(Instant::now());
}

Expand Down Expand Up @@ -374,6 +384,7 @@ impl Signer {
Ok(msg) => {
let ack = self.stackerdb.send_message_with_retry(msg.into());
debug!("{self}: ACK: {ack:?}",);
self.update_operation(Operation::Dkg);
}
Err(e) => {
error!("{self}: Failed to start DKG: {e:?}",);
Expand Down Expand Up @@ -419,6 +430,7 @@ impl Signer {
.unwrap_or_else(|e| {
error!("{self}: Failed to insert block in DB: {e:?}");
});
self.update_operation(Operation::Sign);
}
Err(e) => {
error!("{self}: Failed to start signing block: {e:?}",);
Expand All @@ -427,7 +439,6 @@ impl Signer {
}
}
}
self.update_operation();
}

/// Attempt to process the next command in the queue, and update state accordingly
Expand Down Expand Up @@ -460,10 +471,10 @@ impl Signer {
.expect("BUG: Already asserted that the command queue was not empty");
self.execute_command(stacks_client, &command);
}
State::OperationInProgress => {
State::OperationInProgress(op) => {
// We cannot execute the next command until the current one is finished...
debug!(
"{self}: Waiting for operation to finish. Coordinator state = {:?}",
"{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}",
self.coordinator.state
);
}
Expand Down Expand Up @@ -696,9 +707,26 @@ impl Signer {
self.process_operation_results(stacks_client, &operation_results);
self.send_operation_results(res, operation_results);
self.finish_operation();
} else if !packets.is_empty() && self.coordinator.state != CoordinatorState::Idle {
// We have received a message and are in the middle of an operation. Update our state accordingly
self.update_operation();
} else if !packets.is_empty() {
// We have received a message. Update our state accordingly
// Let us be extra explicit in case a new state type gets added to wsts' state machine
match &self.coordinator.state {
CoordinatorState::Idle => {}
CoordinatorState::DkgPublicDistribute
| CoordinatorState::DkgPublicGather
| CoordinatorState::DkgPrivateDistribute
| CoordinatorState::DkgPrivateGather
| CoordinatorState::DkgEndDistribute
| CoordinatorState::DkgEndGather => {
self.update_operation(Operation::Dkg);
}
CoordinatorState::NonceRequest(_, _)
| CoordinatorState::NonceGather(_, _)
| CoordinatorState::SigShareRequest(_, _)
| CoordinatorState::SigShareGather(_, _) => {
self.update_operation(Operation::Sign);
}
}
}

debug!("{self}: Saving signer state");
Expand Down Expand Up @@ -1016,6 +1044,10 @@ impl Signer {
/// Process a dkg result by broadcasting a vote to the stacks node
fn process_dkg(&mut self, stacks_client: &StacksClient, dkg_public_key: &Point) {
let mut dkg_results_bytes = vec![];
debug!(
"{self}: Received DKG result. Broadcasting vote to the stacks node...";
"dkg_public_key" => %dkg_public_key
);
if let Err(e) = SignerMessage::serialize_dkg_result(
&mut dkg_results_bytes,
dkg_public_key,
Expand Down Expand Up @@ -1273,7 +1305,49 @@ impl Signer {
}
}

/// Refresh DKG value and queue DKG command if necessary
pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
// First check if we should queue DKG based on contract vote state and stackerdb transactions
let should_queue = self.should_queue_dkg(stacks_client)?;
// Before queueing the command, check one last time if DKG has been
// approved. It could have happened after the last call to
// `get_approved_aggregate_key` but before the theshold check in
// `should_queue_dkg`.
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
if self.approved_aggregate_public_key.is_some() {
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
self.coordinator
.set_aggregate_public_key(self.approved_aggregate_public_key);
if old_dkg != self.approved_aggregate_public_key {
warn!(
"{self}: updated DKG value to {:?}.",
self.approved_aggregate_public_key
);
}
if let State::OperationInProgress(Operation::Dkg) = self.state {
debug!(
"{self}: DKG has already been set. Aborting DKG operation {}.",
hstove marked this conversation as resolved.
Show resolved Hide resolved
self.coordinator.current_dkg_id
);
self.finish_operation();
}
} else if should_queue {
if self.commands.front() != Some(&Command::Dkg) {
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
self.commands.push_front(Command::Dkg);
} else {
debug!("{self}: DKG command already queued...");
}
}
Ok(())
}

/// Should DKG be queued to the current signer's command queue
/// This assumes that no key has been approved by the contract yet
pub fn should_queue_dkg(&mut self, stacks_client: &StacksClient) -> Result<bool, ClientError> {
if self.state != State::Idle
|| self.signer_id != self.get_coordinator_dkg().0
Expand All @@ -1283,6 +1357,25 @@ impl Signer {
return Ok(false);
}
let signer_address = stacks_client.get_signer_address();
let account_nonces = self.get_account_nonces(stacks_client, &[*signer_address]);
let old_transactions = self.get_signer_transactions(&account_nonces).map_err(|e| {
warn!("{self}: Failed to get old signer transactions: {e:?}. May trigger DKG unnecessarily");
}).unwrap_or_default();
// Check if we have an existing vote transaction for the same round and reward cycle
for transaction in old_transactions.iter() {
let params =
NakamotoSigners::parse_vote_for_aggregate_public_key(transaction).unwrap_or_else(|| panic!("BUG: {self}: Received an invalid {SIGNERS_VOTING_FUNCTION_NAME} transaction in an already filtered list: {transaction:?}"));
if Some(params.aggregate_key) == self.coordinator.aggregate_public_key
&& params.voting_round == self.coordinator.current_dkg_id
{
debug!("{self}: Not triggering a DKG round. Already have a pending vote transaction.";
"txid" => %transaction.txid(),
"aggregate_key" => %params.aggregate_key,
"voting_round" => params.voting_round
);
return Ok(false);
}
}
if let Some(aggregate_key) = stacks_client.get_vote_for_aggregate_public_key(
self.coordinator.current_dkg_id,
self.reward_cycle,
Expand Down Expand Up @@ -1311,12 +1404,6 @@ impl Signer {
);
return Ok(false);
}
warn!("{self}: Vote for DKG failed.";
"voting_round" => self.coordinator.current_dkg_id,
"aggregate_key" => %aggregate_key,
"round_weight" => round_weight,
"threshold_weight" => threshold_weight
);
} else {
// Have I already voted, but the vote is still pending in StackerDB? Check stackerdb for the same round number and reward cycle vote transaction
// Only get the account nonce of THIS signer as we only care about our own votes, not other signer votes
Expand Down Expand Up @@ -1363,32 +1450,6 @@ impl Signer {
Ok(true)
}

/// Update the DKG for the provided signer info, triggering it if required
pub fn update_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
if self.approved_aggregate_public_key.is_some() {
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
self.coordinator
.set_aggregate_public_key(self.approved_aggregate_public_key);
if old_dkg != self.approved_aggregate_public_key {
warn!(
"{self}: updated DKG value to {:?}.",
self.approved_aggregate_public_key
);
}
return Ok(());
};
if self.should_queue_dkg(stacks_client)? {
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
self.commands.push_front(Command::Dkg);
}
Ok(())
}

/// Process the event
pub fn process_event(
&mut self,
Expand Down