Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(floodsub): Add option to configure the maximum message length #5588

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits"
libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-floodsub = { version = "0.46.0", path = "protocols/floodsub" }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't seem like a breaking change, do we need a minor version bump instead of patch?

Copy link
Member

@dariusc93 dariusc93 Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assume that the introduction of a new public field in FloodsubConfig could be considered as one, especially given that it doesnt use the non_exhaustive attribute. I could be wrong though, but would have to check semver on that. If it is the case that it is a breaking change, we could set this PR to draft until the next round of breaking changes.

Copy link
Author

@romac romac Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I unfortunately do not see a way around doing a minor version bump, at least according to cargo-semver-checks:

argo semver-checks --baseline-version 0.45.0 --release-type patch
     Parsing libp2p-floodsub v0.46.0 (current)
      Parsed [   2.067s] (current)
     Parsing libp2p-floodsub v0.45.0 (baseline, cached)
      Parsed [   0.170s] (baseline)
    Checking libp2p-floodsub v0.45.0 -> v0.46.0 (assume patch change)
     Checked [   0.013s] 79 checks: 78 pass, 1 fail, 0 warn, 0 skip

--- failure constructible_struct_adds_field: externally-constructible struct adds field ---

Description:
A pub struct constructible with a struct literal has a new pub field. Existing struct literals must be updated to include the new field.
        ref: https://doc.rust-lang.org/reference/expressions/struct-expr.html
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.34.0/src/lints/constructible_struct_adds_field.ron

Failed in:
  field FloodsubProtocol.max_message_len in /Users/romac/Informal/Code/rust-libp2p/protocols/floodsub/src/protocol.rs:43
  field FloodsubConfig.max_message_len in /Users/romac/Informal/Code/rust-libp2p/protocols/floodsub/src/lib.rs:54
  field FloodsubRpc.max_message_len in /Users/romac/Informal/Code/rust-libp2p/protocols/floodsub/src/protocol.rs:155
  field FloodsubRpc.max_message_len in /Users/romac/Informal/Code/rust-libp2p/protocols/floodsub/src/protocol.rs:155

     Summary semver requires new major version: 1 major and 0 minor checks failed
    Finished [   2.267s] libp2p-floodsub

Note that what cargo-semver-checks calls a major version corresponds to a minor version bump for libp2p-floodsub since the crate is at 0.x.y.

Copy link
Author

@romac romac Sep 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, if we are doing a breaking change in libp2p-floodsub, would you accept a PR to align the naming of types with the other crates, as per #2217?

ie. rename FloodsubConfig to just Config, Floodsub to Behavior and FloodsubEvent to Event, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, if we are doing a breaking change in libp2p-floodsub, would you accept a PR to align the naming of types with the other crates, as per #2217?

ie. rename FloodsubConfig to just Config, Floodsub to Behavior and FloodsubEvent to Event, etc.

We can do this in a separate PR and just use an alias pointing to the new names with the alias being deprecated :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, will open a PR for this sometime soon then :)

Copy link
Member

@jxs jxs Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's the problem, those fields don't need to be public in FloodsubConfig, and probably FloodsubRpc and FloodsubProtocol also don't need to be public. Could you also do it in the subsequent PR @romac?
Thanks!

libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.1", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/floodsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.46.0

- Add a `max_message_len` option to `FloodsubConfig` for configuring the maximum message length.
See [PR 5588](https://github.com/libp2p/rust-libp2p/pull/5588)

## 0.45.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/floodsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-floodsub"
edition = "2021"
rust-version = { workspace = true }
description = "Floodsub protocol for libp2p"
version = "0.45.0"
version = "0.46.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
24 changes: 22 additions & 2 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use libp2p_swarm::{
dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour,
NotifyHandler, OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use libp2p_swarm::{OneShotHandlerConfig, SubstreamProtocol};
use smallvec::SmallVec;
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::task::{Context, Poll};
Expand All @@ -45,6 +46,7 @@ pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<FloodsubEvent, FloodsubRpc>>,

/// The floodsub configuration
config: FloodsubConfig,

/// List of peers to send messages to.
Expand Down Expand Up @@ -97,6 +99,7 @@ impl Floodsub {
topic,
action: FloodsubSubscriptionAction::Subscribe,
}],
max_message_len: self.config.max_message_len,
},
});
}
Expand Down Expand Up @@ -133,6 +136,7 @@ impl Floodsub {
topic: topic.clone(),
action: FloodsubSubscriptionAction::Subscribe,
}],
max_message_len: self.config.max_message_len,
},
});
}
Expand Down Expand Up @@ -163,6 +167,7 @@ impl Floodsub {
topic: topic.clone(),
action: FloodsubSubscriptionAction::Unsubscribe,
}],
max_message_len: self.config.max_message_len,
},
});
}
Expand Down Expand Up @@ -263,6 +268,7 @@ impl Floodsub {
event: FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
max_message_len: self.config.max_message_len,
},
});
}
Expand Down Expand Up @@ -293,6 +299,7 @@ impl Floodsub {
topic,
action: FloodsubSubscriptionAction::Subscribe,
}],
max_message_len: self.config.max_message_len,
},
});
}
Expand Down Expand Up @@ -338,7 +345,13 @@ impl NetworkBehaviour for Floodsub {
_: &Multiaddr,
_: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Default::default())
Ok(OneShotHandler::new(
SubstreamProtocol::new(
FloodsubProtocol::new().with_max_message_len(self.config.max_message_len),
(),
),
OneShotHandlerConfig::default(),
))
}

fn handle_established_outbound_connection(
Expand All @@ -349,7 +362,13 @@ impl NetworkBehaviour for Floodsub {
_: Endpoint,
_: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Default::default())
Ok(OneShotHandler::new(
SubstreamProtocol::new(
FloodsubProtocol::new().with_max_message_len(self.config.max_message_len),
(),
),
OneShotHandlerConfig::default(),
))
}

fn on_connection_handler_event(
Expand Down Expand Up @@ -460,6 +479,7 @@ impl NetworkBehaviour for Floodsub {
FloodsubRpc {
subscriptions: Vec::new(),
messages: vec![message.clone()],
max_message_len: self.config.max_message_len,
},
));
}
Expand Down
18 changes: 18 additions & 0 deletions protocols/floodsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use libp2p_identity::PeerId;
use protocol::DEFAULT_MAX_MESSAGE_LEN_BYTES;

pub mod protocol;

Expand All @@ -48,13 +49,30 @@ pub struct FloodsubConfig {
/// `true` if messages published by local node should be propagated as messages received from
/// the network, `false` by default.
pub subscribe_local_messages: bool,

/// Maximum message length in bytes. Defaults to 2KiB.
pub max_message_len: usize,
}

impl FloodsubConfig {
pub fn new(local_peer_id: PeerId) -> Self {
Self {
local_peer_id,
subscribe_local_messages: false,
max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES,
}
}

/// Set whether or not messages published by local node should be
/// propagated as messages received from the network.
pub fn with_subscribe_local_messages(mut self, subscribe_local_messages: bool) -> Self {
self.subscribe_local_messages = subscribe_local_messages;
self
}

/// Set the maximum message length in bytes.
pub fn with_max_message_len(mut self, max_message_len: usize) -> Self {
self.max_message_len = max_message_len;
self
}
}
36 changes: 28 additions & 8 deletions protocols/floodsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,35 @@ use libp2p_identity::PeerId;
use libp2p_swarm::StreamProtocol;
use std::{io, iter, pin::Pin};

const MAX_MESSAGE_LEN_BYTES: usize = 2048;
pub(crate) const DEFAULT_MAX_MESSAGE_LEN_BYTES: usize = 2048;

const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0");

/// Implementation of `ConnectionUpgrade` for the floodsub protocol.
#[derive(Debug, Clone, Default)]
pub struct FloodsubProtocol {}
#[derive(Debug, Clone)]
pub struct FloodsubProtocol {
/// Maximum message length in bytes.
pub max_message_len: usize,
}

impl FloodsubProtocol {
/// Builds a new `FloodsubProtocol`.
pub fn new() -> FloodsubProtocol {
FloodsubProtocol {}
/// Builds a new `FloodsubProtocol` with default parameters.
pub fn new() -> Self {
Self::default()
}

/// Set the maximum message length.
pub fn with_max_message_len(mut self, max_message_len: usize) -> Self {
self.max_message_len = max_message_len;
self
}
}

impl Default for FloodsubProtocol {
fn default() -> Self {
Self {
max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES,
}
}
}

Expand All @@ -68,7 +85,7 @@ where
Box::pin(async move {
let mut framed = Framed::new(
socket,
quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
quick_protobuf_codec::Codec::<proto::RPC>::new(self.max_message_len),
);

let rpc = framed
Expand Down Expand Up @@ -102,6 +119,7 @@ where
topic: Topic::new(sub.topic_id.unwrap_or_default()),
})
.collect(),
max_message_len: self.max_message_len,
})
})
}
Expand Down Expand Up @@ -132,6 +150,8 @@ pub struct FloodsubRpc {
pub messages: Vec<FloodsubMessage>,
/// List of subscriptions.
pub subscriptions: Vec<FloodsubSubscription>,
/// Maximum message length in bytes.
pub max_message_len: usize,
}

impl UpgradeInfo for FloodsubRpc {
Expand All @@ -155,7 +175,7 @@ where
Box::pin(async move {
let mut framed = Framed::new(
socket,
quick_protobuf_codec::Codec::<proto::RPC>::new(MAX_MESSAGE_LEN_BYTES),
quick_protobuf_codec::Codec::<proto::RPC>::new(self.max_message_len),
);
framed.send(self.into_rpc()).await?;
framed.close().await?;
Expand Down
Loading