Skip to content

Commit

Permalink
fix: peer in the up FIFO order
Browse files Browse the repository at this point in the history
  • Loading branch information
guissalustiano committed Jan 19, 2024
1 parent d852ee9 commit 3228691
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 57 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ readme = "README.md"

[dependencies]
petgraph = "0.6.4"

[dev-dependencies]
rayon = "1.8.1"
bzip2 = "0.4.4"
reqwest = { version = "0.11", features = ["blocking"] }
221 changes: 164 additions & 57 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
/// AS-relationship data file and run path exploration using valley-free routing
/// principle.
use std::{
collections::{HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet},
io,
};

use petgraph::{
algo::{all_simple_paths, astar},
graph::{DiGraph, NodeIndex},
visit::EdgeRef,
};
Expand All @@ -25,6 +26,7 @@ impl Default for RelType {
}
}

/// An AS-level topology: a directed graph whose nodes are AS numbers and
/// whose edge weights are the business relationship between the two ASes.
#[derive(Debug, Clone)]
pub struct Topology {
    pub graph: DiGraph<u32, RelType>,
}
Expand All @@ -36,6 +38,9 @@ pub enum TopologyError {
ParseError(String),
}

/// A path through the topology expressed as a sequence of AS numbers.
pub type TopologyPath = Vec<u32>;
/// Internal counterpart of [`TopologyPath`] using petgraph node indices.
type TopologyPathIndex = Vec<NodeIndex>;

impl Topology {
pub fn from_edges(edges: Vec<(u32, u32, RelType)>) -> Self {
let mut graph = DiGraph::new();
Expand Down Expand Up @@ -185,6 +190,12 @@ impl Topology {
)
}

fn has_connection(&self, asn1: u32, asn2: u32) -> bool {
self.graph
.find_edge(self.index_of(asn1).unwrap(), self.index_of(asn2).unwrap())
.is_some()
}

/*
* Given the following topology:
*
Expand Down Expand Up @@ -219,105 +230,109 @@ impl Topology {
* petgraph library.
*/
pub fn paths_graph(&self, asn: u32) -> Topology {
let mut graph = DiGraph::new();
let mut topo = Topology {
graph: DiGraph::new(),
};

let node_map: HashMap<u32, NodeIndex> = self
.all_asns()
.into_iter()
.map(|asn| (asn, graph.add_node(asn)))
.collect();
self.all_asns().into_iter().for_each(|asn| {
topo.graph.add_node(asn);
});

let mut up_path_queue = VecDeque::<u32>::new();
let mut up_seen = HashSet::new();
let mut up_path_queue = Vec::new();
let mut up_seen = Vec::new();

// add first
up_path_queue.push_back(asn);
up_seen.insert(asn);
up_path_queue.push(asn);

while !up_path_queue.is_empty() {
let asn = up_path_queue.pop_front().unwrap(); // While check if has elements
let asn = up_path_queue.pop().unwrap(); // While check if has elements
up_seen.push(asn);

for provider_asn in self.providers_of(asn).unwrap() {
if up_seen.contains(&provider_asn) {
continue;
}
up_seen.insert(provider_asn);
up_path_queue.push_back(provider_asn);
up_path_queue.push(provider_asn);

graph.add_edge(
*node_map.get(&asn).unwrap(),
*node_map.get(&provider_asn).unwrap(),
topo.graph.add_edge(
topo.index_of(asn).unwrap(),
topo.index_of(provider_asn).unwrap(),
RelType::CustomerToProvider,
);
}
}

let mut peer_seen = HashSet::new();
let mut peer_seen = Vec::new();
// Iterate over all ASes reach by UP
        // They can only take one PEER hop, so we don't need a queue.
        // In order to avoid cycles, we iterate in the order nodes were first reached by UP.
for asn in up_seen.clone().into_iter() {
for peer_asn in self.peers_of(asn).unwrap() {
peer_seen.insert(peer_asn);
graph.add_edge(
*node_map.get(&asn).unwrap(),
*node_map.get(&peer_asn).unwrap(),
RelType::PearToPear,
);
peer_seen.push(peer_asn);

if !topo.has_connection(peer_asn, asn) {
topo.graph.add_edge(
topo.index_of(asn).unwrap(),
topo.index_of(peer_asn).unwrap(),
RelType::PearToPear,
);
}
}
}

let mut down_seen = HashSet::new();
let mut down_seen = Vec::new();

let mut down_path_queue = VecDeque::<u32>::new();
up_seen
.iter()
.for_each(|asn| down_path_queue.push_back(*asn));
peer_seen
.iter()
.for_each(|asn| down_path_queue.push_back(*asn));
let mut down_path_queue: Vec<_> = up_seen
.into_iter()
.chain(peer_seen.into_iter())
            .rev() // down-propagate first the up nodes, then the peers
.collect();

while !down_path_queue.is_empty() {
let asn = down_path_queue.pop_front().unwrap();
let asn = down_path_queue.pop().unwrap();

for customer_asn in self.customers_of(asn).unwrap() {
if up_seen.contains(&customer_asn) {
continue;
if !topo.has_connection(customer_asn, asn)
&& !topo.has_connection(asn, customer_asn)
{
topo.graph.add_edge(
topo.index_of(asn).unwrap(),
topo.index_of(customer_asn).unwrap(),
RelType::ProviderToCustomer,
);
}

graph.add_edge(
*node_map.get(&asn).unwrap(),
*node_map.get(&customer_asn).unwrap(),
RelType::ProviderToCustomer,
);

if !down_seen.contains(&customer_asn) && !down_path_queue.contains(&customer_asn) {
down_seen.insert(customer_asn);
down_path_queue.push_back(customer_asn);
down_seen.push(customer_asn);
down_path_queue.push(customer_asn);
}
}
}

// assert!(!is_cyclic_directed(&graph));
Topology { graph }
topo
}
}

#[cfg(test)]
mod test {
use petgraph::algo::is_cyclic_directed;
use std::{env, fs::File};

use bzip2::read::BzDecoder;
use petgraph::{algo::is_cyclic_directed, dot::Dot};
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::*;

/*
* ┌───────┐
/* ┌───────┐
* │ 1 │
* └───┬───┘
* ┌─────┴─────┐
* └──┬─┬──┘
* ┌────┘ └────┐
* ┌───▼───┐ ┌───▼───┐
* │ 2 │ │ 3 │
* │ 2 ◄───► 3 │
* └───┬───┘ └───┬───┘
* └─────┬─────┘
* ┌───▼───┐
* └────┐ ┌────┘
* ┌──▼─▼──┐
* │ 4 │
* └───────┘
*/
Expand All @@ -331,6 +346,45 @@ mod test {
])
}

/* ┌─────┐
* │ 1 │
* └──┬──┘
* ┌──────┴─────┐
* ┌──▼──┐ ┌──▼──┐
* │ 2 │ │ 3 │
* └──┬──┘ └──┬──┘
* ┌─────┴────┐ ┌────┴────┐
* ┌──▼──┐ ┌─▼──▼─┐ ┌──▼──┐
* │ 4 │ │ 05 │ │ 6 │
* └─────┘ └──────┘ └─────┘
*/
fn piramid_topology() -> Topology {
Topology::from_edges(vec![
(1, 2, RelType::ProviderToCustomer),
(1, 3, RelType::ProviderToCustomer),
(2, 4, RelType::ProviderToCustomer),
(2, 5, RelType::ProviderToCustomer),
(3, 5, RelType::ProviderToCustomer),
(3, 6, RelType::ProviderToCustomer),
])
}

fn get_caida_data() -> impl std::io::Read {
let cachefile = env::temp_dir().join("20231201.as-rel.txt.bz2");
if cachefile.exists() {
return BzDecoder::new(File::open(cachefile).unwrap());
}

let url = "https://publicdata.caida.org/datasets/as-relationships/serial-1/20231201.as-rel.txt.bz2";
let mut response = reqwest::blocking::get(url).unwrap();

response
.copy_to(&mut File::create(cachefile.clone()).unwrap())
.unwrap();

BzDecoder::new(File::open(cachefile).unwrap())
}

#[test]
fn test_all_asns() {
let topo = diamond_topology();
Expand Down Expand Up @@ -380,6 +434,13 @@ mod test {
assert!(topo.is_ok());
}

#[test]
fn test_from_real_caida() {
let topo = Topology::from_caida(get_caida_data());

assert!(topo.is_ok());
}

#[test]
/* Input:
* ┌─────┐
Expand Down Expand Up @@ -421,11 +482,7 @@ mod test {

let topo = topo.paths_graph(4);

let has_edge = |asn1: u32, asn2: u32| {
topo.graph
.find_edge(topo.index_of(asn1).unwrap(), topo.index_of(asn2).unwrap())
.is_some()
};
let has_edge = |asn1: u32, asn2: u32| topo.has_connection(asn1, asn2);

assert!(has_edge(4, 2));

Expand All @@ -441,4 +498,54 @@ mod test {
assert_eq!(topo.graph.edge_count(), 7);
assert!(!is_cyclic_directed(&topo.graph));
}

#[test]
/* One possible expected output
* ┌───────┐
* │ 1 │
* └──▲─┬──┘
* ┌────┘ └────┐
* ┌───┴───┐ ┌───▼───┐
* │ 2 ├───► 3 │
* └───▲───┘ └───▲───┘
* └────┐ ┌────┘
* ┌──┴─┴──┐
* │ 4 │
* └───────┘
*/
fn test_path_graph_with_ciclic() {
let topo = diamond_topology();
let topo = topo.paths_graph(4);

let has_edge = |asn1: u32, asn2: u32| topo.has_connection(asn1, asn2);

println!("{:?}", Dot::new(&topo.graph));

assert!(!is_cyclic_directed(&topo.graph));
assert!(has_edge(4, 2));
assert!(has_edge(4, 3));

if has_edge(2, 3) {
assert!(!has_edge(3, 2));
assert!(has_edge(2, 1));
assert!(has_edge(1, 3));
} else if has_edge(3, 2) {
assert!(!has_edge(2, 3));
assert!(has_edge(3, 1));
assert!(has_edge(1, 2));
} else {
panic!("should have edge between 2 and 3");
}
}

#[test]
#[ignore]
fn test_path_graph_never_generate_ciclic() {
let topo = Topology::from_caida(get_caida_data()).unwrap();

topo.all_asns().into_par_iter().for_each(|asn| {
let topo = topo.paths_graph(asn);
assert!(!is_cyclic_directed(&topo.graph));
});
}
}

0 comments on commit 3228691

Please sign in to comment.