Feat/signers read stackerdb #4658

Merged
merged 14 commits on Apr 26, 2024
Changes from 7 commits
3 changes: 2 additions & 1 deletion .github/workflows/bitcoin-tests.yml
@@ -82,12 +82,13 @@ jobs:
- tests::nakamoto_integrations::vote_for_aggregate_key_burn_op
- tests::nakamoto_integrations::follower_bootup
- tests::signer::stackerdb_dkg
- tests::signer::stackerdb_sign_request_rejected
- tests::signer::stackerdb_block_proposal
- tests::signer::stackerdb_filter_bad_transactions
- tests::signer::stackerdb_mine_2_nakamoto_reward_cycles
- tests::signer::stackerdb_sign_after_signer_reboot
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
- tests::signer::stackerdb_delayed_dkg
# Do not run this one until we figure out why it fails in CI
# - tests::neon_integrations::bitcoin_reorg_flap
steps:
75 changes: 55 additions & 20 deletions stacks-signer/src/client/stackerdb.rs
@@ -23,6 +23,7 @@ use slog::{slog_debug, slog_warn};
use stacks_common::codec::{read_next, StacksMessageCodec};
use stacks_common::types::chainstate::StacksPrivateKey;
use stacks_common::{debug, warn};
use wsts::net::Packet;

use super::ClientError;
use crate::client::retry_with_exponential_backoff;
@@ -179,45 +180,79 @@ impl StackerDB {
}
}

/// Get all signer messages from stackerdb for the given slot IDs
fn get_messages(
session: &mut StackerDBSession,
slot_ids: &[u32],
) -> Result<Vec<SignerMessage>, ClientError> {
let mut messages = vec![];
let send_request = || {
session
.get_latest_chunks(slot_ids)
.map_err(backoff::Error::transient)
};
let chunk_ack = retry_with_exponential_backoff(send_request)?;
for (i, chunk) in chunk_ack.iter().enumerate() {
let Some(data) = chunk else {
continue;
};
let Ok(message) = read_next::<SignerMessage, _>(&mut &data[..]) else {
if !data.is_empty() {
warn!("Failed to deserialize chunk data into a SignerMessage");
debug!("slot #{i}: Failed chunk ({}): {data:?}", &data.len());
}
continue;
};
messages.push(message);
}
Ok(messages)
}

/// Get the ordered DKG packets from stackerdb for the signer slot IDs.
pub fn get_dkg_packets(
&mut self,
signer_ids: &[SignerSlotID],
) -> Result<Vec<Packet>, ClientError> {
let packet_slots = &[
MessageSlotID::DkgBegin,
MessageSlotID::DkgPublicShares,
MessageSlotID::DkgPrivateBegin,
MessageSlotID::DkgPrivateShares,
MessageSlotID::DkgEndBegin,
MessageSlotID::DkgEnd,
];
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
let mut packets = vec![];
for packet_slot in packet_slots {
let session = self
.signers_message_stackerdb_sessions
.get_mut(packet_slot)
.ok_or(ClientError::NotConnected)?;
let messages = Self::get_messages(session, &slot_ids)?;
for message in messages {
let SignerMessage::Packet(packet) = message else {
warn!("Found an unexpected type in a packet slot {packet_slot}");
continue;
};
packets.push(packet);
}
}
Ok(packets)
}

/// Get the transactions from stackerdb for the signers
fn get_transactions(
transactions_session: &mut StackerDBSession,
signer_ids: &[SignerSlotID],
) -> Result<Vec<StacksTransaction>, ClientError> {
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
let messages = Self::get_messages(transactions_session, &slot_ids)?;
let mut transactions = vec![];
for message in messages {
let SignerMessage::Transactions(chunk_transactions) = message else {
warn!("Signer wrote an unexpected type to the transactions slot");
continue;
};
debug!(
"Retrieved {} transactions from the transactions slot.",
chunk_transactions.len()
);
transactions.extend(chunk_transactions);
}
Ok(transactions)
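A minimal usage sketch for the new `get_dkg_packets` helper (illustrative only, not part of the diff; it assumes a connected `StackerDB` and the signer's slot IDs from the surrounding crate, and the wrapper name `recover_dkg_packets` is hypothetical):

    use wsts::net::Packet;

    // Illustrative: pull any DKG packets this signer may have missed.
    // get_dkg_packets queries one MessageSlotID per DKG phase in order,
    // so packets come back grouped DkgBegin .. DkgEnd.
    fn recover_dkg_packets(
        stackerdb: &mut StackerDB,
        signer_slot_ids: &[SignerSlotID],
    ) -> Result<Vec<Packet>, ClientError> {
        let packets = stackerdb.get_dkg_packets(signer_slot_ids)?;
        debug!("Recovered {} DKG packets from stackerdb", packets.len());
        Ok(packets)
    }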
16 changes: 9 additions & 7 deletions stacks-signer/src/runloop.rs
@@ -367,6 +367,14 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
return None;
}
for signer in self.stacks_signers.values_mut() {
signer.refresh_coordinator();
if signer.approved_aggregate_public_key.is_none() {
if let Err(e) =
signer.update_dkg(&self.stacks_client, res.clone(), current_reward_cycle)
{
error!("{signer}: failed to update DKG: {e}");
}
}
let event_parity = match event {
Some(SignerEvent::BlockValidationResponse(_)) => Some(current_reward_cycle % 2),
// Block proposal events do have reward cycles, but each proposal has its own cycle,
@@ -383,13 +391,6 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
if event_parity == Some(other_signer_parity) {
continue;
}
if let Err(e) = signer.process_event(
&self.stacks_client,
event.as_ref(),
@@ -422,6 +423,7 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
None
}
}

#[cfg(test)]
mod tests {
use blockstack_lib::chainstate::stacks::boot::NakamotoSignerEntry;
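The crux of this file's change: `refresh_coordinator` and the aggregate-key check now run for every signer on every pass, before the event-parity filter, so a signer whose reward cycle currently has no events can still initialize. A sketch of the parity routing being hoisted past (self-contained, hypothetical helper distilled from the match above):

    // Events tagged with a reward-cycle parity are handled only by the
    // signer whose cycle shares that parity; untagged events go to all.
    // The DKG refresh must run before this routing, or a signer on an
    // idle cycle would never leave the Uninitialized state.
    fn event_is_for_signer(event_parity: Option<u64>, signer_reward_cycle: u64) -> bool {
        match event_parity {
            Some(parity) => parity == signer_reward_cycle % 2,
            None => true,
        }
    }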
118 changes: 106 additions & 12 deletions stacks-signer/src/signer.rs
@@ -123,13 +123,24 @@ pub enum Command {
},
}

/// The signer operation types that can be performed
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Operation {
/// Running a DKG round
Dkg,
/// Running a sign round
Sign,
}

/// The Signer state
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum State {
/// The signer is uninitialized and should read stackerdb to restore state
Uninitialized,
/// The signer is idle, waiting for messages and commands
Idle,
/// The signer is executing a DKG or Sign round
OperationInProgress(Operation),
}
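
// Illustrative sketch only (not part of the diff): the lifecycle the
// states above imply. A signer now starts Uninitialized, becomes Idle
// once DKG state is restored from stackerdb or an approved key is
// found, and toggles OperationInProgress while a round runs.
fn describe_state(state: &State) -> &'static str {
    match state {
        State::Uninitialized => "restoring state from stackerdb",
        State::Idle => "waiting for commands and messages",
        State::OperationInProgress(Operation::Dkg) => "DKG round in progress",
        State::OperationInProgress(Operation::Sign) => "sign round in progress",
    }
}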

/// The stacks signer registered for the reward cycle
@@ -215,6 +226,43 @@ impl Signer {
fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
self.coordinator_selector.get_coordinator()
}

/// Read stackerdb messages in case the signer was started late or restarted and missed incoming DKG messages
pub fn read_dkg_stackerdb_messages(
&mut self,
stacks_client: &StacksClient,
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
if self.state != State::Uninitialized {
// We should only read stackerdb if we are uninitialized
return Ok(());
};
let ordered_packets = self
.stackerdb
.get_dkg_packets(&self.signer_slot_ids)?
.iter()
.filter_map(|packet| {
let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) {
self.get_coordinator_dkg().1
} else {
debug!(
"{self}: Received a non-DKG message in the DKG message queue. Ignoring it."
);
return None;
};
self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey)
})
.collect::<Vec<_>>();
// We successfully read stackerdb so we are no longer uninitialized
self.state = State::Idle;
debug!(
"{self}: Processing {} DKG messages from stackerdb: {ordered_packets:?}",
ordered_packets.len()
);
self.handle_packets(stacks_client, res, &ordered_packets, current_reward_cycle);
Ok(())
}
}

impl From<SignerConfig> for Signer {
@@ -290,7 +338,7 @@ impl From<SignerConfig> for Signer {
Self {
coordinator,
state_machine,
state: State::Uninitialized,
commands: VecDeque::new(),
stackerdb,
mainnet: signer_config.mainnet,
@@ -343,8 +391,8 @@ impl Signer {
}

/// Update operation
fn update_operation(&mut self, operation: Operation) {
self.state = State::OperationInProgress(operation);
self.coordinator_selector.last_message_time = Some(Instant::now());
}

@@ -380,6 +428,7 @@
return;
}
}
self.update_operation(Operation::Dkg);
}
Command::Sign {
block,
@@ -425,9 +474,9 @@
return;
}
}
self.update_operation(Operation::Sign);
}
}
}

/// Attempt to process the next command in the queue, and update state accordingly
@@ -437,6 +486,10 @@
current_reward_cycle: u64,
) {
match &self.state {
State::Uninitialized => {
// We cannot process any commands until we have restored our state
warn!("{self}: Cannot process commands until state is restored. Waiting...");
}
State::Idle => {
let Some(command) = self.commands.front() else {
debug!("{self}: Nothing to process. Waiting for command...");
@@ -460,10 +513,10 @@
.expect("BUG: Already asserted that the command queue was not empty");
self.execute_command(stacks_client, &command);
}
State::OperationInProgress(op) => {
// We cannot execute the next command until the current one is finished...
debug!(
"{self}: Waiting for operation to finish. Coordinator state = {:?}",
"{self}: Waiting for {op:?} operation to finish. Coordinator state = {:?}",
self.coordinator.state
);
}
@@ -696,9 +749,26 @@
self.process_operation_results(stacks_client, &operation_results);
self.send_operation_results(res, operation_results);
self.finish_operation();
} else if !packets.is_empty() {
// We have received a message. Update our state accordingly
// Let us be extra explicit in case a new state type gets added to wsts' state machine
match &self.coordinator.state {
CoordinatorState::Idle => {}
CoordinatorState::DkgPublicDistribute
| CoordinatorState::DkgPublicGather
| CoordinatorState::DkgPrivateDistribute
| CoordinatorState::DkgPrivateGather
| CoordinatorState::DkgEndDistribute
| CoordinatorState::DkgEndGather => {
self.update_operation(Operation::Dkg);
}
CoordinatorState::NonceRequest(_, _)
| CoordinatorState::NonceGather(_, _)
| CoordinatorState::SigShareRequest(_, _)
| CoordinatorState::SigShareGather(_, _) => {
self.update_operation(Operation::Sign);
}
}
}

debug!("{self}: Saving signer state");
@@ -1364,7 +1434,12 @@
}

/// Update the DKG for the provided signer info, triggering it if required
pub fn update_dkg(
&mut self,
stacks_client: &StacksClient,
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
@@ -1380,8 +1455,27 @@
self.approved_aggregate_public_key
);
}
if matches!(self.state, State::OperationInProgress(Operation::Dkg)) {
// We already have DKG, abort operation and reset state.
debug!(
"{self}: DKG has already been set. Aborting DKG round {}.",
self.coordinator.current_dkg_id
);
self.finish_operation();
}
if self.state == State::Uninitialized {
// If we successfully load the DKG value, we are fully initialized
self.state = State::Idle;
}
return Ok(());
};
}
// Check if we missed any DKG messages due to a restart or being late to the party
// Note: We currently only check for DKG specific messages as we cannot rejoin a sign
// round due to a miner overwriting its own message slots (impossible to recover without every message)
if let Err(e) = self.read_dkg_stackerdb_messages(stacks_client, res, current_reward_cycle)
{
error!("{self}: failed to read stackerdb messages: {e}");
}
if self.should_queue_dkg(stacks_client)? {
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
self.commands.push_front(Command::Dkg);
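Taken together, a condensed sketch of the restore path this PR adds (illustrative driver only; the method names come from the diff, but the wrapper `bootstrap_signer` is hypothetical and assumes the crate's `Signer`, `StacksClient`, `OperationResult`, and `ClientError` types as defined above):

    use std::sync::mpsc::Sender;

    // Illustrative: ties together the pieces changed in this PR.
    fn bootstrap_signer(
        signer: &mut Signer,
        stacks_client: &StacksClient,
        res: Sender<Vec<OperationResult>>,
        current_reward_cycle: u64,
    ) -> Result<(), ClientError> {
        // update_dkg either finds an approved aggregate key on-chain and
        // flips Uninitialized -> Idle, or replays missed DKG packets via
        // read_dkg_stackerdb_messages before any command is processed.
        signer.update_dkg(stacks_client, res, current_reward_cycle)?;
        Ok(())
    }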