Skip to content

Commit

Permalink
refactor nexthop reverse dns code
Browse files Browse the repository at this point in the history
  • Loading branch information
yu-re-ka committed Jan 18, 2024
1 parent eef7c25 commit e5e6323
Showing 1 changed file with 26 additions and 60 deletions.
86 changes: 26 additions & 60 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::collections::HashSet;
use std::convert::Infallible;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;

#[cfg(feature = "embed-static")]
Expand Down Expand Up @@ -112,72 +113,37 @@ async fn query<T: Store>(
query.limits = Some(limits);

// for deduplicating the nexthop resolutions
let have_resolved = Arc::new(std::sync::Mutex::new(HashSet::new()));
let mut have_resolved = HashSet::new();

let stream = store
.get_routes(query)

// this little mess is required to attach reverse DNS resolutions for each nexthop, and
// resolve all the entries in parallel as they arrive, while also deduplicating them
.flat_map_unordered(None, move |route| {
enum StreamState {
SendRoute,
SendDns,
Done,
let futures = futures_util::stream::FuturesUnordered::<
Pin<Box<dyn std::future::Future<Output = Option<ApiResult>> + Send>>,
>::new();

futures.push(Box::pin(futures_util::future::ready(Some(
ApiResult::Route(route.clone()),
))));

if let Some(nexthop) = route.attrs.nexthop {
if have_resolved.insert(nexthop) {
let resolver = resolver.clone();
futures.push(Box::pin(async move {
resolver
.reverse_lookup(nexthop)
.await
.ok()
.and_then(|reverse| reverse.iter().next().map(|x| x.0.to_string()))
.map(|x| ApiResult::ReverseDns {
nexthop,
nexthop_resolved: ResolvedNexthop::ReverseDns(x),
})
}))
}
}

futures_util::stream::unfold(
(
StreamState::SendRoute,
route,
resolver.clone(),
have_resolved.clone(),
),
move |(state, route, resolver, have_resolved)| {
Box::pin(async move {
match state {
StreamState::SendRoute => Some((
ApiResult::Route(route.clone()),
(StreamState::SendDns, route, resolver, have_resolved),
)),
StreamState::SendDns => match route.attrs.nexthop.clone() {
Some(nexthop) => {
if have_resolved.lock().unwrap().insert(nexthop) {
resolver
.reverse_lookup(nexthop)
.await
.ok()
.and_then(|reverse| {
reverse.iter().next().map(|x| x.0.clone())
})
.map(|x| {
(
ApiResult::ReverseDns {
nexthop,
nexthop_resolved:
ResolvedNexthop::ReverseDns(
x.to_string(),
),
},
(
StreamState::Done,
route,
resolver,
have_resolved,
),
)
})
} else {
None
}
}
None => None,
},
StreamState::Done => None,
}
})
},
)
futures
})
.map(|result| {
let json = serde_json::to_string(&result).unwrap();
Expand Down

0 comments on commit e5e6323

Please sign in to comment.