Feat/signers read stackerdb #4658

Merged · 14 commits · Apr 26, 2024
3 changes: 2 additions & 1 deletion .github/workflows/bitcoin-tests.yml
@@ -83,12 +83,13 @@ jobs:
- tests::nakamoto_integrations::follower_bootup
- tests::nakamoto_integrations::forked_tenure_is_ignored
- tests::signer::stackerdb_dkg
- tests::signer::stackerdb_sign
- tests::signer::stackerdb_sign_request_rejected
- tests::signer::stackerdb_block_proposal
- tests::signer::stackerdb_filter_bad_transactions
- tests::signer::stackerdb_mine_2_nakamoto_reward_cycles
- tests::signer::stackerdb_sign_after_signer_reboot
- tests::nakamoto_integrations::stack_stx_burn_op_integration_test
- tests::signer::stackerdb_delayed_dkg
# Do not run this one until we figure out why it fails in CI
# - tests::neon_integrations::bitcoin_reorg_flap
steps:
75 changes: 55 additions & 20 deletions stacks-signer/src/client/stackerdb.rs
@@ -23,6 +23,7 @@ use slog::{slog_debug, slog_error, slog_warn};
use stacks_common::codec::{read_next, StacksMessageCodec};
use stacks_common::types::chainstate::StacksPrivateKey;
use stacks_common::{debug, error, warn};
use wsts::net::Packet;

use super::ClientError;
use crate::client::retry_with_exponential_backoff;
@@ -179,45 +180,79 @@ impl StackerDB {
}
}

/// Get the transactions from stackerdb for the signers
fn get_transactions(
transactions_session: &mut StackerDBSession,
signer_ids: &[SignerSlotID],
) -> Result<Vec<StacksTransaction>, ClientError> {
/// Get all signer messages from stackerdb for the given slot IDs
fn get_messages(
session: &mut StackerDBSession,
slot_ids: &[u32],
) -> Result<Vec<SignerMessage>, ClientError> {
let mut messages = vec![];
let send_request = || {
transactions_session
.get_latest_chunks(&signer_ids.iter().map(|id| id.0).collect::<Vec<_>>())
session
.get_latest_chunks(slot_ids)
.map_err(backoff::Error::transient)
};
let chunk_ack = retry_with_exponential_backoff(send_request)?;
let mut transactions = Vec::new();
for (i, chunk) in chunk_ack.iter().enumerate() {
let signer_id = *signer_ids
.get(i)
.expect("BUG: retrieved an unequal amount of chunks to requested chunks");
let Some(data) = chunk else {
continue;
};
let Ok(message) = read_next::<SignerMessage, _>(&mut &data[..]) else {
if !data.is_empty() {
warn!("Failed to deserialize chunk data into a SignerMessage");
debug!(
"signer #{signer_id}: Failed chunk ({}): {data:?}",
&data.len(),
);
debug!("slot #{i}: Failed chunk ({}): {data:?}", &data.len(),);
}
continue;
};
messages.push(message);
}
Ok(messages)
}

/// Get the ordered DKG packets from stackerdb for the signer slot IDs.
pub fn get_dkg_packets(
&mut self,
signer_ids: &[SignerSlotID],
) -> Result<Vec<Packet>, ClientError> {
let packet_slots = &[
MessageSlotID::DkgBegin,
MessageSlotID::DkgPublicShares,
MessageSlotID::DkgPrivateBegin,
MessageSlotID::DkgPrivateShares,
MessageSlotID::DkgEndBegin,
MessageSlotID::DkgEnd,
];
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
let mut packets = vec![];
for packet_slot in packet_slots {
let session = self
.signers_message_stackerdb_sessions
.get_mut(packet_slot)
.ok_or(ClientError::NotConnected)?;
let messages = Self::get_messages(session, &slot_ids)?;
for message in messages {
let SignerMessage::Packet(packet) = message else {
warn!("Found an unexpected type in a packet slot {packet_slot}");
continue;
};
packets.push(packet);
}
}
Ok(packets)
}

/// Get the transactions from stackerdb for the signers
fn get_transactions(
transactions_session: &mut StackerDBSession,
signer_ids: &[SignerSlotID],
) -> Result<Vec<StacksTransaction>, ClientError> {
let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
let messages = Self::get_messages(transactions_session, &slot_ids)?;
let mut transactions = vec![];
for message in messages {
let SignerMessage::Transactions(chunk_transactions) = message else {
warn!("Signer wrote an unexpected type to the transactions slot");
continue;
};
debug!(
"Retrieved {} transactions from signer ID {}.",
chunk_transactions.len(),
signer_id
);
transactions.extend(chunk_transactions);
}
Ok(transactions)
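With this refactor, every typed getter reduces to "fetch the latest chunks, deserialize, pattern-match the expected `SignerMessage` variant". A hedged sketch of another getter built on the shared helper (`SignerMessage::BlockResponse` and the `BlockResponse` type are illustrative assumptions, not necessarily the crate's real message enum):

```rust
// Hypothetical additional getter, mirroring get_transactions above.
// BlockResponse is an assumed type, shown for illustration only.
fn get_block_responses(
    session: &mut StackerDBSession,
    signer_ids: &[SignerSlotID],
) -> Result<Vec<BlockResponse>, ClientError> {
    let slot_ids = signer_ids.iter().map(|id| id.0).collect::<Vec<_>>();
    // Shared fetch-and-deserialize path introduced in this PR.
    let messages = Self::get_messages(session, &slot_ids)?;
    let mut responses = vec![];
    for message in messages {
        // Ignore anything that is not the variant this slot should hold.
        let SignerMessage::BlockResponse(response) = message else {
            warn!("Signer wrote an unexpected type to the block response slot");
            continue;
        };
        responses.push(response);
    }
    Ok(responses)
}
```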
2 changes: 1 addition & 1 deletion stacks-signer/src/config.rs
@@ -357,7 +357,7 @@ DKG transaction fee: {tx_fee} uSTX
"#,
node_host = self.node_host,
endpoint = self.endpoint,
stacks_address = self.stacks_address.to_string(),
stacks_address = self.stacks_address,
public_key = StacksPublicKey::from_private(&self.stacks_private_key).to_hex(),
network = self.network,
db_path = self.db_path.to_str().unwrap_or_default(),
2 changes: 1 addition & 1 deletion stacks-signer/src/main.rs
@@ -75,7 +75,7 @@ fn write_chunk_to_stdout(chunk_opt: Option<Vec<u8>>) {
if let Some(chunk) = chunk_opt.as_ref() {
let hexed_string = to_hex(chunk);
let hexed_chunk = hexed_string.as_bytes();
let bytes = io::stdout().write(&hexed_chunk).unwrap();
let bytes = io::stdout().write(hexed_chunk).unwrap();
if bytes < hexed_chunk.len() {
print!(
"Failed to write complete chunk to stdout. Missing {} bytes",
5 changes: 4 additions & 1 deletion stacks-signer/src/runloop.rs
@@ -384,7 +384,9 @@ impl SignerRunLoop<Vec<OperationResult>, RunLoopCommand> for RunLoop {
continue;
}
if signer.approved_aggregate_public_key.is_none() {
if let Err(e) = signer.refresh_dkg(&self.stacks_client) {
if let Err(e) =
signer.refresh_dkg(&self.stacks_client, res.clone(), current_reward_cycle)
{
error!("{signer}: failed to refresh DKG: {e}");
}
}
@@ -421,6 +423,7 @@
None
}
}

#[cfg(test)]
mod tests {
use blockstack_lib::chainstate::stacks::boot::NakamotoSignerEntry;
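The run loop now threads the operation-result channel into `refresh_dkg`, and `res.clone()` is cheap because an mpsc-style `Sender` is designed to be cloned per producer. A self-contained sketch of that pattern with std `mpsc` (names are stand-ins, not the crate's types):

```rust
use std::sync::mpsc::{channel, Sender};

// Stand-in for the crate's OperationResult; illustrative only.
#[derive(Debug)]
enum OperationResult {
    Dkg,
}

// Each call can own a clone of the sender and report results as they are
// produced, which is what passing `res.clone()` into refresh_dkg enables.
fn refresh(res: Sender<Vec<OperationResult>>) {
    res.send(vec![OperationResult::Dkg]).expect("receiver alive");
}

fn main() {
    let (tx, rx) = channel();
    refresh(tx.clone());
    println!("{:?}", rx.recv().unwrap()); // prints [Dkg]
}
```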
141 changes: 111 additions & 30 deletions stacks-signer/src/signer.rs
@@ -145,6 +145,8 @@ pub enum Operation {
/// The Signer state
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum State {
/// The signer is uninitialized and should read stackerdb to restore state
Uninitialized,
/// The signer is idle, waiting for messages and commands
Idle,
/// The signer is executing a DKG or Sign round
@@ -234,6 +236,43 @@ impl Signer {
fn get_coordinator_dkg(&self) -> (u32, PublicKey) {
self.coordinator_selector.get_coordinator()
}

/// Read stackerdb messages in case the signer was started late or restarted and missed incoming DKG messages
pub fn read_dkg_stackerdb_messages(
&mut self,
stacks_client: &StacksClient,
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
if self.state != State::Uninitialized {
// We should only read stackerdb if we are uninitialized
return Ok(());
}
let ordered_packets = self
.stackerdb
.get_dkg_packets(&self.signer_slot_ids)?
.iter()
.filter_map(|packet| {
let coordinator_pubkey = if Self::is_dkg_message(&packet.msg) {
self.get_coordinator_dkg().1
} else {
debug!(
"{self}: Received a non-DKG message in the DKG message queue. Ignoring it."
);
return None;
};
self.verify_packet(stacks_client, packet.clone(), &coordinator_pubkey)
})
.collect::<Vec<_>>();
// We successfully read stackerdb so we are no longer uninitialized
self.state = State::Idle;
debug!(
"{self}: Processing {} DKG messages from stackerdb: {ordered_packets:?}",
ordered_packets.len()
);
self.handle_packets(stacks_client, res, &ordered_packets, current_reward_cycle);
Ok(())
}
}
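A usage sketch of the restore path (hedged; the actual call site is the `refresh_dkg` change further down): because of the early return, repeated calls are harmless once the signer has left `Uninitialized`.

```rust
// Illustrative call, assuming the run-loop context provides stacks_client,
// res, and current_reward_cycle. The method is a no-op unless
// self.state == State::Uninitialized.
signer.read_dkg_stackerdb_messages(&stacks_client, res.clone(), current_reward_cycle)?;
assert_eq!(signer.state, State::Idle); // holds after a successful first call
```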

impl From<SignerConfig> for Signer {
@@ -297,7 +336,7 @@ impl From<SignerConfig> for Signer {

if let Some(state) = load_encrypted_signer_state(
&mut stackerdb,
signer_config.signer_slot_id.into(),
signer_config.signer_slot_id,
&state_machine.network_private_key,
).or_else(|err| {
warn!("Failed to load encrypted signer state from StackerDB, falling back to SignerDB: {err}");
@@ -312,7 +351,7 @@
Self {
coordinator,
state_machine,
state: State::Idle,
state: State::Uninitialized,
commands: VecDeque::new(),
stackerdb,
mainnet: signer_config.mainnet,
@@ -403,6 +442,7 @@ impl Signer {
return;
}
}
self.update_operation(Operation::Dkg);
}
Command::Sign {
block_proposal,
@@ -449,6 +489,7 @@
return;
}
}
self.update_operation(Operation::Sign);
}
}
}
@@ -460,6 +501,10 @@
current_reward_cycle: u64,
) {
match &self.state {
State::Uninitialized => {
// We cannot process any commands until we have restored our state
warn!("{self}: Cannot process commands until state is restored. Waiting...");
}
State::Idle => {
let Some(command) = self.commands.front() else {
debug!("{self}: Nothing to process. Waiting for command...");
@@ -685,13 +730,13 @@
}
}

if packets.iter().any(|packet| match packet.msg {
Message::DkgEnd(_) => true,
_ => false,
}) {
if packets
.iter()
.any(|packet| matches!(packet.msg, Message::DkgEnd(_)))
{
debug!("{self}: Saving signer state");
self.save_signer_state()
.expect(&format!("{self}: Failed to save signer state"));
.unwrap_or_else(|_| panic!("{self}: Failed to save signer state"));
}
self.send_outbound_messages(signer_outbound_messages);
self.send_outbound_messages(coordinator_outbound_messages);
@@ -1316,42 +1361,78 @@
}
}

/// Refresh DKG value and queue DKG command if necessary
pub fn refresh_dkg(&mut self, stacks_client: &StacksClient) -> Result<(), ClientError> {
// First check if we should queue DKG based on contract vote state and stackerdb transactions
let should_queue = self.should_queue_dkg(stacks_client)?;
// Before queueing the command, check one last time if DKG has been
// approved. It could have happened after the last call to
// `get_approved_aggregate_key` but before the threshold check in
// `should_queue_dkg`.
/// Refresh DKG and queue it if required
pub fn refresh_dkg(
&mut self,
stacks_client: &StacksClient,
res: Sender<Vec<OperationResult>>,
current_reward_cycle: u64,
) -> Result<(), ClientError> {
// First attempt to retrieve the aggregate key from the contract.
self.update_approved_aggregate_key(stacks_client)?;
if self.approved_aggregate_public_key.is_some() {
return Ok(());
}
// Check stackerdb for any missed DKG messages to catch up our state.
self.read_dkg_stackerdb_messages(&stacks_client, res, current_reward_cycle)?;
// Check if we should still queue DKG
if !self.should_queue_dkg(stacks_client)? {
return Ok(());
}
// Because there could be a slight delay in reading pending transactions and a key being approved by the contract,
// check one last time if the approved key was set since we finished the should queue dkg call
self.update_approved_aggregate_key(stacks_client)?;
if self.approved_aggregate_public_key.is_some() {
return Ok(());
}
if self.commands.front() != Some(&Command::Dkg) {
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
self.commands.push_front(Command::Dkg);
} else {
debug!("{self}: DKG command already queued...");
}
Ok(())
}

/// Overwrites the approved aggregate key to the value in the contract, updating state accordingly
pub fn update_approved_aggregate_key(
&mut self,
stacks_client: &StacksClient,
) -> Result<(), ClientError> {
let old_dkg = self.approved_aggregate_public_key;
self.approved_aggregate_public_key =
stacks_client.get_approved_aggregate_key(self.reward_cycle)?;
if self.approved_aggregate_public_key.is_some() {
// TODO: this will never work as is. We need to have stored our party shares on the side etc for this particular aggregate key.
// Need to update state to store the necessary info, check against it to see if we have participated in the winning round and
// then overwrite our value accordingly. Otherwise, we will be locked out of the round and should not participate.
let internal_dkg = self.coordinator.aggregate_public_key;
if internal_dkg != self.approved_aggregate_public_key {
warn!("{self}: we do not support changing the internal DKG key yet. Expected {internal_dkg:?} got {:?}", self.approved_aggregate_public_key);
}
self.coordinator
.set_aggregate_public_key(self.approved_aggregate_public_key);
if old_dkg != self.approved_aggregate_public_key {
warn!(
"{self}: updated DKG value to {:?}.",
"{self}: updated DKG value from {old_dkg:?} to {:?}.",
self.approved_aggregate_public_key
);
}
if let State::OperationInProgress(Operation::Dkg) = self.state {
debug!(
"{self}: DKG has already been set. Aborting DKG operation {}.",
self.coordinator.current_dkg_id
);
self.finish_operation();
}
} else if should_queue {
if self.commands.front() != Some(&Command::Dkg) {
info!("{self} is the current coordinator and must trigger DKG. Queuing DKG command...");
self.commands.push_front(Command::Dkg);
} else {
debug!("{self}: DKG command already queued...");
match self.state {
State::OperationInProgress(Operation::Dkg) => {
debug!(
"{self}: DKG has already been set. Aborting DKG operation {}.",
self.coordinator.current_dkg_id
);
self.finish_operation();
}
State::Uninitialized => {
// If we successfully load the DKG value, we are fully initialized
self.state = State::Idle;
}
_ => {
// do nothing
}
}
}
Ok(())
@@ -1433,7 +1514,7 @@ impl Signer {
else {
continue;
};
let Some(dkg_public_key) = self.coordinator.aggregate_public_key.clone() else {
let Some(dkg_public_key) = self.coordinator.aggregate_public_key else {
break;
};
if params.aggregate_key == dkg_public_key
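Taken together, the signer.rs changes give the signer this lifecycle (a sketch under the assumption that `finish_operation` returns the signer to `Idle`, as the abort path above suggests; not literal crate code):

```rust
// Sketch of the signer state machine after this PR; illustrative only.
// The .clone() is just to keep the sketch borrow-checker friendly.
match signer.state.clone() {
    State::Uninitialized => {
        // Replay any missed DKG packets from stackerdb; on success,
        // read_dkg_stackerdb_messages flips the state to Idle.
        signer.read_dkg_stackerdb_messages(&stacks_client, res.clone(), current_reward_cycle)?;
    }
    State::Idle => {
        // Pop the next queued command (Dkg or Sign); update_operation(..)
        // moves the signer to OperationInProgress.
    }
    State::OperationInProgress(_) => {
        // Wait for the active round; finish_operation() returns to Idle.
    }
}
```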