Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network graph with traceroute #24

Merged
merged 17 commits into from
Jan 3, 2022
154 changes: 154 additions & 0 deletions skylark/benchmark/network/traceroute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import argparse
import re
import os
from datetime import datetime
import json
from typing import List, Tuple

from loguru import logger
from tqdm import tqdm

from skylark import skylark_root
from skylark.benchmark.utils import provision, split_list
from skylark.compute.aws.aws_cloud_provider import AWSCloudProvider
from skylark.compute.aws.aws_server import AWSServer
from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider
from skylark.compute.gcp.gcp_server import GCPServer
from skylark.compute.server import Server
from skylark.utils import do_parallel

# traceroute parser from https://github.com/ckreibich/tracerouteparser.py
from skylark.benchmark.network.tracerouteparser import TracerouteParser


def parse_args():
aws_regions = AWSCloudProvider.region_list()
gcp_regions = GCPCloudProvider.region_list()
parser = argparse.ArgumentParser(description="Provision EC2 instances")
parser.add_argument("--aws_instance_class", type=str, default="i3en.large", help="Instance class")
parser.add_argument("--aws_region_list", type=str, nargs="*", default=aws_regions)

parser.add_argument("--gcp_project", type=str, default="bair-commons-307400", help="GCP project")
parser.add_argument("--gcp_instance_class", type=str, default="n1-highcpu-8", help="Instance class")
parser.add_argument("--gcp_region_list", type=str, nargs="*", default=gcp_regions)
parser.add_argument(
"--gcp_test_standard_network", action="store_true", help="Test GCP standard network in addition to premium (default)"
)

parser.add_argument("--setup_script", type=str, default=None, help="Setup script to run on each instance (URL), optional")
args = parser.parse_args()

# filter by valid regions
args.aws_region_list = [r for r in args.aws_region_list if r in aws_regions]
args.gcp_region_list = [r for r in args.gcp_region_list if r in gcp_regions]

return args


def main(args):
data_dir = skylark_root / "data"
log_dir = data_dir / "logs" / "traceroute" / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_dir.mkdir(exist_ok=True, parents=True)

aws = AWSCloudProvider()
gcp = GCPCloudProvider(args.gcp_project)
aws_instances, gcp_instances = provision(
aws=aws,
gcp=gcp,
aws_regions_to_provision=args.aws_region_list,
gcp_regions_to_provision=args.gcp_region_list,
aws_instance_class=args.aws_instance_class,
gcp_instance_class=args.gcp_instance_class,
gcp_use_premium_network=True,
setup_script=args.setup_script,
log_dir=str(log_dir),
)
instance_list: List[Server] = [i for ilist in aws_instances.values() for i in ilist]
instance_list.extend([i for ilist in gcp_instances.values() for i in ilist])
if args.gcp_test_standard_network:
logger.info(f"Provisioning standard GCP instances")
_, gcp_standard_instances = provision(
aws=aws,
gcp=gcp,
aws_regions_to_provision=[],
gcp_regions_to_provision=args.gcp_region_list,
aws_instance_class=args.aws_instance_class,
gcp_instance_class=args.gcp_instance_class,
gcp_use_premium_network=False,
setup_script=args.setup_script,
log_dir=str(log_dir),
)
instance_list.extend([i for ilist in gcp_standard_instances.values() for i in ilist])

def setup(instance: Server):
instance.run_command("sudo apt-get update")
instance.run_command("sudo apt-get install traceroute")
instance.run_command("sudo apt-get install whois")

do_parallel(setup, instance_list, progress_bar=True, n=24, desc="Setup")

# run traceroute on each pair of instances
def run_traceroute(arg_pair: Tuple[Server, Server]):
instance_src, instance_dst = arg_pair
src_ip, dst_ip = instance_src.public_ip, instance_dst.public_ip
logger.info(f"traceroute {instance_src.region_tag} -> {instance_dst.region_tag} ({dst_ip})")
stdout, stderr = instance_src.run_command(f"traceroute {dst_ip}")

parser = TracerouteParser()
parser.parse_data(stdout)
result = {}
for idx in range(len(parser.hops)):
result[idx] = []
for probe in parser.hops[idx].probes:

stdout, stderr = instance_src.run_command(f"whois {probe.ipaddr}")
lines = stdout.split("\n")
orgline = None
for l in lines:
if "OrgName:" in l:
orgline = l
result[idx].append({"ipaddr": probe.ipaddr, "name": probe.name, "rtt": probe.rtt, "org": orgline})

tqdm.write(
f"({instance_src.region_tag}:{instance_src.network_tier} -> {instance_dst.region_tag}:{instance_dst.network_tier}) hops: {len(result.keys())}"
)
instance_src.close_server()
instance_dst.close_server()

return result

traceroute_results = []
instance_pairs = [(i1, i2) for i1 in instance_list for i2 in instance_list if i1 != i2]
with tqdm(total=len(instance_pairs), desc="Total traceroute evaluation") as pbar:
groups = split_list(instance_pairs)
for group_idx, group in enumerate(groups):
results = do_parallel(
run_traceroute,
group,
progress_bar=True,
desc=f"Parallel eval group {group_idx}",
n=36,
arg_fmt=lambda x: f"{x[0].region_tag}:{x[0].network_tier} to {x[1].region_tag}:{x[1].network_tier}",
)
for pair, result in results:
pbar.update(1)
result_rec = {}
result_rec["src"] = pair[0].region_tag
result_rec["dst"] = pair[1].region_tag
result_rec["src_instance_class"] = pair[0].instance_class()
result_rec["dst_instance_class"] = pair[1].instance_class()
result_rec["src_network_tier"] = pair[0].network_tier()
result_rec["dst_network_tier"] = pair[1].network_tier()
result_rec["result"] = result
traceroute_results.append(result_rec)

traceroute_dir = data_dir / "traceroute"
traceroute_dir.mkdir(exist_ok=True, parents=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

with open(str(traceroute_dir / f"traceroute_{timestamp}.json"), "w") as f:
json.dump(traceroute_results, f)


if __name__ == "__main__":
main(parse_args())
215 changes: 215 additions & 0 deletions skylark/benchmark/network/tracerouteparser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
"""
A traceroute output parser, structuring the traceroute into a
sequence of hops, each containing individual probe results.

Courtesy of the Netalyzr project: http://netalyzr.icsi.berkeley.edu
"""
# ChangeLog
# ---------
#
# 1.0: Initial release, tested on Linux/Android traceroute inputs only.
# Also Python 2 only, most likely. (Send patches!)
#
# Copyright 2013 Christian Kreibich. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from io import StringIO
import re


class Probe(object):
"""
Abstraction of an individual probe in a traceroute.
"""

def __init__(self):
self.ipaddr = None
self.name = None
self.rtt = None # RTT in ms
self.anno = None # Annotation, such as !H, !N, !X, etc

def clone(self):
"""
Return a copy of this probe, conveying the same endpoint.
"""
copy = Probe()
copy.ipaddr = self.ipaddr
copy.name = self.name
return copy


class Hop(object):
"""
A traceroute hop consists of a number of probes.
"""

def __init__(self):
self.idx = None # Hop count, starting at 1
self.probes = [] # Series of Probe instances

def add_probe(self, probe):
"""Adds a Probe instance to this hop's results."""
self.probes.append(probe)

def __str__(self):
res = []
last_probe = None
for probe in self.probes:
if probe.name is None:
res.append("*")
continue
anno = "" if probe.anno is None else " " + probe.anno
if last_probe is None or last_probe.name != probe.name:
res.append("%s (%s) %1.3f ms%s" % (probe.name, probe.ipaddr, probe.rtt, anno))
else:
res.append("%1.3f ms%s" % (probe.rtt, anno))
last_probe = probe
return " ".join(res)


class TracerouteParser(object):
"""
A parser for traceroute text. A traceroute consists of a sequence of
hops, each of which has at least one probe. Each probe records IP,
hostname and timing information.
"""

HEADER_RE = re.compile(r"traceroute to (\S+) \((\d+\.\d+\.\d+\.\d+)\)")

def __init__(self):
self.dest_ip = None
self.dest_name = None
self.hops = []

def __str__(self):
res = ["traceroute to %s (%s)" % (self.dest_name, self.dest_ip)]
ctr = 1
for hop in self.hops:
res.append("%2d %s" % (ctr, str(hop)))
ctr += 1
return "\n".join(res)

def parse_data(self, data):
"""Parser entry point, given string of the whole traceroute output."""
self.parse_hdl(StringIO(data))

def parse_hdl(self, hdl):
"""Parser entry point, given readable file handle."""
self.dest_ip = None
self.dest_name = None
self.hops = []

for line in hdl:
line = line.strip()
if line == "":
continue
if line.lower().startswith("traceroute"):
# It's the header line at the beginning of the traceroute.
mob = self.HEADER_RE.match(line)
if mob:
self.dest_ip = mob.group(2)
self.dest_name = mob.group(1)
else:
hop = self._parse_hop(line)
self.hops.append(hop)

def _parse_hop(self, line):
"""Internal helper, parses a single line in the output."""
parts = line.split()
parts.pop(0) # Drop hop number, implicit in resulting sequence
hop = Hop()
probe = None

while len(parts) > 0:
probe = self._parse_probe(parts, probe)
if probe:
hop.add_probe(probe)

return hop

def _parse_probe(self, parts, last_probe=None):
"""Internal helper, parses the next probe's results from a line."""
try:
probe = Probe() if last_probe is None else last_probe.clone()

tok1 = parts.pop(0)
if tok1 == "*":
return probe

tok2 = parts.pop(0)
if tok2 == "ms":
# This is an additional RTT for the same endpoint we
# saw before.
probe.rtt = float(tok1)
if len(parts) > 0 and parts[0].startswith("!"):
probe.anno = parts.pop(0)
else:
# This is a probe result from a different endpoint
probe.name = tok1
probe.ipaddr = tok2[1:][:-1]
probe.rtt = float(parts.pop(0))
parts.pop(0) # Drop "ms"
if len(parts) > 0 and parts[0].startswith("!"):
probe.anno = parts.pop(0)

return probe

except (IndexError, ValueError):
return None


def demo():
"""A simple example."""

tr_data = """
traceroute to edgecastcdn.net (72.21.81.13), 30 hops max, 38 byte packets
1 * *
2 * *
3 * *
4 10.251.11.32 (10.251.11.32) 3574.616 ms 0.153 ms
5 10.251.10.2 (10.251.10.2) 465.821 ms 2500.031 ms
6 172.18.68.206 (172.18.68.206) 170.197 ms 78.979 ms
7 172.18.59.165 (172.18.59.165) 151.123 ms 525.177 ms
8 172.18.59.170 (172.18.59.170) 150.909 ms 172.18.59.174 (172.18.59.174) 62.591 ms
9 172.18.75.5 (172.18.75.5) 123.078 ms 68.847 ms
10 12.91.11.5 (12.91.11.5) 79.834 ms 556.366 ms
11 cr2.ptdor.ip.att.net (12.123.157.98) 245.606 ms 83.038 ms
12 cr81.st0wa.ip.att.net (12.122.5.197) 80.078 ms 96.588 ms
13 gar1.omhne.ip.att.net (12.122.82.17) 363.800 ms 12.122.111.9 (12.122.111.9) 72.113 ms
14 206.111.7.89.ptr.us.xo.net (206.111.7.89) 188.965 ms 270.203 ms
15 xe-0-6-0-5.r04.sttlwa01.us.ce.gin.ntt.net (129.250.196.230) 706.390 ms ae-6.r21.sttlwa01.us.bb.gin.ntt.net (129.250.5.44) 118.042 ms
16 xe-9-3-2-0.co1-96c-1b.ntwk.msn.net (207.46.47.85) 675.110 ms 72.21.81.13 (72.21.81.13) 82.306 ms

"""
# Create parser instance:
trp = TracerouteParser()

# Give it some data:
trp.parse_data(tr_data)


if __name__ == "__main__":
demo()
Loading