[testing-on-gke part 6.9] Export output to a gsheet #2524

Open · wants to merge 3 commits into base: garnitin/add-gke-load-testing/add-gsheet-utility
176 changes: 106 additions & 70 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -22,12 +22,13 @@
import pprint
import subprocess
import sys
from typing import List, Tuple

# local library imports
sys.path.append("../")
import dlio_workload
from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file, export_to_csv, export_to_gsheet

_LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"

@@ -51,6 +52,31 @@
"gcsfuse_mount_options": "",
}

mash_installed = False

_HEADER = (
"File Size",
"File #",
"Total Size (GB)",
"Batch Size",
"Scenario",
"Epoch",
"Duration (s)",
"GPU Utilization (%)",
"Throughput (sample/s)",
"Throughput (MB/s)",
"Throughput over Local SSD (%)",
"GCSFuse Lowest Memory (MB)",
"GCSFuse Highest Memory (MB)",
"GCSFuse Lowest CPU (core)",
"GCSFuse Highest CPU (core)",
"Pod-name",
"Start",
"End",
"GcsfuseMountOptions",
"InstanceID",
)


def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
"""Downloads instanceId-specific dlio outputs for each dlioWorkload locally.
@@ -70,11 +96,9 @@ def downloadDlioOutputs(dlioWorkloads: set, instanceId: str) -> int:
for dlioWorkload in dlioWorkloads:
srcObjects = f"gs://{dlioWorkload.bucket}/logs/{instanceId}"
print(f"Downloading DLIO logs from the {srcObjects} ...")
returncode, errorStr = download_gcs_objects(
srcObjects, _LOCAL_LOGS_LOCATION
)
returncode = download_gcs_objects(srcObjects, _LOCAL_LOGS_LOCATION)
if returncode < 0:
print(f"Failed to download DLIO logs from {srcObjects}: {errorStr}")
print(f"Failed to download DLIO logs from {srcObjects}: {returncode}")
return returncode
return 0

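For reference: the call site above now expects download_gcs_objects to return a single integer return code instead of the earlier (returncode, errorStr) tuple. A minimal sketch of what the updated helper in utils/parse_logs_common.py could look like, assuming it shells out to gcloud storage (the real implementation is added in the base branch and may differ):

    import subprocess

    def download_gcs_objects(src: str, dst: str) -> int:
      # Recursively copy everything under the gs:// prefix src into dst and
      # surface gcloud's return code directly (0 on success).
      result = subprocess.run(
          ["gcloud", "storage", "cp", "-r", src, dst],
          capture_output=True,
          text=True,
      )
      if result.returncode != 0:
        print(result.stderr)
      return result.returncode
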
@@ -231,68 +255,87 @@ def fetch_cpu_memory_data():
return output


def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
with open(output_file_path, "a") as output_file:
# Write a new header row.
output_file.write(
"File Size,File #,Total Size (GB),Batch Size,Scenario,Epoch,Duration"
" (s),GPU Utilization (%),Throughput (sample/s),Throughput"
" (MB/s),Throughput over Local SSD (%),GCSFuse Lowest Memory"
" (MB),GCSFuse Highest Memory (MB),GCSFuse Lowest CPU (core),GCSFuse"
" Highest CPU (core),Pod,Start,End,GcsfuseMountOptions,InstanceID\n"
)
def writeOutput(
output: dict,
args: dict,
):
rows = []

for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
)

for scenario in SUPPORTED_SCENARIOS:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue
for key in output:
record_set = output[key]
total_size = int(
int(record_set["mean_file_size"])
* int(record_set["num_files_train"])
/ (1024**3)
)

for i in range(len(record_set["records"]["local-ssd"])):
r = record_set["records"][scenario][i]

try:
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
else:
r["throughput_over_local_ssd"] = "NA"
for scenario in SUPPORTED_SCENARIOS:
if scenario not in record_set["records"]:
print(f"{scenario} not in output so skipping")
continue

except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0
for i in range(len(record_set["records"][scenario])):
r = record_set["records"][scenario][i]

except Exception as e:
print(
"Error: failed to parse/write record-set for"
f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
try:
if "local-ssd" in record_set["records"] and (
len(record_set["records"]["local-ssd"])
== len(record_set["records"][scenario])
):
r["throughput_over_local_ssd"] = round(
r["train_throughput_mb_per_second"]
/ record_set["records"]["local-ssd"][i][
"train_throughput_mb_per_second"
]
* 100,
2,
)
continue
else:
r["throughput_over_local_ssd"] = "NA"

output_file.write(
f"{record_set['mean_file_size']},{record_set['num_files_train']},{total_size},{record_set['batch_size']},{scenario},"
)
output_file.write(
f"{r['epoch']},{r['duration']},{r['train_au_percentage']},{r['train_throughput_samples_per_second']},{r['train_throughput_mb_per_second']},{r['throughput_over_local_ssd']},{r['lowest_memory']},{r['highest_memory']},{r['lowest_cpu']},{r['highest_cpu']},{r['pod_name']},{r['start']},{r['end']},\"{r['gcsfuse_mount_options']}\",{args.instance_id}\n"
except ZeroDivisionError:
print("Got ZeroDivisionError. Ignoring it.")
r["throughput_over_local_ssd"] = 0

except Exception as e:
print(
"Error: failed to parse/write record-set for"
f" scenario: {scenario}, i: {i}, record: {r}, exception: {e}"
)
continue

output_file.close()
new_row = (
record_set["mean_file_size"],
record_set["num_files_train"],
total_size,
record_set["batch_size"],
scenario,
r["epoch"],
r["duration"],
r["train_au_percentage"],
r["train_throughput_samples_per_second"],
r["train_throughput_mb_per_second"],
r["throughput_over_local_ssd"],
r["lowest_memory"],
r["highest_memory"],
r["lowest_cpu"],
r["highest_cpu"],
r["pod_name"],
r["start"],
r["end"],
f'"{r["gcsfuse_mount_options"].strip()}"', # need to wrap in quotes to encapsulate commas in the value.
args.instance_id,
)
rows.append(new_row)

export_to_csv(output_file_path=args.output_file, header=_HEADER, rows=rows)
export_to_gsheet(
output_gsheet_id=args.output_gsheet_id,
output_worksheet_name=args.output_worksheet_name,
output_gsheet_keyfile=args.output_gsheet_keyfile,
header=_HEADER,
rows=rows,
)
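
For reference, here is what the two new helpers imported from utils.parse_logs_common plausibly look like, inferred purely from their call sites above. The real implementations are added in the base branch garnitin/add-gke-load-testing/add-gsheet-utility; in particular, the gspread-based body below is an assumption, not this PR's code:

    import os

    def export_to_csv(output_file_path: str, header: tuple, rows: list):
      # Append a header row followed by one comma-joined line per record.
      # Values containing commas (e.g. gcsfuse mount options) are pre-quoted
      # by the caller, so no extra CSV escaping is done here.
      ensure_directory_exists(os.path.dirname(output_file_path))
      with open(output_file_path, "a") as f:
        f.write(",".join(str(cell) for cell in header) + "\n")
        for row in rows:
          f.write(",".join(str(cell) for cell in row) + "\n")

    def export_to_gsheet(
        output_gsheet_id: str,
        output_worksheet_name: str,
        output_gsheet_keyfile: str,
        header: tuple,
        rows: list,
    ):
      # Authorize with a service-account keyfile, then append the same
      # header and rows to the named worksheet of the target spreadsheet.
      import gspread

      client = gspread.service_account(filename=output_gsheet_keyfile)
      worksheet = client.open_by_key(output_gsheet_id).worksheet(
          output_worksheet_name
      )
      worksheet.append_rows([list(header)] + [list(row) for row in rows])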


if __name__ == "__main__":
@@ -302,20 +345,13 @@ def writeRecordsToCsvOutputFile(output: dict, output_file_path: str):
dlioWorkloads = dlio_workload.ParseTestConfigForDlioWorkloads(
args.workload_config
)
downloadDlioOutputs(dlioWorkloads, args.instance_id)
ret = downloadDlioOutputs(dlioWorkloads, args.instance_id)
if ret != 0:
print(f"failed to download dlio outputs: {ret}")

mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

output = createOutputScenariosFromDownloadedFiles(args)

output_file_path = args.output_file
# Create the parent directory of output_file_path if doesn't
# exist already.
ensure_directory_exists(os.path.dirname(output_file_path))
writeRecordsToCsvOutputFile(output, output_file_path)
print(
"\n\nSuccessfully published outputs of DLIO test runs to"
f" {output_file_path} !!!\n\n"
)
writeOutput(output, args)
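
The __main__ block above assumes parse_arguments now exposes the gsheet-related flags that writeOutput reads (args.output_gsheet_id, args.output_worksheet_name, args.output_gsheet_keyfile). A hypothetical sketch of those additions: the flag names are inferred from the attribute names, and using the imported default_service_account_key_file as the keyfile default is an assumption, not code visible in this diff:

    import argparse

    def add_gsheet_arguments(parser: argparse.ArgumentParser):
      # Hypothetical helper: wires up the three flags consumed by writeOutput.
      parser.add_argument(
          "--output-gsheet-id",
          help="ID of the Google Sheet to export parsed results to.",
      )
      parser.add_argument(
          "--output-worksheet-name",
          help="Name of the worksheet (tab) inside the output gsheet.",
      )
      parser.add_argument(
          "--output-gsheet-keyfile",
          # Assumed default; may be computed differently in the base PR.
          default=default_service_account_key_file,
          help="Service-account JSON keyfile used to authorize the gsheet export.",
      )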