Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/kad: Improve options to efficiently retrieve #2712

Merged
merged 33 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7ce7057
feat(kad): add limit option for getting providers
dignifiedquire Jun 16, 2022
769ce4b
feat(kad): report get_providers call event based
dignifiedquire Jun 16, 2022
f68c903
change `GetRecord`api
dignifiedquire Sep 27, 2022
baf3e60
apply cr
dignifiedquire Oct 4, 2022
5db1bfa
tests and fixups for getclosestpeers
dignifiedquire Nov 11, 2022
91b5f59
remove KademliaCaching
dignifiedquire Nov 11, 2022
30eb56c
apply cr: move to nonzerousize
dignifiedquire Nov 11, 2022
0720deb
fix example
dignifiedquire Nov 11, 2022
a0b26b0
happy clippy
dignifiedquire Nov 11, 2022
e0e79fd
protocols/kad: Refactor step tracking
mxinden Nov 11, 2022
a4f0210
cr feedback round 1
dignifiedquire Nov 12, 2022
4e448c4
cr: improve api for GetRecordOk and GetProvidersOk
dignifiedquire Nov 12, 2022
96a952e
fixup rebase of examples
dignifiedquire Nov 12, 2022
2724afd
switch to counter for number of observed record
dignifiedquire Nov 12, 2022
83a2a86
bring back kademliacaching
dignifiedquire Nov 12, 2022
b483983
examples/file-sharing: Revert usage of HashSet
mxinden Nov 17, 2022
aa8a6ce
examples/file-sharing: Finish query once provider is found
mxinden Nov 17, 2022
3912acc
protocols/kad: Remove pub(crate) from replication_factor
mxinden Nov 17, 2022
e339cdf
protocols/kad: Refactor get_record
mxinden Nov 17, 2022
3ced598
protocols/kad: Refactor get_providers step instantiation
mxinden Nov 17, 2022
ac2e525
protocols/kad: Remove pub from ProgressStep methods
mxinden Nov 17, 2022
5ca8d70
remove unused as_intermediary_result
dignifiedquire Nov 18, 2022
bd05da6
fix test cr comments
dignifiedquire Nov 18, 2022
c7c4341
fixup: rebase
dignifiedquire Nov 18, 2022
5ecf9cb
use get instead of checked_add
dignifiedquire Nov 18, 2022
055636e
Update protocols/kad/src/behaviour.rs
dignifiedquire Nov 18, 2022
d5cb7e9
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 22, 2022
0e443d4
Merge remote-tracking branch 'upstream/master' into feat-kad-count
dignifiedquire Nov 24, 2022
c9c00df
add changelog
dignifiedquire Nov 24, 2022
05e29d4
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
1223f02
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
9c28d98
Update protocols/kad/CHANGELOG.md
dignifiedquire Nov 24, 2022
cfd5461
Merge branch 'master' into feat-kad-count
dignifiedquire Nov 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use libp2p::{
swarm::{NetworkBehaviour, SwarmEvent},
PeerId, Swarm,
};
use libp2p_kad::{GetProvidersOk, GetRecordOk};
use std::error::Error;

#[async_std::main]
Expand Down Expand Up @@ -120,33 +121,32 @@ async fn main() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryCompleted { result, ..})) => {
SwarmEvent::Behaviour(MyBehaviourEvent::Kademlia(KademliaEvent::OutboundQueryProgressed { result, ..})) => {
match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { key, providers, .. })) => {
for peer in providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(ok.key.as_ref()).unwrap()
"Peer {peer:?} provides key {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {err:?}");
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
QueryResult::GetRecord(Ok(
GetRecordOk::FoundRecord(PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
})
)) => {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
QueryResult::GetRecord(Ok(_)) => {}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {err:?}");
}
Expand Down Expand Up @@ -191,7 +191,7 @@ fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
}
}
};
kademlia.get_record(key, Quorum::One);
kademlia.get_record(key);
}
Some("GET_PROVIDERS") => {
let key = {
Expand Down
35 changes: 27 additions & 8 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ mod network {
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::StartProviding(_),
..
Expand All @@ -426,18 +426,37 @@ mod network {
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders {
providers,
..
})),
..
},
)) => {
let _ = self
.pending_get_providers
.remove(&id)
.expect("Completed query to be previously pending.")
.send(providers);
if let Some(sender) = self.pending_get_providers.remove(&id) {
sender.send(providers).expect("Receiver not to be dropped");

// Finish the query. We are only interested in the first result.
self.swarm
.behaviour_mut()
.kademlia
.query_mut(&id)
.unwrap()
.finish();
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
result:
QueryResult::GetProviders(Ok(
GetProvidersOk::FinishedWithNoAdditionalRecord { .. },
)),
..
},
)) => {}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::Message { message, .. },
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let event = swarm.select_next_some().await;
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted {
if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetClosestPeers(result),
..
}) = event
Expand Down
4 changes: 4 additions & 0 deletions misc/metrics/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090].

- Changed `Metrics::query_result_get_record_ok` from `Histogram` to a `Counter`.
See [PR 2712].

[PR 2982]: https://github.com/libp2p/rust-libp2p/pull/2982/
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712

# 0.10.0

Expand Down
23 changes: 14 additions & 9 deletions misc/metrics/src/kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::{Registry, Unit};

pub struct Metrics {
query_result_get_record_ok: Histogram,
query_result_get_record_ok: Counter,
query_result_get_record_error: Family<GetRecordResult, Counter>,

query_result_get_closest_peers_ok: Histogram,
Expand All @@ -48,7 +48,7 @@ impl Metrics {
pub fn new(registry: &mut Registry) -> Self {
let sub_registry = registry.sub_registry_with_prefix("kad");

let query_result_get_record_ok = Histogram::new(exponential_buckets(1.0, 2.0, 10));
let query_result_get_record_ok = Counter::default();
sub_registry.register(
"query_result_get_record_ok",
"Number of records returned by a successful Kademlia get record query.",
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Metrics {
impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
fn record(&self, event: &libp2p_kad::KademliaEvent) {
match event {
libp2p_kad::KademliaEvent::OutboundQueryCompleted { result, stats, .. } => {
libp2p_kad::KademliaEvent::OutboundQueryProgressed { result, stats, .. } => {
self.query_result_num_requests
.get_or_create(&result.into())
.observe(stats.num_requests().into());
Expand All @@ -180,9 +180,10 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {

match result {
libp2p_kad::QueryResult::GetRecord(result) => match result {
Ok(ok) => self
.query_result_get_record_ok
.observe(ok.records.len() as f64),
Ok(libp2p_kad::GetRecordOk::FoundRecord(_)) => {
self.query_result_get_record_ok.inc();
}
Ok(libp2p_kad::GetRecordOk::FinishedWithNoAdditionalRecord { .. }) => {}
Err(error) => {
self.query_result_get_record_error
.get_or_create(&error.into())
Expand All @@ -200,9 +201,13 @@ impl super::Recorder<libp2p_kad::KademliaEvent> for Metrics {
}
},
libp2p_kad::QueryResult::GetProviders(result) => match result {
Ok(ok) => self
.query_result_get_providers_ok
.observe(ok.providers.len() as f64),
Ok(libp2p_kad::GetProvidersOk::FoundProviders { providers, .. }) => {
self.query_result_get_providers_ok
.observe(providers.len() as f64);
}
Ok(libp2p_kad::GetProvidersOk::FinishedWithNoAdditionalRecord {
..
}) => {}
Err(error) => {
self.query_result_get_providers_error
.get_or_create(&error.into())
Expand Down
8 changes: 8 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@
This would eventually lead to warning that says: "New inbound substream to PeerId exceeds inbound substream limit. No older substream waiting to be reused."
See [PR 3152].

- Refactor APIs to be streaming.
- Renamed `KademliaEvent::OutboundQueryCompleted` to `KademliaEvent::OutboundQueryProgressed`
- Instead of a single event `OutboundQueryCompleted`, there are now multiple events emitted, allowing the user to process them as they come in (via the new `OutboundQueryProgressed`). See `ProgressStep` to identify the final `OutboundQueryProgressed` of a single query.
- To finish a query early, i.e. before the final `OutboundQueryProgressed` of the query, a caller needs to call `query.finish()`.
- There is no more automatic caching of records. The user has to manually call `put_record_to` on the `QueryInfo::GetRecord.cache_candidates` to cache a record to a close peer that did not return the record on the foregone query.
See [PR 2712].

[PR 3085]: https://github.com/libp2p/rust-libp2p/pull/3085
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090
[PR 3152]: https://github.com/libp2p/rust-libp2p/pull/3152
[PR 2712]: https://github.com/libp2p/rust-libp2p/pull/2712

# 0.41.0

Expand Down
Loading