New throughput tool
parasj committed Jan 17, 2022
1 parent d6f4f9a commit a5d6a4d
Showing 3 changed files with 216 additions and 0 deletions.
2 changes: 2 additions & 0 deletions skylark/cli/cli.py
@@ -26,6 +26,7 @@
from skylark import GB, MB, print_header, config_file
import skylark.cli.cli_aws
import skylark.cli.cli_azure
import skylark.cli.experiments
from skylark.cli.cli_helper import (
copy_local_local,
copy_local_s3,
@@ -40,6 +41,7 @@
from skylark.replicate.replicator_client import ReplicatorClient

app = typer.Typer(name="skylark")
app.add_typer(skylark.cli.experiments.app, name="experiments")
app.add_typer(skylark.cli.cli_aws.app, name="aws")
app.add_typer(skylark.cli.cli_azure.app, name="azure")

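With this registration, the new command should be reachable as a sub-app, e.g. `skylark experiments throughput-grid` (assuming typer's default conversion of underscores in function names to hyphens in command names).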
6 changes: 6 additions & 0 deletions skylark/cli/experiments/__init__.py
@@ -0,0 +1,6 @@
import typer

from skylark.cli.experiments.throughput import throughput_grid

app = typer.Typer(name="experiments")
app.command()(throughput_grid)
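Registering with `app.command()(throughput_grid)` is the call form of typer's decorator, convenient when the function is defined in another module. A minimal sketch of the equivalent decorator form (stub body, not the real signature):

import typer

app = typer.Typer(name="experiments")

@app.command()
def throughput_grid():
    ...  # real implementation lives in skylark/cli/experiments/throughput.py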
208 changes: 208 additions & 0 deletions skylark/cli/experiments/throughput.py
@@ -0,0 +1,208 @@
from functools import partial
import json
from pathlib import Path
from typing import List, Optional, Tuple
import sys
from datetime import datetime
import os

from tqdm import tqdm
import pandas as pd
import typer
import questionary

from skylark import GB, skylark_root
from skylark.benchmark.utils import provision, split_list
from skylark.cli.cli_helper import load_config
from skylark.compute.aws.aws_cloud_provider import AWSCloudProvider
from skylark.compute.azure.azure_cloud_provider import AzureCloudProvider
from skylark.compute.gcp.gcp_cloud_provider import GCPCloudProvider
from skylark.compute.server import Server
from skylark.utils.utils import do_parallel

aws_regions = AWSCloudProvider.region_list()
azure_regions = AzureCloudProvider.region_list()
gcp_regions = GCPCloudProvider.region_list()


log_info = partial(typer.secho, fg="blue")
log_success = partial(typer.secho, fg="green")
log_error = partial(typer.secho, fg="red")


# helper to run a setup command on a provisioned instance
def setup(instance: Server, command: str):
instance.run_command(command)


def start_iperf3_client(arg_pair: Tuple[Server, Server], iperf3_log_dir: Path, iperf3_runtime: int, iperf3_connections: int):
instance_src, instance_dst = arg_pair
tag = f"{instance_src.region_tag}:{instance_src.network_tier()}_{instance_dst.region_tag}:{instance_dst.network_tier()}"
out_rec = dict(
tag=tag,
)

# run benchmark
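# iperf3 flags: -J JSON output, -Z zero-copy sends, -C cubic congestion control,
# -t runtime in seconds, -P parallel connections, -c connect to the destination's public IP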
stdout, stderr = instance_src.run_command(
f"iperf3 -J -Z -C cubic -t {iperf3_runtime} -P {iperf3_connections} -c {instance_dst.public_ip()}"
)

# save logs
with (iperf3_log_dir / f"{tag}.stdout").open("w") as f:
f.write(stdout)
if stderr:
with (iperf3_log_dir / f"{tag}.stderr").open("w") as f:
f.write(stderr)

try:
result = json.loads(stdout)
except json.JSONDecodeError as e:
log_error(f"({instance_src.region_tag} -> {instance_dst.region_tag}) iperf3 client failed: {stdout} {stderr}")
out_rec["success"] = False
out_rec["exception"] = str(e)
return out_rec

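# field names below follow iperf3's -J output schema: end.sum_sent / end.sum_received
# report aggregate bits_per_second; end.cpu_utilization_percent.host_total is the
# sender host's CPU utilization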
out_rec["throughput_sent"] = result["end"]["sum_sent"]["bits_per_second"]
out_rec["throughput_recieved"] = result["end"]["sum_received"]["bits_per_second"]
out_rec["cpu_utilization"] = result["end"]["cpu_utilization_percent"]["host_total"]
out_rec["success"] = True

instance_src.close_server()
instance_dst.close_server()
return out_rec


def throughput_grid(
resume_from_file: Optional[str] = typer.Argument(
None, help="Resume from a past result; pass the directory of the past run to resume (default: None)."
),
# regions
aws_region_list: List[str] = typer.Option(aws_regions, "--aws_region_list", "-aws"),
azure_region_list: List[str] = typer.Option(azure_regions, "--azure_region_list", "-azure"),
gcp_region_list: List[str] = typer.Option(gcp_regions, "--gcp_region_list", "-gcp"),
# instances to provision
aws_instance_class: str = typer.Option("m5.8xlarge", "--aws_instance_class", help="AWS instance class to use"),
azure_instance_class: str = typer.Option("Standard_D32_v4", "--azure_instance_class", help="Azure instance class to use"),
gcp_instance_class: str = typer.Option("n2-standard-32", "--gcp_instance_class", help="GCP instance class to use"),
# cloud options
gcp_test_standard_network: bool = typer.Option(
False, "--gcp_test_standard_network", help="Test GCP standard network in addition to premium (defaults to false)"
),
azure_test_standard_network: bool = typer.Option(
False, "--azure_test_standard_network", help="Test Azure standard network in addition to premium (defaults to false)"
),
gcp_project: Optional[str] = None,
azure_subscription: Optional[str] = None,
# iperf3 options
iperf3_runtime: int = typer.Option(4, "--iperf3_runtime", help="Runtime for iperf3 in seconds"),
iperf3_connections: int = typer.Option(64, "--iperf3_connections", help="Number of connections to test"),
):
config = load_config()
gcp_project = gcp_project or config.get("gcp_project_id")
azure_subscription = azure_subscription or config.get("azure_subscription_id")
log_info(f"Loaded from config file: gcp_project={gcp_project}, azure_subscription={azure_subscription}")

# todo handle resume directory

# validate arguments
if not all(r in aws_regions for r in aws_region_list):
typer.secho(f"Invalid AWS region list: {aws_region_list}", fg="red")
raise typer.Abort()
if not all(r in azure_regions for r in azure_region_list):
typer.secho(f"Invalid Azure region list: {azure_region_list}", fg="red")
raise typer.Abort()
if not all(r in gcp_regions for r in gcp_region_list):
typer.secho(f"Invalid GCP region list: {gcp_region_list}", fg="red")
raise typer.Abort()
assert not gcp_test_standard_network, "GCP standard network is not supported yet"
assert not azure_test_standard_network, "Azure standard network is not supported yet"

# provision servers
aws = AWSCloudProvider()
azure = AzureCloudProvider(azure_subscription)
gcp = GCPCloudProvider(gcp_project)
aws_instances, azure_instances, gcp_instances = provision(
aws=aws,
azure=azure,
gcp=gcp,
aws_regions_to_provision=aws_region_list,
azure_regions_to_provision=azure_region_list,
gcp_regions_to_provision=gcp_region_list,
aws_instance_class=aws_instance_class,
azure_instance_class=azure_instance_class,
gcp_instance_class=gcp_instance_class,
gcp_use_premium_network=not gcp_test_standard_network,
)
instance_list: List[Server] = [i for ilist in aws_instances.values() for i in ilist]
instance_list.extend([i for ilist in azure_instances.values() for i in ilist])
instance_list.extend([i for ilist in gcp_instances.values() for i in ilist])

# setup instances
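# install iperf3 (and nuttcp), kill any stale measurement processes, then
# launch iperf3 as a server daemon (-s -D) with JSON output (-J)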
setup_cmd = "(sudo apt-get update && sudo apt-get install -y iperf3 nuttcp); pkill iperf3; pkill nuttcp; iperf3 -s -D -J"
do_parallel(lambda x: x.run_command(setup_cmd), instance_list, progress_bar=True, n=-1, desc="Setup")

# build experiment
instance_pairs = [(i1, i2) for i1 in instance_list for i2 in instance_list if i1 != i2]
groups = split_list(instance_pairs)
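# split_list comes from skylark.benchmark.utils (not shown in this diff). Since each
# group is evaluated with full parallelism below, the presumed intent (an assumption,
# not verified here) is that no instance appears in two pairs of the same group, so
# concurrent iperf3 runs never share an endpoint.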

# confirm experiment
experiment_tag_words = os.popen("bash scripts/utils/get_random_word_hash.sh").read().strip()
experiment_tag = f"{experiment_tag_words}_{iperf3_runtime}s_{iperf3_connections}c_{datetime.now().strftime('%Y.%m.%d_%H.%M')}"
typer.secho(f"\nExperiment configuration: (total pairs = {len(instance_pairs)})", fg="red", bold=True)
for group_idx, group in enumerate(groups):
typer.secho(f"\tGroup {group_idx}: ({len(group)} items)", fg="green", bold=True)
for instance_pair in group:
typer.secho(f"\t{instance_pair[0].region_tag} -> {instance_pair[1].region_tag}")
gbyte_sent = len(instance_pairs) * 5.0 / 8 * iperf3_runtime
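# e.g. with 90 pairs at the 4 s default runtime: 90 * 5.0 / 8 * 4 = 225 GB, ~$22.50 at $0.10/GB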
typer.secho(f"\niperf_runtime={iperf3_runtime}, iperf3_connections={iperf3_connections}", fg="blue")
typer.secho(f"Approximate runtime: {len(groups) * (10 + iperf3_runtime)}s (assuming 10s startup time)", fg="blue")
typer.secho(f"Approximate data to send: {gbyte_sent:.2f}GB (assuming 5Gbps)", fg="blue")
typer.secho(f"Approximate cost: ${gbyte_sent * 0.1:.2f} (assuming $0.10/GB)", fg="red")
sys.stdout.flush()
sys.stderr.flush()
if not questionary.confirm(f"Launch experiment {experiment_tag}?", default=False).ask():
log_error("Exiting")
sys.exit(1)

# make experiment directory
data_dir = skylark_root / "data"
log_dir = data_dir / "logs" / "throughput_grid" / experiment_tag
log_dir.mkdir(exist_ok=True, parents=True)
raw_iperf3_log_dir = log_dir / "raw_iperf3_logs"
raw_iperf3_log_dir.mkdir(exist_ok=True, parents=True)
log_info(f"Experiment tag: {experiment_tag}")
log_info(f"Log directory: {log_dir}")
log_info(f"Raw iperf3 log directory: {raw_iperf3_log_dir}")

# define iperf3 client function
def client_fn(instance_pair):
instance_src, instance_dst = instance_pair
result_rec = dict(
src_region=instance_src.region_tag,
src_tier=instance_src.network_tier(),
src_instance_class=instance_src.instance_class,
dst_region=instance_dst.region_tag,
dst_tier=instance_dst.network_tier(),
dst_instance_class=instance_dst.instance_class,
iperf3_connections=iperf3_connections,
iperf3_runtime=iperf3_runtime,
)
rec = start_iperf3_client(
instance_pair, iperf3_log_dir=raw_iperf3_log_dir, iperf3_runtime=iperf3_runtime, iperf3_connections=iperf3_connections
)
if rec is not None:
result_rec.update(rec)
pbar.update(1)
tqdm.write(f"{result_rec['tag']}: {result_rec.get('throughput_sent', 0.) / GB:.2f}Gbps")
return result_rec

# run experiments
new_throughput_results = []
with tqdm(total=len(instance_pairs), desc="Total throughput evaluation") as pbar:
for group_idx, group in enumerate(groups):
tag_fmt = lambda x: f"{x[0].region_tag}:{x[0].network_tier()} to {x[1].region_tag}:{x[1].network_tier()}"
results = do_parallel(client_fn, group, progress_bar=True, desc=f"Parallel eval group {group_idx}", n=-1, arg_fmt=tag_fmt)
new_throughput_results.extend([rec for args, rec in results if rec is not None])

# build dataframe from results
df = pd.DataFrame(new_throughput_results)
df.to_csv(log_dir / "throughput.csv", index=False)
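Once a run completes, the CSV can be pivoted into a region-to-region throughput matrix. A minimal sketch, assuming the column names written above and a path supplied by the user:

import pandas as pd

# hypothetical path: data/logs/throughput_grid/<experiment_tag>/throughput.csv
df = pd.read_csv("throughput.csv")
df["gbps_sent"] = df["throughput_sent"] / 1e9  # bits/s -> Gbit/s (decimal)
matrix = df.pivot_table(index="src_region", columns="dst_region", values="gbps_sent")
print(matrix.round(2))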
