From 8ba369588dc4668ea328c5a02f370e3248afccea Mon Sep 17 00:00:00 2001
From: Kyle Vedder
Date: Thu, 11 Apr 2024 16:33:47 -0400
Subject: [PATCH] Cleanup jlaunch with multi-backend support

---
 .gitignore                                    |   3 +-
 data_prep_scripts/check_nsfp_status.py        |  22 ---
 data_prep_scripts/compute_total_runtime.py    |  58 ------
 data_prep_scripts/investigate_bucket_sizes.py | 125 -------------
 data_prep_scripts/investigate_pointclouds.py  | 153 ----------------
 data_prep_scripts/jlaunch.py                  |  20 +++
 .../jlaunch_backends/__init__.py              |   5 +
 data_prep_scripts/jlaunch_backends/ngc.py     |  48 +++++
 .../jlaunch_backends/shared_utils.py          |  18 ++
 data_prep_scripts/jlaunch_backends/slurm.py   | 168 ++++++++++++++++++
 data_prep_scripts/split_nsfp_jobs.py          | 118 ------------
 data_prep_scripts/split_nsfp_jobs_sbatch.py   | 140 ---------------
 .../split_optimization_jobs_jlaunch.py        | 140 +++++++++++++++
 13 files changed, 401 insertions(+), 617 deletions(-)
 delete mode 100644 data_prep_scripts/check_nsfp_status.py
 delete mode 100644 data_prep_scripts/compute_total_runtime.py
 delete mode 100644 data_prep_scripts/investigate_bucket_sizes.py
 delete mode 100644 data_prep_scripts/investigate_pointclouds.py
 create mode 100644 data_prep_scripts/jlaunch.py
 create mode 100644 data_prep_scripts/jlaunch_backends/__init__.py
 create mode 100644 data_prep_scripts/jlaunch_backends/ngc.py
 create mode 100644 data_prep_scripts/jlaunch_backends/shared_utils.py
 create mode 100644 data_prep_scripts/jlaunch_backends/slurm.py
 delete mode 100644 data_prep_scripts/split_nsfp_jobs.py
 delete mode 100644 data_prep_scripts/split_nsfp_jobs_sbatch.py
 create mode 100644 data_prep_scripts/split_optimization_jobs_jlaunch.py

diff --git a/.gitignore b/.gitignore
index 055d29a..0298649 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,4 +23,5 @@ cluster_tb_logs/
 dataset_count_compute_save_dir/
 checkpoint_eval_launch_dir/
 screenshots/
-eval_results/
\ No newline at end of file
+eval_results/
+launch_files/
\ No newline at end of file
diff --git a/data_prep_scripts/check_nsfp_status.py b/data_prep_scripts/check_nsfp_status.py
deleted file mode 100644
index 1d7d397..0000000
--- a/data_prep_scripts/check_nsfp_status.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from pathlib import Path
-import argparse
-import numpy as np
-
-# Path to the NSFP flows
-parser = argparse.ArgumentParser()
-parser.add_argument('nsfp_flows_dir', type=Path, help='path to the NSFP flows')
-parser.add_argument('expected_num_flows',
-                    type=int,
-                    help='expected number of flows')
-args = parser.parse_args()
-
-assert args.nsfp_flows_dir.exists(), 'NSFP flows dir does not exist'
-assert args.expected_num_flows > 0, 'Expected number of flows must be positive'
-
-subfolders = sorted([f for f in args.nsfp_flows_dir.glob('*') if f.is_dir()])
-
-# Check that all subfolders have the expected number of flows
-for subfolder in subfolders:
-    num_flows = len([f for f in subfolder.glob('*.npz') if f.is_file()])
-    if num_flows != args.expected_num_flows:
-        print(subfolder.name, num_flows)
diff --git a/data_prep_scripts/compute_total_runtime.py b/data_prep_scripts/compute_total_runtime.py
deleted file mode 100644
index c4f15d5..0000000
--- a/data_prep_scripts/compute_total_runtime.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import argparse
-from core_utils import load_npz
-from pathlib import Path
-import tqdm
-from multiprocessing import Pool, cpu_count
-from functools import partial
-import numpy as np
-
-parser = argparse.ArgumentParser(description='Process some folders.')
-parser.add_argument('--cpus', type=int, default=cpu_count(),
-                    help='Number of CPUs to use for multiprocessing')
-parser.add_argument('--key', type=str, default="delta_time",
-                    help='Key to use for measuring delta time')
-args = parser.parse_args()
-
-cpus = args.cpus
-
-
-argoverse_train_dir = Path('/efs/argoverse2/train_nsfp_flow')
-argoverse_val_dir = Path('/efs/argoverse2/val_nsfp_flow')
-argoverse_chodosh_train_dir = Path('/efs/argoverse2/train_chodosh_flow/')
-waymo_open_train_dir = Path('/efs/waymo_open_processed_flow/train_nsfp_flow')
-
-
-def process_file(file, key="delta_time"):
-    npz_data = dict(load_npz(file, verbose=False))
-    return npz_data[key]
-
-def process_folder(path : Path):
-    time_list = []
-
-    process_function = partial(process_file, key=args.key)
-    if cpus <= 1:
-        # Use a for loop instead of multiprocessing
-        subfolders = path.iterdir()
-        file_list = [subfolder.glob("*.npz") for subfolder in subfolders]
-        file_list = [file for sublist in file_list for file in sublist]
-        for file in tqdm.tqdm(file_list):
-            delta_time = process_function(file)
-            time_list.append(delta_time)
-
-    else:
-        with Pool(processes=cpus) as pool:
-            subfolders = path.iterdir()
-            file_list = [subfolder.glob("*.npz") for subfolder in subfolders]
-            delta_time_list = pool.map(process_function, [file for sublist in file_list for file in sublist])
-            time_list.extend(delta_time_list)
-    return time_list
-
-
-# waymo_train = process_folder(waymo_open_train_dir)
-# print("Waymo train: ", waymo_train)
-# argo_train = process_folder(argoverse_train_dir)
-# print("Argo train: ", argo_train)
-# argo_val = process_folder(argoverse_val_dir)
-# print("Argo val: ", argo_val)
-argo_chodosh_train = process_folder(argoverse_chodosh_train_dir)
-print("Argo chodosh train: ", np.sum(argo_chodosh_train), np.mean(argo_chodosh_train), np.std(argo_chodosh_train))
\ No newline at end of file
diff --git a/data_prep_scripts/investigate_bucket_sizes.py b/data_prep_scripts/investigate_bucket_sizes.py
deleted file mode 100644
index ab8be2a..0000000
--- a/data_prep_scripts/investigate_bucket_sizes.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import os
-# set OMP_NUM_THREADS before importing torch or numpy
-os.environ["OMP_NUM_THREADS"] = "1"
-import argparse
-from pathlib import Path
-import numpy as np
-import tqdm
-import joblib
-import multiprocessing
-from core_utils import save_pickle
-import open3d as o3d
-from pointclouds import PointCloud
-import torch
-
-from dataloaders import ArgoverseSupervisedFlowSequenceLoader, ArgoverseUnsupervisedFlowSequenceLoader, WaymoSupervisedFlowSequenceLoader, WaymoUnsupervisedFlowSequenceLoader
-
-# cli argument to pick argo or waymo
-parser = argparse.ArgumentParser()
-parser.add_argument('dataset',
-                    type=str,
-                    help='argo or waymo',
-                    choices=['argo', 'waymo'])
-parser.add_argument('--cpus',
-                    type=int,
-                    help='number of cpus to use',
-                    default=multiprocessing.cpu_count() - 1)
-args = parser.parse_args()
-
-argoverse_data = Path("/efs/argoverse2")
-waymo_data = Path("/efs/waymo_open_processed_flow")
-
-DYNAMIC_THRESHOLD = 0.5
-
-
-def get_pc_infos(seq_idx, supervised_seq, unsupervised_seq):
-    infos_lst = []
-    max_range = min(len(supervised_seq), len(unsupervised_seq))
-    for idx in range(max_range):
-        if idx == max_range - 1:
-            continue
-        supervised_frame = supervised_seq.load(idx, idx)
-        unsupervised_frame = unsupervised_seq.load(idx, idx)
-
-        assert (
-            supervised_frame['relative_pc'] ==
-            unsupervised_frame['relative_pc']
-        ), f"supervised_frame['relative_pc'] {supervised_frame['relative_pc']} != unsupervised_frame['relative_pc'] {unsupervised_frame['relative_pc']}"
-
-        valid_indices = unsupervised_frame['valid_idxes']
-        classes = supervised_frame['pc_classes'][valid_indices]
-        pc = supervised_frame['relative_pc'][valid_indices]
-        flowed_pc = supervised_frame['relative_flowed_pc'][valid_indices]
-        flow = flowed_pc - pc
-
-        flow_speed_meters_per_second = np.linalg.norm(flow,
-                                                      axis=1) * 10.0
-
-        if args.dataset == 'waymo':
-            BACKGROUND_ID = 0
-        else:
-            BACKGROUND_ID = -1
-
-        background_count = np.sum(classes == BACKGROUND_ID)
-        foreground_static_count = np.sum(
-            (classes > BACKGROUND_ID) & (flow_speed_meters_per_second <= DYNAMIC_THRESHOLD))
-        foreground_dynamic_count = np.sum(
-            (classes > BACKGROUND_ID) & (flow_speed_meters_per_second > DYNAMIC_THRESHOLD))
-
-        num_points = len(pc)
-        if num_points < 100:
-            print("Seq idx", seq_idx, "idx", idx, "num_points", num_points)
-        infos_lst.append({
-            "background_count": background_count,
-            "foreground_static_count": foreground_static_count,
-            "foreground_dynamic_count": foreground_dynamic_count,
-        })
-    print("Finished sequence",
-          seq_idx,
-          "with",
-          len(supervised_seq),
-          "frames",
-          flush=True)
-    return infos_lst
-
-
-if args.dataset == 'waymo':
-    supervised_seq_loader = WaymoSupervisedFlowSequenceLoader(waymo_data /
-                                                              "training")
-    unsupervised_seq_loader = WaymoUnsupervisedFlowSequenceLoader(
-        waymo_data / "training", waymo_data / "train_nsfp_flow")
-else:
-    supervised_seq_loader = ArgoverseSupervisedFlowSequenceLoader(
-        argoverse_data / "val", argoverse_data / "val_sceneflow")
-    unsupervised_seq_loader = ArgoverseUnsupervisedFlowSequenceLoader(
-        argoverse_data / "val", argoverse_data / "val_nsfp_flow")
-seq_ids = supervised_seq_loader.get_sequence_ids()
-
-if args.cpus > 1:
-    # Use joblib to parallelize the loading of the sequences
-    pc_infos_lst = joblib.Parallel(n_jobs=args.cpus)(
-        joblib.delayed(get_pc_infos)(
-            idx, supervised_seq_loader.load_sequence(seq_id),
-            unsupervised_seq_loader.load_sequence(seq_id))
-        for idx, seq_id in enumerate(tqdm.tqdm(seq_ids)))
-    pc_infos_lst = [item for sublist in pc_infos_lst for item in sublist]
-else:
-    pc_infos_lst = []
-    for idx, seq_id in enumerate(tqdm.tqdm(seq_ids)):
-        supervised_seq = supervised_seq_loader.load_sequence(seq_id)
-        unsupervised_seq = unsupervised_seq_loader.load_sequence(seq_id)
-        pc_infos = get_pc_infos(idx, supervised_seq, unsupervised_seq)
-        pc_infos_lst.extend(pc_infos)
-
-save_pickle(
-    f"validation_results/{args.dataset}_validation_pointcloud_pc_buckets.pkl",
-    pc_infos_lst)
-
-background_counts = [e["background_count"] for e in pc_infos_lst]
-foreground_static_counts = [e["foreground_static_count"] for e in pc_infos_lst]
-foreground_dynamic_counts = [e["foreground_dynamic_count"] for e in pc_infos_lst]
-
-
-print("Background Count Sum:", np.sum(background_counts), "Mean:", np.mean(background_counts), "Std:", np.std(background_counts), "Median:", np.median(background_counts))
-print("Foreground Static Count Sum:", np.sum(foreground_static_counts), "Mean:", np.mean(foreground_static_counts), "Std:", np.std(foreground_static_counts), "Median:", np.median(foreground_static_counts))
-print("Foreground Dynamic Count Sum:", np.sum(foreground_dynamic_counts), "Mean:", np.mean(foreground_dynamic_counts), "Std:", np.std(foreground_dynamic_counts), "Median:", np.median(foreground_dynamic_counts))
\ No newline at end of file
diff --git a/data_prep_scripts/investigate_pointclouds.py b/data_prep_scripts/investigate_pointclouds.py
deleted file mode 100644
index 5bae8ce..0000000
--- a/data_prep_scripts/investigate_pointclouds.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import os
-# set OMP_NUM_THREADS before importing torch or numpy
-os.environ["OMP_NUM_THREADS"] = "1"
-import argparse
-from pathlib import Path
-import numpy as np
-import tqdm
-import joblib
-import multiprocessing
-from core_utils import save_pickle
-import open3d as o3d
-from pointclouds import PointCloud
-import torch
-
-from dataloaders import ArgoverseSupervisedFlowSequenceLoader, ArgoverseUnsupervisedFlowSequenceLoader, WaymoSupervisedFlowSequenceLoader, WaymoUnsupervisedFlowSequenceLoader
-
-# cli argument to pick argo or waymo
-parser = argparse.ArgumentParser()
-parser.add_argument('dataset',
-                    type=str,
-                    help='argo or waymo',
-                    choices=['argo', 'waymo'])
-parser.add_argument('--cpus',
-                    type=int,
-                    help='number of cpus to use',
-                    default=multiprocessing.cpu_count() - 1)
-args = parser.parse_args()
-
-argoverse_data = Path("/efs/argoverse2")
-waymo_data = Path("/efs/waymo_open_processed_flow")
-
-SLOW_THRESHOLD = 0.4
-FAST_THRESHOLD = 1.0
-
-
-def get_pc_infos(seq_idx, supervised_seq, unsupervised_seq):
-    infos_lst = []
-    max_range = min(len(supervised_seq), len(unsupervised_seq))
-    for idx in range(max_range):
-        if idx == max_range - 1:
-            continue
-        supervised_frame = supervised_seq.load(idx, idx)
-        unsupervised_frame = unsupervised_seq.load(idx, idx)
-
-        assert (
-            supervised_frame['relative_pc'] ==
-            unsupervised_frame['relative_pc']
-        ), f"supervised_frame['relative_pc'] {supervised_frame['relative_pc']} != unsupervised_frame['relative_pc'] {unsupervised_frame['relative_pc']}"
-
-        valid_indices = unsupervised_frame['valid_idxes']
-        classes = supervised_frame['pc_classes'][valid_indices]
-        pc = supervised_frame['relative_pc'][valid_indices]
-        pseudoflow = unsupervised_frame['flow']
-        if pseudoflow.ndim == 3 and pseudoflow.shape[0] == 1:
-            pseudoflow = pseudoflow.squeeze(0)
-
-        assert len(valid_indices) == len(
-            pseudoflow
-        ), f"len(valid_indices) {len(valid_indices)} != len(pseudoflow) {len(pseudoflow)}"
-
-        pseudoflow_speed_meters_per_second = np.linalg.norm(pseudoflow,
-                                                            axis=1) * 10.0
-
-        assert len(pc) == len(
-            pseudoflow_speed_meters_per_second
-        ), f"len(pc) {len(pc)} != len(pseudoflow_speed) {len(pseudoflow_speed_meters_per_second)}"
-
-        slow_point_count = np.sum(
-            pseudoflow_speed_meters_per_second <= SLOW_THRESHOLD)
-        medium_point_count = np.sum(
-            (pseudoflow_speed_meters_per_second > SLOW_THRESHOLD)
-            & (pseudoflow_speed_meters_per_second < FAST_THRESHOLD))
-        fast_point_count = np.sum(
-            pseudoflow_speed_meters_per_second >= FAST_THRESHOLD)
-
-        foreground_count = np.sum(classes >= 0)
-        background_count = np.sum(classes == -1)
-        assert foreground_count + background_count == len(
-            pc
-        ), f"foreground_count {foreground_count} + background_count {background_count} != len(pc) {len(pc)}"
-
-        num_points = len(pc)
-        if num_points < 100:
-            print("Seq idx", seq_idx, "idx", idx, "num_points", num_points)
-        infos_lst.append({
-            "num_points": num_points,
-            "foreground_count": foreground_count,
-            "background_count": background_count,
-            "slow_point_count": slow_point_count,
-            "medium_point_count": medium_point_count,
-            "fast_point_count": fast_point_count,
-        })
-    print("Finished sequence",
-          seq_idx,
-          "with",
-          len(supervised_seq),
-          "frames",
-          flush=True)
-    return infos_lst
-
-
-if args.dataset == 'waymo':
-    supervised_seq_loader = WaymoSupervisedFlowSequenceLoader(waymo_data /
-                                                              "training")
-    unsupervised_seq_loader = WaymoUnsupervisedFlowSequenceLoader(
-        waymo_data / "training", waymo_data / "train_nsfp_flow")
-else:
-    supervised_seq_loader = ArgoverseSupervisedFlowSequenceLoader(
-        argoverse_data / "val", argoverse_data / "val_sceneflow")
-    unsupervised_seq_loader = ArgoverseUnsupervisedFlowSequenceLoader(
-        argoverse_data / "val", argoverse_data / "val_nsfp_flow")
-seq_ids = supervised_seq_loader.get_sequence_ids()
-
-if args.cpus > 1:
-    # Use joblib to parallelize the loading of the sequences
-    pc_infos_lst = joblib.Parallel(n_jobs=args.cpus)(
-        joblib.delayed(get_pc_infos)(
-            idx, supervised_seq_loader.load_sequence(seq_id),
-            unsupervised_seq_loader.load_sequence(seq_id))
-        for idx, seq_id in enumerate(seq_ids))
-    pc_infos_lst = [item for sublist in pc_infos_lst for item in sublist]
-else:
-    pc_infos_lst = []
-    for idx, seq_id in enumerate(tqdm.tqdm(seq_ids)):
-        supervised_seq = supervised_seq_loader.load_sequence(seq_id)
-        unsupervised_seq = unsupervised_seq_loader.load_sequence(seq_id)
-        pc_infos = get_pc_infos(idx, supervised_seq, unsupervised_seq)
-        pc_infos_lst.extend(pc_infos)
-
-save_pickle(
-    f"validation_results/{args.dataset}_validation_pointcloud_pc_infos.pkl",
-    pc_infos_lst)
-
-pc_sizes_lst = [e["num_points"] for e in pc_infos_lst]
-foreground_counts = [e["foreground_count"] for e in pc_infos_lst]
-background_counts = [e["background_count"] for e in pc_infos_lst]
-
-slow_counts = [e["slow_point_count"] for e in pc_infos_lst]
-medium_counts = [e["medium_point_count"] for e in pc_infos_lst]
-fast_counts = [e["fast_point_count"] for e in pc_infos_lst]
-
-print("PC Point Count Mean:", np.mean(pc_sizes_lst), "Std:",
-      np.std(pc_sizes_lst), "Median:", np.median(pc_sizes_lst))
-print("Foreground Mean:", np.mean(foreground_counts), "Std:",
-      np.std(foreground_counts), "Median:", np.median(foreground_counts))
-print("Background Mean:", np.mean(background_counts), "Std:",
-      np.std(background_counts), "Median:", np.median(background_counts))
-print("Slow Mean:", np.mean(slow_counts), "Std:", np.std(slow_counts),
-      "Median:", np.median(slow_counts))
-print("Medium Mean:", np.mean(medium_counts), "Std:", np.std(medium_counts),
-      "Median:", np.median(medium_counts))
-print("Fast Mean:", np.mean(fast_counts), "Std:", np.std(fast_counts),
-      "Median:", np.median(fast_counts))
diff --git a/data_prep_scripts/jlaunch.py b/data_prep_scripts/jlaunch.py
new file mode 100644
index 0000000..5e1f159
--- /dev/null
+++ b/data_prep_scripts/jlaunch.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+import sys
+
+sys.path.insert(0, ".")
+
+import click
+
+from data_prep_scripts.jlaunch_backends import backends_list
+
+
+@click.group()
+def cli():
+    pass
+
+
+for backend in backends_list:
+    cli.add_command(backend)
+
+if __name__ == "__main__":
+    cli()
diff --git a/data_prep_scripts/jlaunch_backends/__init__.py b/data_prep_scripts/jlaunch_backends/__init__.py
new file mode 100644
index 0000000..0f80d7b
--- /dev/null
+++ b/data_prep_scripts/jlaunch_backends/__init__.py
@@ -0,0 +1,5 @@
+from .slurm import backend_slurm
+from .ngc import backend_ngc
+from .shared_utils import run_cmd
+
+backends_list = [backend_slurm, backend_ngc]
diff --git a/data_prep_scripts/jlaunch_backends/ngc.py b/data_prep_scripts/jlaunch_backends/ngc.py
new file mode 100644
index 0000000..1d265ec
--- /dev/null
+++ b/data_prep_scripts/jlaunch_backends/ngc.py
@@ -0,0 +1,48 @@
+import click
+from .shared_utils import run_cmd
+
+
+@click.command()
+@click.argument("command", type=str)
+@click.option("--instance", type=str, default="dgx1v.16g.1.norm", help="Instance type")
+@click.option("--team", type=str, required=True, help="Team name")
+@click.option("--name", type=str, required=True, help="Name of the environment")
+@click.option(
+    "--image",
+    type=str,
+    required=True,
+    help="Docker image to use",
+)
+@click.option(
+    "--result",
+    type=click.Path(exists=False, file_okay=True, dir_okay=True, writable=True),
+    default="/result",
+    help="Path to store results (e.g., /result)",
+)
+@click.option(
+    "--workspace",
+    type=(str, str),
+    multiple=True,
+    help="Workspace mount point as a (workspace ID, mount path) pair; can be given multiple times",
+)
+def backend_ngc(
+    command: str,
+    instance: str,
+    team: str,
+    name: str,
+    image: str,
+    result: str,
+    workspace: tuple[tuple[str, str], ...],
+):
+    """Builds and prints an `ngc batch run` command with the specified options."""
+
+    command_content = f"""ngc batch run --instance {instance} --team {team} --name {name} --image {image} --result {result}"""
+    for user, host_path in workspace:
+        command_content += f" --workspace {user}:{host_path}"
+    command_content += f' --command "{command}"'
+
+    print(command_content)
+
+
+if __name__ == "__main__":
+    backend_ngc()
diff --git a/data_prep_scripts/jlaunch_backends/shared_utils.py b/data_prep_scripts/jlaunch_backends/shared_utils.py
new file mode 100644
index 0000000..f4cd7e5
--- /dev/null
+++ b/data_prep_scripts/jlaunch_backends/shared_utils.py
@@ -0,0 +1,18 @@
+import subprocess
+
+
+def run_cmd(cmd: str, return_stdout: bool = False):
+    res = subprocess.run(cmd, shell=True, encoding="utf-8", capture_output=True)
+    if res.returncode != 0:
+        print(f"Error running command: {cmd}")
+        print(res.stdout)
+        print(res.stderr)
+    else:
+        lines = res.stdout.strip().splitlines()
+        if len(lines) > 0:
+            print(lines[0].strip())
+            print("...")
+            print(lines[-1].strip())
+    assert res.returncode == 0, f"Command {cmd} failed with return code {res.returncode}"
+    if return_stdout:
+        return res.stdout
diff --git a/data_prep_scripts/jlaunch_backends/slurm.py b/data_prep_scripts/jlaunch_backends/slurm.py
new file mode 100644
index 0000000..588d445
--- /dev/null
+++ b/data_prep_scripts/jlaunch_backends/slurm.py
@@ -0,0 +1,168 @@
+import click
+from pathlib import Path
+from .shared_utils import run_cmd
+
+
+@click.command(help="SLURM job submission with configuration options")
+@click.argument("command", type=str)
+@click.option(
+    "--job_dir",
+    type=click.Path(exists=True, file_okay=False, dir_okay=True),
+    default="./job_dir/",
+    help="Directory to store job-related files",
+)
+@click.option("--num_gpus", type=int, default=1, help="Number of GPUs to request")
+@click.option("--cpus_per_gpu", type=int, default=2, help="CPUs to allocate per GPU")
+@click.option("--mem_per_gpu", type=int, default=12, help="Memory (in GB) to allocate per GPU")
+@click.option("--runtime_mins", type=int, default=180, help="Job runtime in minutes")
+@click.option(
+    "--runtime_hours",
+    type=int,
+    default=None,
+    help="Job runtime in hours (overrides --runtime_mins)",
+)
+@click.option("--name", type=str, default="slurm", help="Name for the job")
+@click.option("--qos", type=str, default="ee-med", help="Quality of service (QoS) for the job")
+@click.option("--partition", type=str, default="eaton-compute", help="Partition to target")
+@click.option("--dry_run", is_flag=True, help="Do not submit the job, only print the configuration")
+@click.option(
+    "--blacklist_substring",
+    type=str,
+    default=None,
+    help="Filter out nodes containing this substring",
+)
+def backend_slurm(
+    command: str,
+    job_dir: Path,
+    num_gpus: int,
+    cpus_per_gpu: int,
+    mem_per_gpu: int,
+    runtime_mins: int,
+    runtime_hours: int | None,
+    name: str,
+    qos: str,
+    partition: str,
+    dry_run: bool,
+    blacklist_substring: str | None,
+):
+    job_dir = Path(job_dir).absolute()
+    num_prior_jobs = len([e for e in job_dir.iterdir() if e.is_dir()])
+    jobdir_path = job_dir / f"{num_prior_jobs:06d}"
+    jobdir_path.mkdir(exist_ok=True, parents=True)
+    job_runtime_mins = runtime_mins if runtime_hours is None else runtime_hours * 60
+
+    node_blacklist = get_node_blacklist(blacklist_substring)
+    make_command_file(jobdir_path, command)
+    make_sbatch(
+        jobdir_path,
+        name,
+        qos,
+        partition,
+        job_runtime_mins,
+        num_gpus,
+        mem_per_gpu,
+        cpus_per_gpu,
+        node_blacklist,
+    )
+    if dry_run:
+        return
+    print("RUN THIS COMMAND TO SUBMIT THE JOB:")
+    print("|")
+    print("|")
+    print("|")
+    print("|")
+    print("|")
+    print("V")
+    print(f"sbatch {jobdir_path}/sbatch.bash")
+    print("^")
+    print("|")
+    print("|")
+    print("|")
+    print("|")
+    print("|")
+
+    print(f"Config files written to {jobdir_path.absolute()}")
+
+
+def load_available_nodes():
+    res = run_cmd("sinfo --Node | awk '{print $1}' | tail -n +2", return_stdout=True)
+    available_nodes = res.split("\n")
+    return [e.strip() for e in available_nodes if e.strip()]
+
+
+def get_node_blacklist(blacklist_substring: str | None) -> list[str]:
+    if blacklist_substring is None:
+        return []
+
+    node_list = load_available_nodes()
+    node_blacklist_list = []
+    if "," in blacklist_substring:
+        node_blacklist_list = [e.strip() for e in blacklist_substring.split(",")]
+    else:
+        node_blacklist_list = [blacklist_substring]
+
+    print(f"Blacklisting nodes with substrings {node_blacklist_list}")
+    print(f"Available nodes: {node_list}")
+
+    def is_blacklisted(node):
+        for blacklist_str in node_blacklist_list:
+            if blacklist_str in node:
+                return True
+        return False
+
+    node_blacklist = [node for node in node_list if is_blacklisted(node)]
+    print(f"Blacklisted nodes: {node_blacklist}")
+    return node_blacklist
+
+
+def get_runtime_format(runtime_mins: int):
+    hours = runtime_mins // 60
+    minutes = runtime_mins % 60
+    return f"{hours:02d}:{minutes:02d}:00"
+
+
+def make_command_file(jobdir_path: Path, command: str):
+    command_path = jobdir_path / "command.sh"
+    command_file_content = f"""#!/bin/bash
+{command}
+"""
+    with open(command_path, "w") as f:
+        f.write(command_file_content)
+
+
+def make_sbatch(
+    jobdir_path: Path,
+    job_name: str,
+    qos: str,
+    partition: str,
+    job_runtime_mins: int,
+    num_gpus: int,
+    mem_per_gpu: int,
+    cpus_per_gpu: int,
+    node_blacklist: list[str],
+):
+    current_working_dir = Path.cwd().absolute()
+    sbatch_path = jobdir_path / "sbatch.bash"
+    docker_image_path = Path("kylevedder_offline_sceneflow_latest.sqsh").absolute()
+    assert (
+        docker_image_path.is_file()
+    ), f"Docker image {docker_image_path} squash file does not exist"
+    sbatch_file_content = f"""#!/bin/bash
+#SBATCH --job-name={job_name}
+#SBATCH --qos={qos}
+#SBATCH --partition={partition}
+#SBATCH --nodes=1
+#SBATCH --output={jobdir_path}/job.out
+#SBATCH --error={jobdir_path}/job.err
+#SBATCH --time={get_runtime_format(job_runtime_mins)}
+#SBATCH --gpus={num_gpus}
+#SBATCH --mem-per-gpu={mem_per_gpu}G
+#SBATCH --cpus-per-gpu={cpus_per_gpu}
+#SBATCH --exclude={','.join(node_blacklist)}
+#SBATCH --container-mounts=../../datasets/:/efs/,{current_working_dir}:/project
+#SBATCH --container-image={docker_image_path}
+
+bash {jobdir_path}/command.sh && echo 'done' > {jobdir_path}/job.done
+"""
+    with open(sbatch_path, "w") as f:
+        f.write(sbatch_file_content)
diff --git a/data_prep_scripts/split_nsfp_jobs.py b/data_prep_scripts/split_nsfp_jobs.py
deleted file mode 100644
index 8311528..0000000
--- a/data_prep_scripts/split_nsfp_jobs.py
+++ /dev/null
@@ -1,118 +0,0 @@
-import argparse
-from pathlib import Path
-import math
-import shutil
-
-# Get path to argoverse lidar dataset and number of sequences per job
-parser = argparse.ArgumentParser()
-parser.add_argument('lidar_path', type=Path)
-parser.add_argument('sequences_per_job', type=int)
-parser.add_argument('base_config', type=Path)
-parser.add_argument('--reset_config_dir', action='store_true')
-parser.add_argument('--configs_path',
-                    type=Path,
-                    default=Path("./nsfp_split_configs"))
-parser.add_argument('--runtime_mins', type=int, default=180)
-parser.add_argument('--job_prefix', type=str, default='np')
-args = parser.parse_args()
-
-assert args.lidar_path.is_dir(), f"Path {args.lidar_path} is not a directory"
-assert args.sequences_per_job > 0, f"Number of sequences per job must be positive"
-assert args.base_config.is_file(
-), f"Config file {args.base_config} does not exist"
-
-assert args.runtime_mins > 0, f"Runtime must be positive"
-
-sequence_folders = sorted([c for c in args.lidar_path.glob("*") if c.is_dir()],
-                          key=lambda x: x.name.lower())
-num_jobs = math.ceil(len(sequence_folders) / args.sequences_per_job)
-
-print(f"Splitting {len(sequence_folders)} sequences into {num_jobs} jobs")
-
-job_sequence_names_lst = []
-for i in range(num_jobs):
-    start = i * args.sequences_per_job
-    end = min(start + args.sequences_per_job, len(sequence_folders))
-    job_sequence_folders = sequence_folders[start:end]
-    job_sequence_names = [f.name for f in job_sequence_folders]
-    job_sequence_names_lst.append(job_sequence_names)
-
-sequence_names_set = set(f for seqs in job_sequence_names_lst for f in seqs)
-assert len(sequence_names_set) == len(
-    sequence_folders), "Some sequences are missing from jobs"
-
-configs_path = args.configs_path
-if args.reset_config_dir:
-    if configs_path.exists():
-        shutil.rmtree(configs_path)
-    configs_path.mkdir(exist_ok=False)
-else:
-    configs_path.mkdir(exist_ok=True, parents=True)
-
-
-def get_runtime_format(runtime_mins):
-    hours = runtime_mins // 60
-    minutes = runtime_mins % 60
-    return f"{hours:02d}:{minutes:02d}:00"
-
-
-def make_config(i, job_sequence_names):
-    config_path = configs_path / f"nsfp_split_{i:06d}.py"
-    config_file_content = f"""
-_base_ = '../{args.base_config}'
-test_loader = dict(args=dict(log_subset={job_sequence_names}))
-"""
-    with open(config_path, "w") as f:
-        f.write(config_file_content)
-
-
-def make_srun(i):
-    srun_path = configs_path / f"srun_{i:06d}.sh"
-    docker_image_path = Path(
-        "kylevedder_offline_sceneflow_latest.sqsh").absolute()
-    assert docker_image_path.is_file(
-    ), f"Docker image {docker_image_path} squash file does not exist"
-    srun_file_content = f"""#!/bin/bash
-srun --gpus=1 --mem-per-gpu=12G --cpus-per-gpu=2 --time={get_runtime_format(args.runtime_mins)} --exclude=kd-2080ti-2.grasp.maas --job-name={args.job_prefix}{i:06d} --container-mounts=../../datasets/:/efs/,`pwd`:/project --container-image={docker_image_path} bash -c "python test_pl.py {configs_path}/nsfp_split_{i:06d}.py; echo 'done' > {configs_path}/nsfp_{i:06d}.done"
-"""
-    with open(srun_path, "w") as f:
-        f.write(srun_file_content)
-
-
-def make_screen(i):
-    screen_path = configs_path / f"screen_{i:06d}.sh"
-    screen_file_content = f"""#!/bin/bash
-rm -f {configs_path}/nsfp_{i:06d}.out;
-screen -L -Logfile {configs_path}/nsfp_{i:06d}.out -dmS nsfp_{i:06d} bash {configs_path}/srun_{i:06d}.sh
-"""
-    with open(screen_path, "w") as f:
-        f.write(screen_file_content)
-
-
-def make_runall():
-    runall_path = configs_path / f"runall.sh"
-    runall_file_content = f"""#!/bin/bash
-for i in {configs_path / "screen_*.sh"}; do
-    DONEFILE=nsfp_split_configs/nsfp_${{i:26:6}}.done
-    if [ -f $DONEFILE ]; then
-        echo "Skipping already completed0: $i"
-        continue
-    fi
-    echo "Launching: $i"
-    bash $i
-done
-"""
-    with open(runall_path, "w") as f:
-        f.write(runall_file_content)
-
-
-for i, job_sequence_names in enumerate(job_sequence_names_lst):
-    make_config(i, job_sequence_names)
-    make_srun(i)
-    make_screen(i)
-    if i % 100 == 0:
-        print(f"Made configs for {i} jobs")
-
-make_runall()
-
-print(f"Config files written to {configs_path.absolute()}")
diff --git a/data_prep_scripts/split_nsfp_jobs_sbatch.py b/data_prep_scripts/split_nsfp_jobs_sbatch.py
deleted file mode 100644
index 9b019a2..0000000
--- a/data_prep_scripts/split_nsfp_jobs_sbatch.py
+++ /dev/null
@@ -1,140 +0,0 @@
-import argparse
-from pathlib import Path
-import math
-import shutil
-
-# Get path to argoverse lidar dataset and number of sequences per job
-parser = argparse.ArgumentParser()
-parser.add_argument('lidar_path', type=Path)
-parser.add_argument('sequences_per_job', type=int)
-parser.add_argument('base_config', type=Path)
-parser.add_argument('--reset_config_dir', action='store_true')
-parser.add_argument('--configs_path',
-                    type=Path,
-                    default=Path("./nsfp_split_configs"))
-parser.add_argument('--runtime_mins', type=int, default=180)
-parser.add_argument('--job_prefix', type=str, default='nsfp')
-parser.add_argument('--elevated', action='store_true')
-parser.add_argument('--job_subset_file', type=Path, default=None)
-args = parser.parse_args()
-
-assert args.lidar_path.is_dir(), f"Path {args.lidar_path} is not a directory"
-assert args.sequences_per_job > 0, f"Number of sequences per job must be positive"
-assert args.base_config.is_file(
-), f"Config file {args.base_config} does not exist"
-
-assert args.runtime_mins > 0, f"Runtime must be positive"
-
-sequence_folders = sorted([c for c in args.lidar_path.glob("*") if c.is_dir()],
-                          key=lambda x: x.name.lower())
-num_jobs = math.ceil(len(sequence_folders) / args.sequences_per_job)
-
-print(f"Splitting {len(sequence_folders)} sequences into {num_jobs} jobs")
-
-job_sequence_names_lst = []
-for i in range(num_jobs):
-    start = i * args.sequences_per_job
-    end = min(start + args.sequences_per_job, len(sequence_folders))
-    job_sequence_folders = sequence_folders[start:end]
-    job_sequence_names = [f.name for f in job_sequence_folders]
-    job_sequence_names_lst.append(job_sequence_names)
-
-if args.job_subset_file is not None:
-    assert args.job_subset_file.is_file(
-    ), f"Job subset file {args.job_subset_file} does not exist"
-    print(f"Using job subset file {args.job_subset_file} to filter jobs")
-
-    def parse_line(line: str) -> str:
-        if ' ' not in line:
-            return line.strip()
-        # Get the first column
-        return line.split(' ')[0].strip()
-
-    with open(args.job_subset_file, "r") as f:
-        valid_jobs = [parse_line(l) for l in f.readlines()]
-        valid_jobs = set(valid_jobs)
-
-    # Filter out jobs that are not in the subset
-    job_sequence_names_lst = [[job for job in sequence if job in valid_jobs]
-                              for sequence in job_sequence_names_lst]
-
-    # Filter out empty sequences
-    job_sequence_names_lst = [
-        sequence for sequence in job_sequence_names_lst if len(sequence) > 0
-    ]
-
-    print(f"Filtered to {len(job_sequence_names_lst)} jobs")
-
-# sequence_names_set = set(f for seqs in job_sequence_names_lst for f in seqs)
-# assert len(sequence_names_set) == len(
-#     sequence_folders), "Some sequences are missing from jobs"
-
-configs_path = args.configs_path
-if args.reset_config_dir:
-    if configs_path.exists():
-        shutil.rmtree(configs_path)
-    configs_path.mkdir(exist_ok=False)
-else:
-    configs_path.mkdir(exist_ok=True, parents=True)
-
-
-def get_runtime_format(runtime_mins):
-    hours = runtime_mins // 60
-    minutes = runtime_mins % 60
-    return f"{hours:02d}:{minutes:02d}:00"
-
-
-def make_config(i, job_sequence_names):
-    config_path = configs_path / f"nsfp_split_{i:06d}.py"
-    config_file_content = f"""
-_base_ = '../{args.base_config}'
-test_loader = dict(args=dict(log_subset={job_sequence_names}))
-"""
-    with open(config_path, "w") as f:
-        f.write(config_file_content)
-
-
-def make_sbatch():
-    current_working_dir = Path.cwd().absolute()
-    sbatch_path = configs_path / f"sbatch.bash"
-    if args.elevated:
-        qos = "ee-med"
-        partition = "eaton-compute"
-    else:
-        qos = "batch"
-        partition = "batch"
-    sbatch_file_content = f"""#!/bin/bash
-#SBATCH --job-name={args.job_prefix}
-#SBATCH --output={configs_path}/nsfp_%a.out
-#SBATCH --error={configs_path}/nsfp_%a.err
-#SBATCH --time={get_runtime_format(args.runtime_mins)}"""
-    if args.elevated:
-        sbatch_file_content += f"""
-#SBATCH --qos={qos}
-#SBATCH --partition={partition}"""
-    sbatch_file_content += f"""
-#SBATCH --gpus=1
-#SBATCH --mem-per-gpu=12G
-#SBATCH --cpus-per-gpu=2
-#SBATCH --exclude=kd-2080ti-2.grasp.maas
-#SBATCH --array=0-{len(job_sequence_names_lst) - 1}
-#SBATCH --container-mounts=../../datasets/:/efs/,{current_working_dir}:/project
-#SBATCH --container-image={current_working_dir / "kylevedder_offline_sceneflow_latest.sqsh"}
-
-echo "Running job $SLURM_ARRAY_TASK_ID on $HOSTNAME"
-export MY_RUN_ID=$(printf "%06d" $SLURM_ARRAY_TASK_ID)
-python test_pl.py {configs_path}/nsfp_split_$MY_RUN_ID.py; echo 'done' > {configs_path}/nsfp_$MY_RUN_ID.done
-"""
-    with open(sbatch_path, "w") as f:
-        f.write(sbatch_file_content)
-
-
-for i, job_sequence_names in enumerate(job_sequence_names_lst):
-    make_config(i, job_sequence_names)
-    if i % 100 == 0:
-        print(f"Made configs for {i} jobs")
-print(f"Made total of {len(job_sequence_names_lst)} jobs")
-
-make_sbatch()
-
-print(f"Config files written to {configs_path.absolute()}")
diff --git a/data_prep_scripts/split_optimization_jobs_jlaunch.py b/data_prep_scripts/split_optimization_jobs_jlaunch.py
new file mode 100644
index 0000000..7196d11
--- /dev/null
+++ b/data_prep_scripts/split_optimization_jobs_jlaunch.py
@@ -0,0 +1,140 @@
+import argparse
+from pathlib import Path
+import math
+import shutil
+
+
+def build_config(
+    job_idx: int, num_jobs: int, base_config: Path, config_file_save_path: Path
+) -> None:
+    assert base_config.is_file(), f"Config file {base_config} does not exist"
+    assert job_idx >= 0, "Job index must be non-negative"
+    assert num_jobs > 0, "Number of jobs must be positive"
+    assert job_idx < num_jobs, "Job index must be less than number of jobs"
+
+    custom_config_content = f"""_base_ = "{base_config}"
+test_dataset = dict(args=dict(split=dict(split_idx={job_idx}, num_splits={num_jobs})))
+"""
+    with open(config_file_save_path, "w") as f:
+        f.write(custom_config_content)
+
+
+def build_jlaunch(
+    job_config: Path,
+    jlaunch_save_path: Path,
+    backend: str,
+    job_name: str,
+    jlaunch_args: list[str],
+):
+    assert job_config.is_file(), f"Config file {job_config} does not exist"
+
+    jlaunch_content = f"""#!/bin/bash
+python data_prep_scripts/jlaunch.py backend-{backend} "python test_pl.py {job_config}" --name {job_name} {' '.join(jlaunch_args)}
+"""
+
+    with open(jlaunch_save_path, "w") as f:
+        f.write(jlaunch_content)
+
+
+def build_split(
+    idx: int,
+    num_jobs: int,
+    base_config: Path,
+    launch_files_dir: Path,
+    backend: str,
+    job_name: str,
+    jlaunch_args: list[str],
+) -> Path:
+    """
+    Build the jlaunch file for a single job
+    """
+
+    job_dir = launch_files_dir / f"job_{idx:06d}"
+
+    # Remove the job directory if it already exists
+    if job_dir.exists():
+        shutil.rmtree(job_dir)
+
+    job_dir.mkdir(exist_ok=True, parents=True)
+
+    job_config = job_dir / "config.py"
+    build_config(idx, num_jobs, base_config, job_config)
+    jlaunch_config = job_dir / "jlaunch.sh"
+    build_jlaunch(job_config, jlaunch_config, backend, job_name, jlaunch_args)
+
+    return jlaunch_config
+
+
+def build_launch_all(jlaunches: list[Path], launch_files_dir: Path) -> Path:
+    """
+    Write a bash script to launch all jobs
+    """
+
+    launch_all_path = launch_files_dir / "launch_all.sh"
+
+    launch_all_content = """#!/bin/bash
+"""
+    for jlaunch in jlaunches:
+        assert jlaunch.is_file(), f"Jlaunch file {jlaunch} does not exist"
+        launch_all_content += f"bash {jlaunch}\n"
+
+    # Remove the launch all file if it already exists
+    if launch_all_path.exists():
+        launch_all_path.unlink()
+    with open(launch_all_path, "w") as f:
+        f.write(launch_all_content)
+
+    return launch_all_path
+
+
+def build_splits(
+    base_config: Path,
+    num_jobs: int,
+    launch_files_dir: Path,
+    backend: str,
+    base_name: str,
+    jlaunch_args: list[str],
+):
+    assert base_config.is_file(), f"Config file {base_config} does not exist"
+    assert num_jobs > 0, "Number of jobs must be positive"
+
+    # Create a directory to store the jlaunch files
+    launch_files_dir.mkdir(exist_ok=True, parents=True)
+
+    # Create a jlaunch file for each job
+    jlaunch_files = [
+        build_split(
+            idx,
+            num_jobs,
+            base_config,
+            launch_files_dir,
+            backend,
+            f"{base_name}_{idx:06d}",
+            jlaunch_args,
+        )
+        for idx in range(num_jobs)
+    ]
+
+    launch_all = build_launch_all(jlaunch_files, launch_files_dir)
+    print(f"Launch all script written to {launch_all}")
+
+
+if __name__ == "__main__":
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("base_config", type=Path)
+    parser.add_argument("num_jobs", type=int)
+    parser.add_argument("launch_files_dir", type=Path)
+    parser.add_argument("backend", type=str, choices=["slurm", "ngc"])
+    parser.add_argument("base_name", type=str)
+    parser.add_argument("jlaunch_args", nargs=argparse.REMAINDER)
+    args = parser.parse_args()
+    jlaunch_args = list(args.jlaunch_args)
+    build_splits(
+        args.base_config,
+        args.num_jobs,
+        args.launch_files_dir,
+        args.backend,
+        args.base_name,
+        jlaunch_args,
+    )
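Usage sketch (editor's note, not part of the patch): the flow below assumes a SLURM cluster reachable from the repo root; the base config path configs/argoverse2/nsfp.py, the job count, and the base name are hypothetical placeholders, while ee-med/eaton-compute are simply backend_slurm's defaults made explicit. Note that backend_slurm validates --job_dir with exists=True and asserts kylevedder_offline_sceneflow_latest.sqsh exists in the working directory, so both must be present before launching.

    # Generate launch_files/job_NNNNNN/{config.py,jlaunch.sh} for each of the
    # 100 splits, plus launch_files/launch_all.sh, which runs every jlaunch.sh.
    python data_prep_scripts/split_optimization_jobs_jlaunch.py \
        configs/argoverse2/nsfp.py 100 ./launch_files/ slurm nsfp \
        --qos ee-med --partition eaton-compute --runtime_hours 3

    # Each jlaunch.sh invokes "jlaunch.py backend-slurm", which writes
    # job_dir/NNNNNN/{command.sh,sbatch.bash} and prints the sbatch command
    # to submit by hand. The ngc backend instead prints a ready-to-run
    # `ngc batch run` command.
    mkdir -p job_dir
    bash ./launch_files/launch_all.sh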