diff --git a/Cargo.lock b/Cargo.lock index df60760987..d792d3d13a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1320,7 +1320,7 @@ dependencies = [ [[package]] name = "cdn-broker" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.7#5406fde54e61058428a7b55e1a98b699f0f606f1" +source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.9#a7523f310b92b15f943b9adffe48fe304eb30e01" dependencies = [ "async-std", "cdn-proto", @@ -1344,7 +1344,7 @@ dependencies = [ [[package]] name = "cdn-client" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.7#5406fde54e61058428a7b55e1a98b699f0f606f1" +source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.9#a7523f310b92b15f943b9adffe48fe304eb30e01" dependencies = [ "async-std", "cdn-proto", @@ -1360,7 +1360,7 @@ dependencies = [ [[package]] name = "cdn-marshal" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.7#5406fde54e61058428a7b55e1a98b699f0f606f1" +source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.9#a7523f310b92b15f943b9adffe48fe304eb30e01" dependencies = [ "async-std", "cdn-proto", @@ -1374,7 +1374,7 @@ dependencies = [ [[package]] name = "cdn-proto" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.7#5406fde54e61058428a7b55e1a98b699f0f606f1" +source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.9#a7523f310b92b15f943b9adffe48fe304eb30e01" dependencies = [ "anyhow", "ark-serialize", @@ -3114,12 +3114,14 @@ dependencies = [ "rand 0.8.5", "serde", "sha2 0.10.8", + "simple_moving_average", "snafu", "surf-disco", "time 0.3.36", "tokio", "toml", "tracing", + "twox-hash", "url", "vbs", ] @@ -3577,7 +3579,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite 0.2.14", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -4232,7 +4234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -6087,9 +6089,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.26.1" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e902a69d09078829137b4a5d9d082e0490393537badd7c91a3d69d14639e115f" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" dependencies = [ "arc-swap", "async-trait", @@ -6098,7 +6100,6 @@ dependencies = [ "futures", "futures-util", "itoa", - "num-bigint", "percent-encoding", "pin-project-lite 0.2.14", "ryu", @@ -6925,6 +6926,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" +[[package]] +name = "simple_moving_average" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a4b144ad185430cd033299e2c93e465d5a7e65fbb858593dc57181fa13cd310" +dependencies = [ + "num-traits", +] + [[package]] name = "slab" version = "0.4.9" @@ -8199,6 +8209,16 @@ dependencies = [ "utf-8", ] +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "typeid" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 41365c9a2c..8d86f25969 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ espresso-systems-common = { git = "https://github.com/espressosystems/espresso-s ethereum-types = { version = "0.14", default-features = false, features = [ "serialize", ] } -derive_more = { version = "1.0", features = [ "from" ] } +derive_more = { version = "1.0", features = ["from"] } futures = { version = "0.3", default-features = false } # TODO generic-array should not be a direct dependency # https://github.com/EspressoSystems/HotShot/issues/1850 @@ -121,10 +121,10 @@ anyhow = "1" # Push CDN imports -cdn-client = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.7" } -cdn-broker = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.7" } -cdn-marshal = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.7" } -cdn-proto = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.7" } +cdn-client = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.9" } +cdn-broker = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.9" } +cdn-marshal = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.9" } +cdn-proto = { git = "https://github.com/EspressoSystems/Push-CDN", tag = "0.4.9" } ### Profiles ### diff --git a/crates/examples/combined/all.rs b/crates/examples/combined/all.rs index 4ca1e46f3f..07c5a35803 100644 --- a/crates/examples/combined/all.rs +++ b/crates/examples/combined/all.rs @@ -14,10 +14,10 @@ use async_compatibility_layer::{ art::async_spawn, logging::{setup_backtrace, setup_logging}, }; -use cdn_broker::Broker; +use cdn_broker::{reexports::def::hook::NoMessageHook, Broker}; use cdn_marshal::Marshal; use hotshot::{ - traits::implementations::{KeyPair, TestingDef, WrappedSignatureKey}, + traits::implementations::{HotShotMessageHook, KeyPair, TestingDef, WrappedSignatureKey}, types::SignatureKey, }; use hotshot_example_types::{node_types::TestVersions, state_types::TestTypes}; @@ -72,28 +72,34 @@ async fn main() { let private_address = format!("127.0.0.1:{private_port}"); let public_address = format!("127.0.0.1:{public_port}"); - let config: cdn_broker::Config::SignatureKey>> = - cdn_broker::Config { - discovery_endpoint: discovery_endpoint.clone(), - public_advertise_endpoint: public_address.clone(), - public_bind_endpoint: public_address, - private_advertise_endpoint: private_address.clone(), - private_bind_endpoint: private_address, - - keypair: KeyPair { - public_key: WrappedSignatureKey(broker_public_key), - private_key: broker_private_key.clone(), - }, + // Create the message hooks + let broker_message_hook = NoMessageHook; + let user_message_hook = HotShotMessageHook::default(); + + let config: cdn_broker::Config> = cdn_broker::Config { + discovery_endpoint: discovery_endpoint.clone(), + public_advertise_endpoint: public_address.clone(), + public_bind_endpoint: public_address, + private_advertise_endpoint: private_address.clone(), + private_bind_endpoint: private_address, + + keypair: KeyPair { + public_key: WrappedSignatureKey(broker_public_key), + private_key: broker_private_key.clone(), + }, + + metrics_bind_endpoint: None, + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), - metrics_bind_endpoint: None, - ca_cert_path: None, - ca_key_path: None, - global_memory_pool_size: Some(1024 * 1024 * 1024), - }; + user_message_hook, + broker_message_hook, + }; // Create and spawn the broker async_spawn(async move { - let broker: Broker::SignatureKey>> = + let broker: Broker> = Broker::new(config).await.expect("broker failed to start"); // Error if we stopped unexpectedly @@ -121,10 +127,9 @@ async fn main() { // Spawn the marshal async_spawn(async move { - let marshal: Marshal::SignatureKey>> = - Marshal::new(marshal_config) - .await - .expect("failed to spawn marshal"); + let marshal: Marshal> = Marshal::new(marshal_config) + .await + .expect("failed to spawn marshal"); // Error if we stopped unexpectedly if let Err(err) = marshal.start().await { diff --git a/crates/examples/push-cdn/all.rs b/crates/examples/push-cdn/all.rs index 3d3de5d42e..be606ef54a 100644 --- a/crates/examples/push-cdn/all.rs +++ b/crates/examples/push-cdn/all.rs @@ -11,10 +11,13 @@ pub mod types; use std::path::Path; use async_compatibility_layer::art::async_spawn; -use cdn_broker::{reexports::crypto::signature::KeyPair, Broker}; +use cdn_broker::{ + reexports::{crypto::signature::KeyPair, def::hook::NoMessageHook}, + Broker, +}; use cdn_marshal::Marshal; use hotshot::{ - traits::implementations::{TestingDef, WrappedSignatureKey}, + traits::implementations::{HotShotMessageHook, TestingDef, WrappedSignatureKey}, types::SignatureKey, }; use hotshot_example_types::{node_types::TestVersions, state_types::TestTypes}; @@ -78,28 +81,34 @@ async fn main() { let private_address = format!("127.0.0.1:{private_port}"); let public_address = format!("127.0.0.1:{public_port}"); - let config: cdn_broker::Config::SignatureKey>> = - cdn_broker::Config { - discovery_endpoint: discovery_endpoint.clone(), - public_advertise_endpoint: public_address.clone(), - public_bind_endpoint: public_address, - private_advertise_endpoint: private_address.clone(), - private_bind_endpoint: private_address, - - keypair: KeyPair { - public_key: WrappedSignatureKey(broker_public_key), - private_key: broker_private_key.clone(), - }, + // Create the message hooks + let broker_message_hook = NoMessageHook; + let user_message_hook = HotShotMessageHook::default(); + + let config: cdn_broker::Config> = cdn_broker::Config { + discovery_endpoint: discovery_endpoint.clone(), + public_advertise_endpoint: public_address.clone(), + public_bind_endpoint: public_address, + private_advertise_endpoint: private_address.clone(), + private_bind_endpoint: private_address, + + keypair: KeyPair { + public_key: WrappedSignatureKey(broker_public_key), + private_key: broker_private_key.clone(), + }, + + metrics_bind_endpoint: None, + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), - metrics_bind_endpoint: None, - ca_cert_path: None, - ca_key_path: None, - global_memory_pool_size: Some(1024 * 1024 * 1024), - }; + user_message_hook, + broker_message_hook, + }; // Create and spawn the broker async_spawn(async move { - let broker: Broker::SignatureKey>> = + let broker: Broker> = Broker::new(config).await.expect("broker failed to start"); // Error if we stopped unexpectedly @@ -125,10 +134,9 @@ async fn main() { // Spawn the marshal async_spawn(async move { - let marshal: Marshal::SignatureKey>> = - Marshal::new(marshal_config) - .await - .expect("failed to spawn marshal"); + let marshal: Marshal> = Marshal::new(marshal_config) + .await + .expect("failed to spawn marshal"); // Error if we stopped unexpectedly if let Err(err) = marshal.start().await { diff --git a/crates/examples/push-cdn/broker.rs b/crates/examples/push-cdn/broker.rs index 7eabbec50f..11226c4bbe 100644 --- a/crates/examples/push-cdn/broker.rs +++ b/crates/examples/push-cdn/broker.rs @@ -7,9 +7,11 @@ //! The following is the main `Broker` binary, which just instantiates and runs //! a `Broker` object. use anyhow::Result; -use cdn_broker::{Broker, Config}; +use cdn_broker::{reexports::def::hook::NoMessageHook, Broker, Config}; use clap::Parser; -use hotshot::traits::implementations::{KeyPair, ProductionDef, WrappedSignatureKey}; +use hotshot::traits::implementations::{ + HotShotMessageHook, KeyPair, ProductionDef, WrappedSignatureKey, +}; use hotshot_example_types::node_types::TestTypes; use hotshot_types::traits::{node_implementation::NodeType, signature_key::SignatureKey}; use sha2::Digest; @@ -91,8 +93,12 @@ async fn main() -> Result<()> { let (public_key, private_key) = ::SignatureKey::generated_from_seed_indexed(key_hash.into(), 1337); + // Create the message hooks + let broker_message_hook = NoMessageHook; + let user_message_hook = HotShotMessageHook::default(); + // Create config - let broker_config: Config::SignatureKey>> = Config { + let broker_config: Config> = Config { ca_cert_path: args.ca_cert_path, ca_key_path: args.ca_key_path, @@ -108,6 +114,9 @@ async fn main() -> Result<()> { private_bind_endpoint: args.private_bind_endpoint, private_advertise_endpoint: args.private_advertise_endpoint, global_memory_pool_size: Some(args.global_memory_pool_size), + + user_message_hook, + broker_message_hook, }; // Create new `Broker` diff --git a/crates/examples/push-cdn/marshal.rs b/crates/examples/push-cdn/marshal.rs index 39d2267bd8..fde57cd28d 100644 --- a/crates/examples/push-cdn/marshal.rs +++ b/crates/examples/push-cdn/marshal.rs @@ -12,7 +12,6 @@ use cdn_marshal::{Config, Marshal}; use clap::Parser; use hotshot::traits::implementations::ProductionDef; use hotshot_example_types::node_types::TestTypes; -use hotshot_types::traits::node_implementation::NodeType; use tracing_subscriber::EnvFilter; // TODO: forall, add logging where we need it @@ -81,8 +80,7 @@ async fn main() -> Result<()> { }; // Create new `Marshal` from the config - let marshal = - Marshal::::SignatureKey>>::new(config).await?; + let marshal = Marshal::>::new(config).await?; // Start the main loop, consuming it marshal.start().await?; diff --git a/crates/hotshot/Cargo.toml b/crates/hotshot/Cargo.toml index 9dd8e2a35a..651f8de256 100644 --- a/crates/hotshot/Cargo.toml +++ b/crates/hotshot/Cargo.toml @@ -61,7 +61,9 @@ blake3.workspace = true sha2 = { workspace = true } url = { workspace = true } num_enum = "0.7" +twox-hash = { version = "1", default-features = false } parking_lot = "0.12" +simple_moving_average = "1" [target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] tokio = { workspace = true } diff --git a/crates/hotshot/src/traits.rs b/crates/hotshot/src/traits.rs index b0aa3a67e3..7447d5a0c8 100644 --- a/crates/hotshot/src/traits.rs +++ b/crates/hotshot/src/traits.rs @@ -24,8 +24,12 @@ pub mod implementations { }, memory_network::{MasterMap, MemoryNetwork}, push_cdn_network::{ - CdnMetricsValue, KeyPair, ProductionDef, PushCdnNetwork, TestingDef, Topic as CdnTopic, - WrappedSignatureKey, + definition::{ + message_hook::HotShotMessageHook, signature_key::WrappedSignatureKey, + ProductionDef, TestingDef, Topic as CdnTopic, + }, + metrics::CdnMetricsValue, + KeyPair, PushCdnNetwork, }, }; } diff --git a/crates/hotshot/src/traits/networking/combined_network.rs b/crates/hotshot/src/traits/networking/combined_network.rs index 4bed1471b6..118519a242 100644 --- a/crates/hotshot/src/traits/networking/combined_network.rs +++ b/crates/hotshot/src/traits/networking/combined_network.rs @@ -261,9 +261,7 @@ impl TestableNetworkingImplementation for CombinedNetwor fn generator( expected_node_count: usize, num_bootstrap: usize, - network_id: usize, da_committee_size: usize, - is_da: bool, reliability_config: Option>, secondary_network_delay: Duration, ) -> AsyncGenerator> { @@ -271,18 +269,14 @@ impl TestableNetworkingImplementation for CombinedNetwor as TestableNetworkingImplementation>::generator( expected_node_count, num_bootstrap, - network_id, da_committee_size, - is_da, None, Duration::default(), ), as TestableNetworkingImplementation>::generator( expected_node_count, num_bootstrap, - network_id, da_committee_size, - is_da, reliability_config, Duration::default(), ) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index e75e601235..7a84da48eb 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -212,9 +212,7 @@ impl TestableNetworkingImplementation fn generator( expected_node_count: usize, num_bootstrap: usize, - _network_id: usize, da_committee_size: usize, - _is_da: bool, reliability_config: Option>, _secondary_network_delay: Duration, ) -> AsyncGenerator> { diff --git a/crates/hotshot/src/traits/networking/memory_network.rs b/crates/hotshot/src/traits/networking/memory_network.rs index ee3a55684f..91ad10200d 100644 --- a/crates/hotshot/src/traits/networking/memory_network.rs +++ b/crates/hotshot/src/traits/networking/memory_network.rs @@ -189,9 +189,7 @@ impl TestableNetworkingImplementation fn generator( _expected_node_count: usize, _num_bootstrap: usize, - _network_id: usize, da_committee_size: usize, - _is_da: bool, reliability_config: Option>, _secondary_network_delay: Duration, ) -> AsyncGenerator> { diff --git a/crates/hotshot/src/traits/networking/push_cdn_network/definition/message_hook.rs b/crates/hotshot/src/traits/networking/push_cdn_network/definition/message_hook.rs new file mode 100644 index 0000000000..0c734e0560 --- /dev/null +++ b/crates/hotshot/src/traits/networking/push_cdn_network/definition/message_hook.rs @@ -0,0 +1,531 @@ +#![allow(clippy::unnecessary_wraps)] +use std::hash::Hasher; +use std::num::NonZeroUsize; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result}; +use cdn_broker::reexports::def::hook::{HookResult, MessageHookDef}; +use cdn_broker::reexports::message::{Broadcast, Direct, Message as PushCdnMessage}; +use lru::LruCache; +use parking_lot::Mutex; +use simple_moving_average::{SingleSumSMA, SMA as SmaTrait}; +use std::time::Duration; +use tracing::warn; +use twox_hash::xxh3::Hash64; + +/// A wrapper around an `SMA` type that allows for atomic +/// access of the previously calculated sum. +#[derive(Clone)] +struct Sma { + /// The "inner" moving average object + sma: Arc>>, + + /// The previously calculated sum + cached_sum: Arc, +} + +/// The type of message being processed. Is used downstream to determine +/// which sample and average to use when processing a message. +#[derive(Eq, PartialEq, Clone, Copy)] +enum MessageType { + /// A broadcast message + Broadcast, + /// A direct message + Direct, +} + +impl Sma { + /// Create a new `SMA` + fn new() -> Self { + Self { + cached_sum: Arc::new(AtomicU64::new(0)), + sma: Arc::new(Mutex::new(SingleSumSMA::new())), + } + } + + /// Commit a sample to the `SMA`. This will update the cached sum. + fn commit_sample(&mut self, sample: &mut Sample) { + // Calculate the sample's average bytes per second and reset the sample + let bytes_per_second = sample.get(); + sample.reset(); + + // Lock the `SMA`, add the sample, and get the new average + let mut sma_guard = self.sma.lock(); + sma_guard.add_sample(bytes_per_second); + let new_average = sma_guard.get_average(); + + // Store the new average in the cached sum + self.cached_sum.store(new_average, Ordering::Relaxed); + drop(sma_guard); + } + + /// Get the cached (most currently updated) sum + fn get(&self) -> u64 { + self.cached_sum.load(Ordering::Relaxed) + } +} + +/// A sample for the `SMA`. This is used to calculate the average bytes per second, +/// and is periodically committed and reset. +#[derive(Clone)] +struct Sample { + /// The number of bytes sent since `last_committed_time` + num_bytes_sent: u64, + + /// When we should start processing messages again + cooldown_until: Instant, + + /// The last time the sample was checked + last_checked_time: Instant, + + /// The last time the sample was committed + last_committed_time: Instant, +} + +impl Sample { + /// Create a new `Sample` + fn new() -> Self { + Self { + num_bytes_sent: 0, + cooldown_until: Instant::now(), + last_checked_time: Instant::now(), + last_committed_time: Instant::now(), + } + } + + /// Add bytes to the sample and increment the number of messages sent + fn add(&mut self, bytes: u64) { + self.num_bytes_sent += bytes; + } + + /// Get the number of bytes per second of the current sample + fn get(&self) -> u64 { + self.num_bytes_sent / self.last_committed_time.elapsed().as_secs().max(1) + } + + /// Reset the sample. This is used when the sample is committed. + fn reset(&mut self) { + self.num_bytes_sent = 0; + self.last_checked_time = Instant::now(); + self.last_committed_time = Instant::now(); + } +} + +#[derive(Clone)] +/// The message hook for `HotShot` messages. Each user has a unique message hook. +pub struct HotShotMessageHook { + /// The cache for message hashes. We use this to deduplicate a sliding window of + /// 100 messages. + message_hash_cache: LruCache, + + /// The sample check interval + sample_check_interval: Duration, + + /// The sample commit interval + sample_commit_interval: Duration, + + /// The multiple of our average that the local average is allowed to be + allowed_multiple: u64, + + /// The global moving average for the number of broadcast bytes per second + global_broadcast_bps: Sma, + + /// The local average for the number of broadcast bytes per second + local_broadcast_bps: Sample, + + /// The global moving average for the number of direct bytes per second + global_direct_bps: Sma, + + /// The local average for the number of direct bytes per second + local_direct_bps: Sample, +} + +impl Default for HotShotMessageHook { + /// # Panics + /// If 100 < 0 + fn default() -> Self { + Self { + sample_check_interval: Duration::from_secs(5), + sample_commit_interval: Duration::from_secs(120), + allowed_multiple: 4, + + global_broadcast_bps: Sma::new(), + global_direct_bps: Sma::new(), + local_broadcast_bps: Sample::new(), + local_direct_bps: Sample::new(), + message_hash_cache: LruCache::new(NonZeroUsize::new(100).unwrap()), + } + } +} + +impl HotShotMessageHook { + /// Create a new `HotShotMessageHook` + /// + /// # Panics + /// If 100 < 0 + #[must_use] + pub fn new( + sample_check_interval: Duration, + sample_commit_interval: Duration, + allowed_multiple: u64, + ) -> Self { + Self { + sample_check_interval, + sample_commit_interval, + allowed_multiple, + + global_broadcast_bps: Sma::new(), + global_direct_bps: Sma::new(), + local_broadcast_bps: Sample::new(), + local_direct_bps: Sample::new(), + + message_hash_cache: LruCache::new(NonZeroUsize::new(100).unwrap()), + } + } + + /// Process a message against the moving average + /// Returns whether or not the message should be skipped. + fn process_against_sma(&mut self, message_len: usize, message_type: MessageType) -> HookResult { + // Match the sample and `SMA` based on the message type + let (sample, sma) = match message_type { + MessageType::Broadcast => ( + &mut self.local_broadcast_bps, + &mut self.global_broadcast_bps, + ), + MessageType::Direct => (&mut self.local_direct_bps, &mut self.global_direct_bps), + }; + + // Get the current time + let now = Instant::now(); + + // Skip the message if we need to cool down + if sample.cooldown_until > now { + return HookResult::SkipMessage; + } + + // Commit the sample if that interval has elapsed + if now.duration_since(sample.last_committed_time) >= self.sample_commit_interval { + sma.commit_sample(sample); + } + + // Add the length to the local sample + sample.add(message_len as u64); + + // If we have surpassed the check interval, check the sample to make sure it does + // not exceed the `global average * allowed_multiple` + if now.duration_since(sample.last_checked_time) >= self.sample_check_interval { + // Get our local and global bps + let local_bps = sample.get(); + let mut global_bps = sma.get(); + + // Clamp the global bps to a minimum if it's not zero (meaning uninitialized) + if global_bps > 0 { + global_bps = std::cmp::max(global_bps, 5000); + } + + // Calculate the maximum allowed bps + let max_allowed_bps = global_bps * self.allowed_multiple; + + // If the local bps is greater than the allowed bps, calculate the cooldown and skip + // the message + if global_bps != 0 && local_bps > max_allowed_bps { + // Set the cooldown to the time it would take to get the local bps to the max allowed + sample.cooldown_until = + Instant::now() + Duration::from_secs(local_bps / max_allowed_bps); + + // Skip the message + return HookResult::SkipMessage; + } + + // Reset the check time + sample.last_checked_time = Instant::now(); + } + + HookResult::ProcessMessage + } + + /// Process against the local message cache. This is used to deduplicate messages. + /// Returns `true` if the message has been seen before, `false` otherwise. + /// + /// - `auxiliary_data` is used to take into account the message recipient or topics associated. + fn message_already_seen(&mut self, message: &[u8], auxiliary_data: &[u8]) -> bool { + // Calculate the hash of the message + let mut hasher = Hash64::default(); + hasher.write(message); + hasher.write(auxiliary_data); + + // Add it, returning if we've seen it before + self.message_hash_cache.put(hasher.finish(), ()).is_some() + } + + /// Process incoming broadcast messages from the user + fn process_broadcast_message(&mut self, broadcast: &mut Broadcast) -> Result { + // Process through the `SMA`. Skip if it's over the threshold + let HookResult::ProcessMessage = + self.process_against_sma(broadcast.message.len(), MessageType::Broadcast) + else { + warn!("Broadcast message not processed due to high message rate"); + return Ok(HookResult::SkipMessage); + }; + + // Skip the message if we've already seen it + if self.message_already_seen(&broadcast.message, &broadcast.topics) { + return Ok(HookResult::SkipMessage); + } + + // Make sure it is deserializable + // let (_, _) = Self::deserialize_message(&broadcast.message)?; + + Ok(HookResult::ProcessMessage) + } + + /// Process incoming direct messages from the user + fn process_direct_message(&mut self, direct: &mut Direct) -> Result { + // Process through the `SMA`. Skip if it's over the threshold + let HookResult::ProcessMessage = + self.process_against_sma(direct.message.len(), MessageType::Direct) + else { + warn!("Direct message not processed due to high message rate"); + return Ok(HookResult::SkipMessage); + }; + + // Skip the message if we've already seen it + if self.message_already_seen(&direct.message, &direct.recipient) { + return Ok(HookResult::SkipMessage); + } + + // Make sure it is deserializable + // let (_, _) = Self::deserialize_message(&direct.message)?; + + Ok(HookResult::ProcessMessage) + } + + // fn deserialize_message(message: &[u8]) -> Result<(Message, Version)> { + // // Hack off the version + // let (version, message) = + // Version::deserialize(&message).with_context(|| "failed to deserialize message")?; + + // // Deserialize the message + // let message = Serializer::>::deserialize_no_version(&message) + // .with_context(|| "failed to deserialize message")?; + + // // Return the version and message + // Ok((message, version)) + // } +} + +impl MessageHookDef for HotShotMessageHook { + /// Implement the hook trait for `HotShotMessageHook` + fn on_message_received(&mut self, message: &mut PushCdnMessage) -> Result { + match message { + PushCdnMessage::Broadcast(broadcast) => self + .process_broadcast_message(broadcast) + .with_context(|| "failed to process broadcast message"), + + PushCdnMessage::Direct(direct) => self + .process_direct_message(direct) + .with_context(|| "failed to process direct message"), + + _ => Ok(HookResult::ProcessMessage), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + pub fn deduplication_broadcast() { + // Create a new message hook + let mut hook = HotShotMessageHook::default(); + + // Create a message + let mut message = Vec::new(); + message.extend_from_slice(b"Hello, world!"); + + // Create a broadcast message + let mut broadcast = Broadcast { + message, + topics: vec![], + }; + + // Process the message, make sure it would've been sent + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Message should have been processed but was not" + ); + + // Send it again, this time it should be skipped + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.is_ok() && result.unwrap() == HookResult::SkipMessage, + "Message should have been skipped but was not" + ); + + // Alter the topics, it should be processed + broadcast.topics.push(1); + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Same message with different topics should have been processed but was not" + ); + + // Alter the message, it should be processed + broadcast.message.extend_from_slice(b"!"); + broadcast.topics.clear(); + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Different message with same topics should have been processed but was not" + ); + } + + #[test] + pub fn deduplication_direct() { + // Create a new message hook + let mut hook = HotShotMessageHook::default(); + + // Create a message + let mut message = Vec::new(); + message.extend_from_slice(b"Hello, world!"); + + // Create a broadcast message + let mut direct = Direct { + message, + recipient: vec![], + }; + + // Process the message, make sure it would've been sent + let result = hook.process_direct_message(&mut direct); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Message should have been processed but was not" + ); + + // Send it again, this time it should be skipped + let result = hook.process_direct_message(&mut direct); + assert!( + result.is_ok() && result.unwrap() == HookResult::SkipMessage, + "Message should have been skipped but was not" + ); + + // Alter the topics, it should be processed + direct.recipient.push(1); + let result = hook.process_direct_message(&mut direct); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Same message with different recipient should have been processed but was not" + ); + + // Alter the message, it should be processed + direct.message.extend_from_slice(b"!"); + direct.recipient.clear(); + let result = hook.process_direct_message(&mut direct); + assert!( + result.is_ok() && result.unwrap() == HookResult::ProcessMessage, + "Different message with same recipient should have been processed but was not" + ); + } + + #[test] + fn in_range() { + // Create a new message hook + let mut hook = HotShotMessageHook { + sample_check_interval: Duration::from_secs(1), + allowed_multiple: 1, + global_broadcast_bps: Sma { + // Pretend we've seen an average of 5000 bytes per second + cached_sum: Arc::new(AtomicU64::new(5000)), + sma: Arc::new(Mutex::new(SingleSumSMA::new())), + }, + local_broadcast_bps: Sample { + num_bytes_sent: 0, + cooldown_until: Instant::now(), + // Pretend we need to check the sample + last_checked_time: Instant::now().checked_sub(Duration::from_secs(1)).unwrap(), + last_committed_time: Instant::now(), + }, + ..Default::default() + }; + + // Create a message just within the range + let message = vec![0; 4800]; + let mut broadcast = Broadcast { + message, + topics: vec![], + }; + + // Process the message, make sure it would've been sent + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.is_ok(), + "Message should have been processed but was not", + ); + } + + #[test] + fn exceeding_range() { + // Create a new message hook + let mut hook = HotShotMessageHook { + sample_check_interval: Duration::from_secs(1), + allowed_multiple: 1, + global_broadcast_bps: Sma { + // Pretend we've seen an average of 5000 bytes per second + cached_sum: Arc::new(AtomicU64::new(5000)), + sma: Arc::new(Mutex::new(SingleSumSMA::new())), + }, + local_broadcast_bps: Sample { + num_bytes_sent: 0, + cooldown_until: Instant::now(), + // Pretend we need to check the sample + last_checked_time: Instant::now().checked_sub(Duration::from_secs(1)).unwrap(), + last_committed_time: Instant::now(), + }, + ..Default::default() + }; + + // Create a message exceeding the range + let message = vec![0; 10000]; + let mut broadcast = Broadcast { + message, + topics: vec![], + }; + + // Process the message, make sure it would've been sent + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.unwrap() == HookResult::SkipMessage, + "Message should have been skipped but was not", + ); + + // Wait one second, make sure it's still skipped + let message = vec![1; 10000]; + let mut broadcast = Broadcast { + message, + topics: vec![], + }; + std::thread::sleep(Duration::from_millis(1000)); + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.unwrap() == HookResult::SkipMessage, + "Message should have been skipped but was not", + ); + + // Wait another second and some change, make sure it's processed + let message = vec![2; 1]; + let mut broadcast = Broadcast { + message, + topics: vec![], + }; + std::thread::sleep(Duration::from_millis(3000)); + let result = hook.process_broadcast_message(&mut broadcast); + assert!( + result.unwrap() == HookResult::ProcessMessage, + "Message should have been processed but was not", + ); + } +} diff --git a/crates/hotshot/src/traits/networking/push_cdn_network/definition/mod.rs b/crates/hotshot/src/traits/networking/push_cdn_network/definition/mod.rs new file mode 100644 index 0000000000..1818e4e280 --- /dev/null +++ b/crates/hotshot/src/traits/networking/push_cdn_network/definition/mod.rs @@ -0,0 +1,80 @@ +use std::marker::PhantomData; + +use cdn_broker::reexports::{ + connection::protocols::{Quic, Tcp}, + def::{hook::NoMessageHook, ConnectionDef, RunDef, Topic as TopicTrait}, + discovery::{Embedded, Redis}, +}; +use hotshot_types::traits::{node_implementation::NodeType, signature_key::SignatureKey}; +use message_hook::HotShotMessageHook; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use signature_key::WrappedSignatureKey; + +/// Allows hooking of incoming messages to the CDN +pub mod message_hook; + +/// The CDN's signature key implementation, which wraps the real signature key +pub mod signature_key; + +/// The enum for the topics we can subscribe to in the Push CDN +#[repr(u8)] +#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)] +pub enum Topic { + /// The global topic + Global = 0, + /// The DA topic + Da = 1, +} + +/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter +/// topics that are not implemented at the application level. +impl TopicTrait for Topic {} + +/// The production run definition for the Push CDN. +/// Uses the real protocols and a Redis discovery client. +pub struct ProductionDef(PhantomData); +impl RunDef for ProductionDef { + type User = UserDef; + type Broker = BrokerDef; + type DiscoveryClientType = Redis; + type Topic = Topic; +} + +/// The user definition for the Push CDN. +/// Uses the Quic protocol and untrusted middleware. +pub struct UserDef(PhantomData); +impl ConnectionDef for UserDef { + type Scheme = WrappedSignatureKey; + type Protocol = Quic; + type MessageHook = HotShotMessageHook; +} + +/// The broker definition for the Push CDN. +/// Uses the TCP protocol and trusted middleware. +pub struct BrokerDef(PhantomData); +impl ConnectionDef for BrokerDef { + type Scheme = WrappedSignatureKey; + type Protocol = Tcp; + type MessageHook = NoMessageHook; +} + +/// The client definition for the Push CDN. Uses the Quic +/// protocol and no middleware. Differs from the user +/// definition in that is on the client-side. +#[derive(Clone)] +pub struct ClientDef(PhantomData); +impl ConnectionDef for ClientDef { + type Scheme = WrappedSignatureKey; + type Protocol = Quic; + type MessageHook = NoMessageHook; +} + +/// The testing run definition for the Push CDN. +/// Uses the real protocols, but with an embedded discovery clientn. +pub struct TestingDef(PhantomData); +impl RunDef for TestingDef { + type User = UserDef; + type Broker = BrokerDef; + type DiscoveryClientType = Embedded; + type Topic = Topic; +} diff --git a/crates/hotshot/src/traits/networking/push_cdn_network/definition/signature_key.rs b/crates/hotshot/src/traits/networking/push_cdn_network/definition/signature_key.rs new file mode 100644 index 0000000000..deca7e4396 --- /dev/null +++ b/crates/hotshot/src/traits/networking/push_cdn_network/definition/signature_key.rs @@ -0,0 +1,43 @@ +use bincode::Options; +use cdn_broker::reexports::crypto::signature::{Serializable, SignatureScheme}; +use hotshot_types::{traits::signature_key::SignatureKey, utils::bincode_opts}; + +/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme` +/// trait in order to sign and verify messages to/from the CDN. +#[derive(Clone, Eq, PartialEq)] +pub struct WrappedSignatureKey(pub T); +impl SignatureScheme for WrappedSignatureKey { + type PrivateKey = T::PrivateKey; + type PublicKey = Self; + + /// Sign a message of arbitrary data and return the serialized signature + fn sign(private_key: &Self::PrivateKey, message: &[u8]) -> anyhow::Result> { + let signature = T::sign(private_key, message)?; + // TODO: replace with rigorously defined serialization scheme... + // why did we not make `PureAssembledSignatureType` be `CanonicalSerialize + CanonicalDeserialize`? + Ok(bincode_opts().serialize(&signature)?) + } + + /// Verify a message of arbitrary data and return the result + fn verify(public_key: &Self::PublicKey, message: &[u8], signature: &[u8]) -> bool { + // TODO: replace with rigorously defined signing scheme + let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) { + Ok(key) => key, + Err(_) => return false, + }; + + public_key.0.validate(&signature, message) + } +} + +/// We need to implement the `Serializable` so the Push CDN can serialize the signatures +/// and public keys and send them over the wire. +impl Serializable for WrappedSignatureKey { + fn serialize(&self) -> anyhow::Result> { + Ok(self.0.to_bytes()) + } + + fn deserialize(serialized: &[u8]) -> anyhow::Result { + Ok(WrappedSignatureKey(T::from_bytes(serialized)?)) + } +} diff --git a/crates/hotshot/src/traits/networking/push_cdn_network/metrics.rs b/crates/hotshot/src/traits/networking/push_cdn_network/metrics.rs new file mode 100644 index 0000000000..31322f57d0 --- /dev/null +++ b/crates/hotshot/src/traits/networking/push_cdn_network/metrics.rs @@ -0,0 +1,28 @@ +use hotshot_types::traits::metrics::{Counter, Metrics, NoMetrics}; + +/// CDN-specific metrics +#[derive(Clone)] +pub struct CdnMetricsValue { + /// The number of failed messages + pub num_failed_messages: Box, +} + +impl CdnMetricsValue { + /// Populate the metrics with the CDN-specific ones + pub fn new(metrics: &dyn Metrics) -> Self { + // Create a subgroup for the CDN + let subgroup = metrics.subgroup("cdn".into()); + + // Create the CDN-specific metrics + Self { + num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None), + } + } +} + +impl Default for CdnMetricsValue { + // The default is empty metrics + fn default() -> Self { + Self::new(&*NoMetrics::boxed()) + } +} diff --git a/crates/hotshot/src/traits/networking/push_cdn_network.rs b/crates/hotshot/src/traits/networking/push_cdn_network/mod.rs similarity index 73% rename from crates/hotshot/src/traits/networking/push_cdn_network.rs rename to crates/hotshot/src/traits/networking/push_cdn_network/mod.rs index f675b01f37..c8e3bb7273 100644 --- a/crates/hotshot/src/traits/networking/push_cdn_network.rs +++ b/crates/hotshot/src/traits/networking/push_cdn_network/mod.rs @@ -6,7 +6,7 @@ #[cfg(feature = "hotshot-testing")] use std::sync::atomic::{AtomicBool, Ordering}; -use std::{collections::BTreeSet, marker::PhantomData, sync::Arc}; +use std::{collections::BTreeSet, sync::Arc}; #[cfg(feature = "hotshot-testing")] use std::{path::Path, time::Duration}; @@ -14,25 +14,20 @@ use async_compatibility_layer::channel::TrySendError; #[cfg(feature = "hotshot-testing")] use async_compatibility_layer::{art::async_sleep, art::async_spawn}; use async_trait::async_trait; -use bincode::config::Options; -use cdn_broker::reexports::{ - connection::protocols::Tcp, - def::{ConnectionDef, RunDef, Topic as TopicTrait}, - discovery::{Embedded, Redis}, -}; +use cdn_broker::reexports::def::hook::NoMessageHook; #[cfg(feature = "hotshot-testing")] use cdn_broker::{Broker, Config as BrokerConfig}; pub use cdn_client::reexports::crypto::signature::KeyPair; use cdn_client::{ - reexports::{ - connection::protocols::Quic, - crypto::signature::{Serializable, SignatureScheme}, - message::{Broadcast, Direct, Message as PushCdnMessage}, - }, + reexports::message::{Broadcast, Direct, Message}, Client, Config as ClientConfig, }; #[cfg(feature = "hotshot-testing")] use cdn_marshal::{Config as MarshalConfig, Marshal}; +use definition::{ + message_hook::HotShotMessageHook, signature_key::WrappedSignatureKey, ClientDef, TestingDef, + Topic, +}; use futures::channel::mpsc; #[cfg(feature = "hotshot-testing")] use hotshot_types::traits::network::{ @@ -43,133 +38,23 @@ use hotshot_types::{ data::ViewNumber, request_response::NetworkMsgResponseChannel, traits::{ - metrics::{Counter, Metrics, NoMetrics}, network::{BroadcastDelay, ConnectedNetwork, PushCdnNetworkError, Topic as HotShotTopic}, node_implementation::NodeType, signature_key::SignatureKey, }, - utils::bincode_opts, BoxSyncFuture, }; -use num_enum::{IntoPrimitive, TryFromPrimitive}; +use metrics::CdnMetricsValue; #[cfg(feature = "hotshot-testing")] use rand::{rngs::StdRng, RngCore, SeedableRng}; use tracing::error; -use super::NetworkError; - -/// CDN-specific metrics -#[derive(Clone)] -pub struct CdnMetricsValue { - /// The number of failed messages - pub num_failed_messages: Box, -} - -impl CdnMetricsValue { - /// Populate the metrics with the CDN-specific ones - pub fn new(metrics: &dyn Metrics) -> Self { - // Create a subgroup for the CDN - let subgroup = metrics.subgroup("cdn".into()); - - // Create the CDN-specific metrics - Self { - num_failed_messages: subgroup.create_counter("num_failed_messages".into(), None), - } - } -} - -impl Default for CdnMetricsValue { - // The default is empty metrics - fn default() -> Self { - Self::new(&*NoMetrics::boxed()) - } -} - -/// A wrapped `SignatureKey`. We need to implement the Push CDN's `SignatureScheme` -/// trait in order to sign and verify messages to/from the CDN. -#[derive(Clone, Eq, PartialEq)] -pub struct WrappedSignatureKey(pub T); -impl SignatureScheme for WrappedSignatureKey { - type PrivateKey = T::PrivateKey; - type PublicKey = Self; - - /// Sign a message of arbitrary data and return the serialized signature - fn sign(private_key: &Self::PrivateKey, message: &[u8]) -> anyhow::Result> { - let signature = T::sign(private_key, message)?; - // TODO: replace with rigorously defined serialization scheme... - // why did we not make `PureAssembledSignatureType` be `CanonicalSerialize + CanonicalDeserialize`? - Ok(bincode_opts().serialize(&signature)?) - } - - /// Verify a message of arbitrary data and return the result - fn verify(public_key: &Self::PublicKey, message: &[u8], signature: &[u8]) -> bool { - // TODO: replace with rigorously defined signing scheme - let signature: T::PureAssembledSignatureType = match bincode_opts().deserialize(signature) { - Ok(key) => key, - Err(_) => return false, - }; - - public_key.0.validate(&signature, message) - } -} - -/// We need to implement the `Serializable` so the Push CDN can serialize the signatures -/// and public keys and send them over the wire. -impl Serializable for WrappedSignatureKey { - fn serialize(&self) -> anyhow::Result> { - Ok(self.0.to_bytes()) - } - - fn deserialize(serialized: &[u8]) -> anyhow::Result { - Ok(WrappedSignatureKey(T::from_bytes(serialized)?)) - } -} +/// The run definition for the Push CDN +pub mod definition; +/// The metrics for the Push CDN +pub mod metrics; -/// The production run definition for the Push CDN. -/// Uses the real protocols and a Redis discovery client. -pub struct ProductionDef(PhantomData); -impl RunDef for ProductionDef { - type User = UserDef; - type Broker = BrokerDef; - type DiscoveryClientType = Redis; - type Topic = Topic; -} - -/// The user definition for the Push CDN. -/// Uses the Quic protocol and untrusted middleware. -pub struct UserDef(PhantomData); -impl ConnectionDef for UserDef { - type Scheme = WrappedSignatureKey; - type Protocol = Quic; -} - -/// The broker definition for the Push CDN. -/// Uses the TCP protocol and trusted middleware. -pub struct BrokerDef(PhantomData); -impl ConnectionDef for BrokerDef { - type Scheme = WrappedSignatureKey; - type Protocol = Tcp; -} - -/// The client definition for the Push CDN. Uses the Quic -/// protocol and no middleware. Differs from the user -/// definition in that is on the client-side. -#[derive(Clone)] -pub struct ClientDef(PhantomData); -impl ConnectionDef for ClientDef { - type Scheme = WrappedSignatureKey; - type Protocol = Quic; -} - -/// The testing run definition for the Push CDN. -/// Uses the real protocols, but with an embedded discovery client. -pub struct TestingDef(PhantomData); -impl RunDef for TestingDef { - type User = UserDef; - type Broker = BrokerDef; - type DiscoveryClientType = Embedded; - type Topic = Topic; -} +use super::NetworkError; /// A communication channel to the Push CDN, which is a collection of brokers and a marshal /// that helps organize them all. @@ -187,20 +72,6 @@ pub struct PushCdnNetwork { // request_receiver_channel: TakeReceiver, } -/// The enum for the topics we can subscribe to in the Push CDN -#[repr(u8)] -#[derive(IntoPrimitive, TryFromPrimitive, Clone, PartialEq, Eq)] -pub enum Topic { - /// The global topic - Global = 0, - /// The DA topic - Da = 1, -} - -/// Implement the `TopicTrait` for our `Topic` enum. We need this to filter -/// topics that are not implemented at the application level. -impl TopicTrait for Topic {} - impl PushCdnNetwork { /// Create a new `PushCdnNetwork` (really a client) from a marshal endpoint, a list of initial /// topics we are interested in, and our wrapped keypair that we use to authenticate with the @@ -271,9 +142,7 @@ impl TestableNetworkingImplementation fn generator( _expected_node_count: usize, _num_bootstrap: usize, - _network_id: usize, da_committee_size: usize, - _is_da: bool, _reliability_config: Option>, _secondary_network_delay: Duration, ) -> AsyncGenerator> { @@ -322,8 +191,11 @@ impl TestableNetworkingImplementation let broker_identifier = format!("{public_address}/{public_address}"); let other_broker_identifier = format!("{other_public_address}/{other_public_address}"); + // Create the message hooks + let user_message_hook = HotShotMessageHook::default(); + // Configure the broker - let config: BrokerConfig> = BrokerConfig { + let config: BrokerConfig> = BrokerConfig { public_advertise_endpoint: public_address.clone(), public_bind_endpoint: public_address, private_advertise_endpoint: private_address.clone(), @@ -338,11 +210,13 @@ impl TestableNetworkingImplementation ca_key_path: None, // 1GB global_memory_pool_size: Some(1024 * 1024 * 1024), + user_message_hook, + broker_message_hook: NoMessageHook, }; // Create and spawn the broker async_spawn(async move { - let broker: Broker> = + let broker: Broker> = Broker::new(config).await.expect("broker failed to start"); // If we are the first broker by identifier, we need to sleep a bit @@ -375,7 +249,7 @@ impl TestableNetworkingImplementation // Spawn the marshal async_spawn(async move { - let marshal: Marshal> = Marshal::new(marshal_config) + let marshal: Marshal> = Marshal::new(marshal_config) .await .expect("failed to spawn marshal"); @@ -568,8 +442,8 @@ impl ConnectedNetwork for PushCdnNetwork { }; // Extract the underlying message - let (PushCdnMessage::Broadcast(Broadcast { message, topics: _ }) - | PushCdnMessage::Direct(Direct { + let (Message::Broadcast(Broadcast { message, topics: _ }) + | Message::Direct(Direct { message, recipient: _, })) = message diff --git a/crates/types/src/simple_certificate.rs b/crates/types/src/simple_certificate.rs index 5d87393ea7..0e4986d37d 100644 --- a/crates/types/src/simple_certificate.rs +++ b/crates/types/src/simple_certificate.rs @@ -157,7 +157,7 @@ impl> membership.stake_table(), U256::from(Self::threshold(membership)), ); - let Ok(commit) = self.date_commitment(upgrade_lock).await else { + let Ok(commit) = self.data_commitment(upgrade_lock).await else { return false; }; ::check( @@ -172,7 +172,7 @@ impl> fn date(&self) -> &Self::Voteable { &self.data } - async fn date_commitment( + async fn data_commitment( &self, upgrade_lock: &UpgradeLock, ) -> Result>> { diff --git a/crates/types/src/simple_vote.rs b/crates/types/src/simple_vote.rs index a86452b9a9..45e131f756 100644 --- a/crates/types/src/simple_vote.rs +++ b/crates/types/src/simple_vote.rs @@ -147,7 +147,7 @@ impl Vote for SimpleVote Commitment { + fn data_commitment(&self) -> Commitment { self.data.commit() } } diff --git a/crates/types/src/traits/network.rs b/crates/types/src/traits/network.rs index c67644eecd..ccfb65feff 100644 --- a/crates/types/src/traits/network.rs +++ b/crates/types/src/traits/network.rs @@ -367,9 +367,7 @@ where fn generator( expected_node_count: usize, num_bootstrap: usize, - network_id: usize, da_committee_size: usize, - is_da: bool, reliability_config: Option>, secondary_network_delay: Duration, ) -> AsyncGenerator>; diff --git a/crates/types/src/traits/node_implementation.rs b/crates/types/src/traits/node_implementation.rs index 1bb9bcb3fc..73d37d8749 100644 --- a/crates/types/src/traits/node_implementation.rs +++ b/crates/types/src/traits/node_implementation.rs @@ -151,9 +151,7 @@ where >::generator( expected_node_count, num_bootstrap, - 0, da_committee_size, - false, reliability_config.clone(), secondary_network_delay, ) diff --git a/crates/types/src/vote.rs b/crates/types/src/vote.rs index 0123ba0085..ae71e06cbd 100644 --- a/crates/types/src/vote.rs +++ b/crates/types/src/vote.rs @@ -39,7 +39,7 @@ pub trait Vote: HasViewNumber { /// Gets the data which was voted on by this vote fn date(&self) -> &Self::Commitment; /// Gets the Data commitment of the vote - fn date_commitment(&self) -> Commitment; + fn data_commitment(&self) -> Commitment; /// Gets the public signature key of the votes creator/sender fn signing_key(&self) -> TYPES::SignatureKey; @@ -83,7 +83,7 @@ pub trait Certificate: HasViewNumber { /// Get the commitment which was voted on fn date(&self) -> &Self::Voteable; /// Get the vote commitment which the votes commit to - fn date_commitment( + fn data_commitment( &self, upgrade_lock: &UpgradeLock, ) -> impl std::future::Future>>>;