Skip to content

Commit

Permalink
resolve reverse dns concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
yu-re-ka committed Nov 23, 2023
1 parent 475fd29 commit a0fe5dc
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 57 deletions.
139 changes: 93 additions & 46 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use crate::store::{Query, QueryLimits, Store, QueryResult, RouteAttrs, RouterId, ResolvedRouteAttrs, ResolvedNexthop, NetQuery};
use crate::store::{NetQuery, Query, QueryLimits, QueryResult, ResolvedNexthop, Store};
use axum::body::StreamBody;
use axum::extract::{Query as AxumQuery, State};
use axum::http::StatusCode;
use axum::response::{Response, IntoResponse};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::Router;
use futures_util::{FutureExt, StreamExt};
use hickory_resolver::config::LookupIpStrategy;
use hickory_resolver::TokioAsyncResolver;
use futures_util::{FutureExt, StreamExt};
use log::*;
use serde::{Serialize, Deserialize};
use ipnet::IpNet;
use std::net::IpAddr;
use std::collections::HashMap;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::Infallible;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::sync::Arc;

Expand All @@ -31,18 +31,12 @@ pub struct ApiServerConfig {
}

#[derive(Debug, Clone, Serialize)]
pub enum NexthopResolved {
ReverseDns(String),
RouterId(RouterId),
}

#[derive(Debug, Clone, Serialize, Default)]
pub struct RouteAttrsResolved {
#[serde(flatten)]
inner: RouteAttrs,
communities_resolved: HashMap<(u16, u16), String>,
large_communities_resolved: HashMap<(u32, u32, u32), String>,
nexthop_resolved: Option<NexthopResolved>,
pub enum ApiResult {
Route(QueryResult),
ReverseDns {
nexthop: IpAddr,
nexthop_resolved: ResolvedNexthop,
},
}

// Make our own error that wraps `anyhow::Error`.
Expand Down Expand Up @@ -78,7 +72,13 @@ async fn parse_or_resolve(resolver: &TokioAsyncResolver, name: String) -> anyhow
return Ok(addr.into());
}

Ok(resolver.lookup_ip(&name).await?.iter().next().ok_or(anyhow::anyhow!("Name resolution failure"))?.into())
Ok(resolver
.lookup_ip(&format!("{}.", name))
.await?
.iter()
.next()
.ok_or(anyhow::anyhow!("Name resolution failure"))?
.into())
}

async fn query<T: Store>(
Expand All @@ -89,7 +89,9 @@ async fn query<T: Store>(

let net_query = match query.net_query {
NetQuery::Contains(name) => NetQuery::Contains(parse_or_resolve(&resolver, name).await?),
NetQuery::MostSpecific(name) => NetQuery::MostSpecific(parse_or_resolve(&resolver, name).await?),
NetQuery::MostSpecific(name) => {
NetQuery::MostSpecific(parse_or_resolve(&resolver, name).await?)
}
NetQuery::Exact(name) => NetQuery::Exact(parse_or_resolve(&resolver, name).await?),
NetQuery::OrLonger(name) => NetQuery::OrLonger(parse_or_resolve(&resolver, name).await?),
};
Expand All @@ -98,7 +100,7 @@ async fn query<T: Store>(
table_query: query.table_query,
net_query,
limits: query.limits,
as_path_regex: query.as_path_regex
as_path_regex: query.as_path_regex,
};

let mut limits = query.limits.take().unwrap_or(cfg.query_limits.clone());
Expand All @@ -108,32 +110,77 @@ async fn query<T: Store>(
cfg.query_limits.max_results_per_table,
);
query.limits = Some(limits);
let stream = store.get_routes(query)
.then(move |route| {
let resolver = resolver.clone();
async move {
QueryResult {
client: route.client, net: route.net, session: route.session,
state: route.state, table: route.table,
attrs: ResolvedRouteAttrs {
resolved_communities: Default::default(),
resolved_large_communities: Default::default(),
resolved_nexthop: match route.attrs.nexthop.as_ref() {
Some(nexthop) => match resolver.reverse_lookup(*nexthop).await.ok().and_then(|reverse| reverse.iter().next().map(|x| x.0.clone())) {
Some(reverse) => ResolvedNexthop::ReverseDns(reverse.to_string()),
None => ResolvedNexthop::None,

// for deduplicating the nexthop resolutions
let have_resolved = Arc::new(std::sync::Mutex::new(HashSet::new()));

let stream = store
.get_routes(query)
.flat_map_unordered(None, move |route| {
enum StreamState {
SendRoute,
SendDns,
Done,
}

futures_util::stream::unfold(
(
StreamState::SendRoute,
route,
resolver.clone(),
have_resolved.clone(),
),
move |(state, route, resolver, have_resolved)| {
Box::pin(async move {
match state {
StreamState::SendRoute => Some((
ApiResult::Route(route.clone()),
(StreamState::SendDns, route, resolver, have_resolved),
)),
StreamState::SendDns => match route.attrs.nexthop.clone() {
Some(nexthop) => {
if have_resolved.lock().unwrap().insert(nexthop) {
resolver
.reverse_lookup(nexthop)
.await
.ok()
.and_then(|reverse| {
reverse.iter().next().map(|x| x.0.clone())
})
.map(|x| {
(
ApiResult::ReverseDns {
nexthop,
nexthop_resolved:
ResolvedNexthop::ReverseDns(
x.to_string(),
),
},
(
StreamState::Done,
route,
resolver,
have_resolved,
),
)
})
} else {
None
}
}
None => None,
},
StreamState::Done => None,
}
None => ResolvedNexthop::None,
},
inner: route.attrs,
})
},
}
}
})
.map(|route| {
let json = serde_json::to_string(&route).unwrap();
Ok::<_, Infallible>(format!("{}\n", json))
});
)
})
.map(|result| {
let json = serde_json::to_string(&result).unwrap();
Ok::<_, Infallible>(format!("{}\n", json))
});

Ok(StreamBody::new(stream))
}

Expand Down
13 changes: 2 additions & 11 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,6 @@ pub enum ResolvedNexthop {
ReverseDns(String),
}

#[derive(Debug, Clone, Serialize, Default)]
pub struct ResolvedRouteAttrs {
#[serde(flatten)]
pub inner: RouteAttrs,
pub resolved_communities: HashMap<(u16, u16), String>,
pub resolved_large_communities: HashMap<(u32, u32, u32), String>,
pub resolved_nexthop: ResolvedNexthop,
}

#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SessionId {
Expand Down Expand Up @@ -132,7 +123,7 @@ pub struct Query<T = IpNet> {

#[derive(Debug, Clone, Serialize)]
#[serde(deny_unknown_fields)]
pub struct QueryResult<T = RouteAttrs> {
pub struct QueryResult {
pub state: RouteState,
pub net: IpNet,
#[serde(flatten)]
Expand All @@ -142,7 +133,7 @@ pub struct QueryResult<T = RouteAttrs> {
#[serde(flatten)]
pub session: Option<Session>,
#[serde(flatten)]
pub attrs: T,
pub attrs: RouteAttrs,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down

0 comments on commit a0fe5dc

Please sign in to comment.