diff --git a/pyproject.toml b/pyproject.toml
index d25e846269..d896789bba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,9 @@ target-version = "py38"
 # Allow imports relative to the "src" and "tests" directories.
 src = ["src", "tests"]
 
+[tool.ruff.isort]
+known-third-party = ["wandb", "comet_ml"]
+
 [tool.ruff.mccabe]
 # Unlike Flake8, default to a complexity level of 10.
 max-complexity = 10
diff --git a/requirements/loggers.txt b/requirements/loggers.txt
index f6a4157d0c..c84a876f06 100644
--- a/requirements/loggers.txt
+++ b/requirements/loggers.txt
@@ -1,6 +1,6 @@
 comet-ml>=3.31.7
 gradio>=2.9.4
 tensorboard
-wandb==0.12.17
+wandb>=0.13.0
 GitPython
 ipykernel
diff --git a/src/anomalib/utils/sweep/__init__.py b/src/anomalib/utils/sweep/__init__.py
index 36493a3988..a32a24e50b 100644
--- a/src/anomalib/utils/sweep/__init__.py
+++ b/src/anomalib/utils/sweep/__init__.py
@@ -5,12 +5,16 @@
 
 from .config import flatten_sweep_params, get_run_config, set_in_nested_config
 from .helpers import get_openvino_throughput, get_sweep_callbacks, get_torch_throughput
+from .utils import Status, exception_wrapper, redirect_output
 
 __all__ = [
+    "exception_wrapper",
+    "flatten_sweep_params",
     "get_run_config",
-    "set_in_nested_config",
     "get_sweep_callbacks",
     "get_openvino_throughput",
     "get_torch_throughput",
-    "flatten_sweep_params",
+    "set_in_nested_config",
+    "redirect_output",
+    "Status",
 ]
diff --git a/src/anomalib/utils/sweep/utils.py b/src/anomalib/utils/sweep/utils.py
new file mode 100644
index 0000000000..56a0bb9ccc
--- /dev/null
+++ b/src/anomalib/utils/sweep/utils.py
@@ -0,0 +1,105 @@
+"""Additional utils for sweep."""
+
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+import functools
+import io
+import logging
+import sys
+from enum import Enum
+from typing import Any, Callable
+
+logger = logging.getLogger(__name__)
+
+
+def redirect_output(func: Callable) -> Callable[..., dict[str, Any]]:
+    """Decorator that redirects stdout of the wrapped function to the logger.
+
+    Args:
+        func (function): Hides output of this function.
+
+    Note:
+        Exceptions raised by ``func`` are caught and logged; an empty dict is returned in that case.
+
+    Returns:
+        Output of the wrapped function, or an empty dict if it raised an exception.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs) -> dict[str, Any]:
+        std_out = sys.stdout
+        sys.stdout = buf = io.StringIO()
+        try:
+            value = func(*args, **kwargs)
+            logger.info(buf.getvalue())
+            logger.info(value)
+        except Exception as exp:
+            logger.exception(
+                "Error occurred while computing benchmark %s. Buffer: %s." "\n Method %s, args %s, kwargs %s",
+                exp,
+                buf.getvalue(),
+                func,
+                args,
+                kwargs,
+            )
+            value = {}
+        sys.stdout = std_out
+        return value
+
+    return wrapper
+
+
+class Status(str, Enum):
+    """Status of the benchmarking run."""
+
+    SUCCESS = "success"
+    FAILED = "failed"
+
+
+class Result:
+    def __init__(self, value: Any, status: Status = Status.SUCCESS):
+        self.value = value
+        self.status = status
+
+    def __bool__(self):
+        return self.status == Status.SUCCESS
+
+
+def exception_wrapper(func: Callable) -> Callable[..., Result]:
+    """Wrapper method to handle exceptions.
+
+    Args:
+        func (function): Function to be wrapped.
+
+    Note:
+        Exceptions raised by ``func`` are caught and logged; a ``Result`` with ``Status.FAILED`` is returned.
+
+    Example:
+        >>> @exception_wrapper
+        ... def func():
+        ...     raise Exception("Exception occurred")
+        >>> bool(func())  # the exception is logged and a failed ``Result`` is returned
+        False
+
+    Returns:
+        ``Result`` object wrapping the output of the called function.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs) -> Result:
+        try:
+            value = Result(value=func(*args, **kwargs))
+        except Exception as exp:
+            logger.exception(
+                "Error occurred while computing benchmark %s. Method %s, args %s, kwargs %s",
+                exp,
+                func,
+                args,
+                kwargs,
+            )
+            value = Result(False, Status.FAILED)
+        return value
+
+    return wrapper
diff --git a/tools/benchmarking/benchmark.py b/tools/benchmarking/benchmark.py
index d55f6c9eaf..775232c9bd 100644
--- a/tools/benchmarking/benchmark.py
+++ b/tools/benchmarking/benchmark.py
@@ -1,87 +1,84 @@
 """Benchmark all the algorithms in the repo."""
 
-# Copyright (C) 2022 Intel Corporation
+# Copyright (C) 2023 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
 from __future__ import annotations
 
-import functools
-import io
 import logging
+from pathlib import Path
+
+# E402 Module level import not at top of file. Disabled as we need to redirect all outputs during the runs.
+# ruff: noqa: E402
+# pylint: disable=wrong-import-position
+
+# The log file name is deliberately not unique (e.g. time-stamped): with a unique name,
+# each spawned process would create its own file.
+log_file = "runs/benchmark.log"
+Path(log_file).parent.mkdir(exist_ok=True, parents=True)
+logger_file_handler = logging.FileHandler(log_file)
+logger_file_handler.setLevel(logging.INFO)
+
+# Redirect warnings and logs generated while importing to the log file
+logging.captureWarnings(True)
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+logger.addHandler(logger_file_handler)
+
+# End of warnings capture | Rest of the imports follow
+
 import math
 import multiprocessing
-import sys
 import time
-import warnings
 from argparse import ArgumentParser
-from concurrent.futures import ProcessPoolExecutor, as_completed
+from concurrent.futures import ProcessPoolExecutor
 from datetime import datetime
-from pathlib import Path
+from multiprocessing.managers import DictProxy
 from tempfile import TemporaryDirectory
-from typing import cast
+from typing import Any, cast
 
 import torch
+import wandb
 from omegaconf import DictConfig, ListConfig, OmegaConf
 from pytorch_lightning import Trainer, seed_everything
-from utils import upload_to_comet, upload_to_wandb, write_metrics
+from rich import print  # pylint: disable=W0622 | disable redefine print warning
+from rich.console import Console
+from rich.progress import Progress, TaskID
+from rich.table import Table
+from utils import write_metrics
 
 from anomalib.config import get_configurable_parameters, update_input_size_config
 from anomalib.data import get_datamodule
 from anomalib.deploy import export
 from anomalib.deploy.export import ExportMode
 from anomalib.models import get_model
-from anomalib.utils.loggers import configure_logger
 from anomalib.utils.sweep import (
-    get_openvino_throughput,
+    exception_wrapper,
     get_run_config,
     get_sweep_callbacks,
-    get_torch_throughput,
+    redirect_output,
     set_in_nested_config,
 )
 
-warnings.filterwarnings("ignore")
-
-logger = logging.getLogger(__name__)
-configure_logger()
-pl_logger = logging.getLogger(__file__)
-for logger_name in ["pytorch_lightning", "torchmetrics", "os"]:
-    logging.getLogger(logger_name).setLevel(logging.ERROR)
-
+# TODO add torch and openvino throughputs.
 
-def hide_output(func):
-    """Decorator to hide output of the function.
+# Redirect future warnings and logs to file from all the imports
+for name in logging.root.manager.loggerDict:
+    for filter_keys in ["lightning", "anomalib", "wandb", "comet", "py.warnings"]:
+        if filter_keys in name:
+            _logger = logging.getLogger(name)
+            _logger.setLevel(logging.WARNING)
+            _logger.handlers = []
+            _logger.addHandler(logger_file_handler)
 
-    Args:
-        func (function): Hides output of this function.
-
-    Raises:
-        Exception: Incase the execution of function fails, it raises an exception.
-
-    Returns:
-        object of the called function
-    """
-
-    @functools.wraps(func)
-    def wrapper(*args, **kwargs):
-        std_out = sys.stdout
-        sys.stdout = buf = io.StringIO()
-        try:
-            value = func(*args, **kwargs)
-        except Exception as exp:
-            raise Exception(buf.getvalue()) from exp
-        sys.stdout = std_out
-        return value
 
-    return wrapper
-
-
-@hide_output
-def get_single_model_metrics(model_config: DictConfig | ListConfig, openvino_metrics: bool = False) -> dict:
+@redirect_output
+def get_single_model_metrics(model_config: DictConfig | ListConfig) -> dict[str, Any]:
     """Collects metrics for `model_name` and returns a dict of results.
 
     Args:
        model_config (DictConfig, ListConfig): Configuration for run
-        openvino_metrics (bool): If True, converts the model to OpenVINO format and gathers inference metrics.
 
     Returns:
         dict: Collection of all the metrics such as time taken, throughput and performance scores.
@@ -89,6 +86,8 @@ def get_single_model_metrics(model_config: DictConfig | ListConfig, openvino_met
 
     with TemporaryDirectory() as project_path:
         model_config.project.path = project_path
+        model_config.trainer.enable_progress_bar = False
+        model_config.trainer.enable_model_summary = False
         datamodule = get_datamodule(model_config)
         model = get_model(model_config)
 
@@ -121,210 +120,218 @@ def get_single_model_metrics(model_config: DictConfig | ListConfig, openvino_met
             export_root=project_path,
         )
 
-        throughput = get_torch_throughput(
-            model_path=project_path,
-            test_dataset=datamodule.test_dataloader().dataset,
-            device=model_config.trainer.accelerator,
-        )
-
-        # Get OpenVINO metrics
-        openvino_throughput = float("nan")
-        if openvino_metrics:
-            # Create dirs for openvino model export
-            export(
-                task=model_config.dataset.task,
-                transform=trainer.datamodule.test_data.transform.to_dict(),
-                input_size=model_config.model.input_size,
-                model=model,
-                export_mode=ExportMode.OPENVINO,
-                export_root=project_path,
-            )
-            openvino_throughput = get_openvino_throughput(model_path=project_path, test_dataset=datamodule.test_data)
-
         # arrange the data
         data = {
             "Training Time (s)": training_time,
             "Testing Time (s)": testing_time,
-            f"Inference Throughput {model_config.trainer.accelerator} (fps)": throughput,
-            "OpenVINO Inference Throughput (fps)": openvino_throughput,
         }
         for key, val in test_results[0].items():
             data[key] = float(val)
-
     return data
 
 
-def compute_on_cpu(sweep_config: DictConfig | ListConfig, folder: str | None = None):
-    """Compute all run configurations over a sigle CPU."""
-    for run_config in get_run_config(sweep_config.grid_search):
-        model_metrics = sweep(run_config, 0, sweep_config.seed, False)
-        write_metrics(model_metrics, sweep_config.writer, folder)
-
-
-def compute_on_gpu(
-    run_configs: list[DictConfig],
-    device: int,
-    seed: int,
-    writers: list[str],
-    folder: str | None = None,
-    compute_openvino: bool = False,
-):
-    """Go over each run config and collect the result.
+class Benchmark:
+    """Benchmarking runner.
 
     Args:
-        run_configs (DictConfig | ListConfig): List of run configurations.
-        device (int): The GPU id used for running the sweep.
-        seed (int): Fix a seed.
-        writers (list[str]): Destinations to write to.
-        folder (optional, str): Sub-directory to which runs are written to. Defaults to None. If none writes to root.
-        compute_openvino (bool, optional): Compute OpenVINO throughput. Defaults to False.
+        config: (DictConfig | ListConfig): Sweep configuration.
+        n_gpus: (int): Number of devices to run the benchmark on. If n_gpus is 0, benchmarking is run on all available
+            GPUs.
     """
-    for run_config in run_configs:
-        if isinstance(run_config, (DictConfig, ListConfig)):
-            model_metrics = sweep(run_config, device, seed, compute_openvino)
-            write_metrics(model_metrics, writers, folder)
-        else:
-            raise ValueError(
-                f"Expecting `run_config` of type DictConfig or ListConfig. Got {type(run_config)} instead."
-            )
-
-def distribute_over_gpus(sweep_config: DictConfig | ListConfig, folder: str | None = None):
-    """Distribute metric collection over all available GPUs. This is done by splitting the list of configurations."""
-    with ProcessPoolExecutor(
-        max_workers=torch.cuda.device_count(), mp_context=multiprocessing.get_context("spawn")
-    ) as executor:
-        run_configs = list(get_run_config(sweep_config.grid_search))
+    def __init__(self, config: DictConfig | ListConfig, n_gpus: int = 0):
+        self.config = config
+        self.n_gpus = min(n_gpus, torch.cuda.device_count()) if n_gpus > 0 else torch.cuda.device_count()
+        self.runs_folder = f"runs/{datetime.strftime(datetime.now(), '%Y_%m_%d-%H_%M_%S')}"
+        Path(self.runs_folder).mkdir(exist_ok=True, parents=True)
+        self.run_failures: bool = False
+
+    @exception_wrapper
+    def _sweep(self, device: int, run_config: DictConfig, seed: int = 42) -> dict[str, Any]:
+        """Run a single sweep on a device."""
+        seed_everything(seed, workers=True)
+        # This assumes that `model_name` is always present in the sweep config.
+        model_config = get_configurable_parameters(model_name=run_config.model_name)
+        model_config.project.seed = seed
+        model_config = cast(DictConfig, model_config)  # placate mypy
+        for param in run_config.keys():
+            # grid search keys are always assumed to be strings
+            param = cast(str, param)  # placate mypy
+            set_in_nested_config(model_config, param.split("."), run_config[param])  # type: ignore
+
+        # convert image size to tuple in case it was updated by run config
+        model_config = update_input_size_config(model_config)
+
+        # Set device in config. 0 selects the CPU; otherwise GPU id (device - 1) is used.
+        if device != 0:
+            model_config.trainer.devices = [device - 1]
+            model_config.trainer.accelerator = "gpu"
+
+        # Remove legacy flags
+        for legacy_device in ["num_processes", "gpus", "ipus", "tpu_cores"]:
+            if legacy_device in model_config.trainer:
+                model_config.trainer[legacy_device] = None
+
+        # Run benchmarking for current config
+        model_metrics: dict[str, Any] = get_single_model_metrics(model_config=model_config)
+        output = f"One sweep run complete for model {model_config.model.name}"
+        output += f" On category {model_config.dataset.category}" if model_config.dataset.category is not None else ""
+        output += str(model_metrics)
+        logger.info(output)
+
+        # Append configuration of current run to the collected metrics
+        for key, value in run_config.items():
+            # Skip adding model name to the dataframe
+            if key != "model_name":
+                model_metrics[key] = value
+
+        # Add device name to list
+        model_metrics["device"] = "gpu"
+        model_metrics["model_name"] = run_config.model_name
+
+        return model_metrics
+
+    @exception_wrapper
+    def _compute(
+        self, progress: DictProxy, task_id: TaskID, device: int, run_configs: list[DictConfig]
+    ) -> dict[str, list[str]]:
+        """Iterate over run configurations, computing and writing metrics for each one.
+
+        Args:
+            progress (DictProxy): Shared dict to write progress status for displaying in terminal.
+            task_id (TaskID): Task id for the current process. Used to identify the progress bar.
+            device (int): GPU id on which the benchmarking is run.
+            run_configs (list[DictConfig]): List of run configurations.
+
+        Returns:
+            dict[str, list[str]]: Dictionary containing the metrics gathered from the sweep.
+        """
+        result = []
+        for idx, config in enumerate(run_configs):
+            output = self._sweep(device, config)
+            if output:
+                write_metrics(output.value, self.config.writer, self.runs_folder)
+                result.append(output.value)
+            else:
+                self.run_failures = True
+
+            progress[str(task_id)] = {"completed": idx + 1, "total": len(run_configs)}
+        # convert list of dicts to dict of lists
+        return {key: [dic[key] for dic in result] for key in result[0]}
+
+    @exception_wrapper
+    def _distribute(self):
+        run_configs = list(get_run_config(self.config.grid_search))
+        step_size = math.ceil(len(run_configs) / self.n_gpus)
         jobs = []
-        for device_id, run_split in enumerate(
-            range(0, len(run_configs), math.ceil(len(run_configs) / torch.cuda.device_count()))
-        ):
-            jobs.append(
-                executor.submit(
-                    compute_on_gpu,
-                    run_configs[run_split : run_split + math.ceil(len(run_configs) / torch.cuda.device_count())],
-                    device_id + 1,
-                    sweep_config.seed,
-                    sweep_config.writer,
-                    folder,
-                    sweep_config.compute_openvino,
-                )
+        results: list[dict[str, list[str]]] = []
+        with Progress() as progress:
+            overall_progress_task = progress.add_task("[green]Overall Progress")
+            with multiprocessing.Manager() as manager:
+                _progress = manager.dict()
+
+                with ProcessPoolExecutor(
+                    max_workers=self.n_gpus, mp_context=multiprocessing.get_context("spawn")
+                ) as executor:
+                    for device_id, run_split in enumerate(range(0, len(run_configs), step_size)):
+                        task_id = progress.add_task(f"Running benchmark on GPU {device_id}")
+                        _progress[str(task_id)] = {"completed": 0, "total": step_size}
+                        jobs.append(
+                            executor.submit(
+                                self._compute,
+                                _progress,
+                                task_id,
+                                device_id,
+                                run_configs[run_split : run_split + step_size],
+                            )
+                        )
+
+                    # monitor the progress:
+                    while (sum([job.done() for job in jobs])) < len(jobs):
+                        progress.update(
+                            overall_progress_task,
+                            completed=sum([task["completed"] for task in _progress.values()]),
+                            total=len(run_configs),
+                        )
+                        for task_id, params in _progress.items():
+                            progress.update(TaskID(int(task_id)), completed=params["completed"], total=params["total"])
+
+                    for job in jobs:
+                        _result = job.result()
+                        if _result:
+                            results.append(_result.value)
+                        else:
+                            self.run_failures = True
+
+            progress.update(overall_progress_task, completed=len(run_configs), total=len(run_configs))
+        result = self._gather_results(results)
+        if result:
+            self._print_results(result.value)
+        else:
+            self.run_failures = True
+
+    @exception_wrapper
+    def _gather_results(self, results: list[dict[str, list[str]]]) -> dict:
+        """Gather results from all processes.
+
+        Args:
+            results (list[dict[str, list[str]]]): Per-process result dictionaries.
+
+        Returns:
+            dict: Dictionary containing the merged results from all processes.
+        """
+        result: dict[str, list] = {key: [] for key in results[0].keys()}
+        for _result in results:
+            for key, value in _result.items():
+                result[key].extend(value)
+        return result
+
+    @exception_wrapper
+    def _print_results(self, result: dict) -> None:
+        """Print the results in a tabular format.
+
+        Args:
+            result (dict): Dictionary containing the results from all processes.
+        """
+        console = Console()
+        table = Table(title="Benchmarking Results", show_header=True, header_style="bold magenta")
+        for column in result.keys():
+            table.add_column(column)
+        for row in [*zip(*result.values())]:
+            table.add_row(*[str(value) for value in row])
+        console.print(table)
+
+    def run(self):
+        """Run the benchmarking."""
+        logger.info(
+            "\n%s\n" "Starting benchmarking. %s" "\nDistributing benchmark collection over %s GPUs.",
+            "-" * 120,
+            datetime.strftime(datetime.now(), "%Y %m %d-%H %M %S"),
+            self.n_gpus,
+        )
+        if not torch.cuda.is_available():
+            logger.warning("Could not detect any CUDA-enabled devices")
+
+        self._distribute()
+        if self.run_failures:
+            print(
+                "[bold red]There were some errors while collecting benchmark results[/bold red]"
+                "\nPlease check the log file [magenta]runs/benchmark.log[/magenta]"
+                " for more details."
             )
-        for job in jobs:
-            try:
-                job.result()
-            except Exception as exc:
-                raise Exception(f"Error occurred while computing benchmark on GPU {job}") from exc
-
-
-def distribute(config: DictConfig | ListConfig):
-    """Run all cpu experiments on a single process. Distribute gpu experiments over all available gpus.
-
-    Args:
-        config: (DictConfig | ListConfig): Sweep configuration.
- """ - - runs_folder = datetime.strftime(datetime.now(), "%Y_%m_%d-%H_%M_%S") - devices = config.hardware - if not torch.cuda.is_available() and "gpu" in devices: - pl_logger.warning("Config requested GPU benchmarking but torch could not detect any cuda enabled devices") - elif {"cpu", "gpu"}.issubset(devices): - # Create process for gpu and cpu - with ProcessPoolExecutor(max_workers=2, mp_context=multiprocessing.get_context("spawn")) as executor: - jobs = [ - executor.submit(compute_on_cpu, config, runs_folder), - executor.submit(distribute_over_gpus, config, runs_folder), - ] - for job in as_completed(jobs): - try: - job.result() - except Exception as exception: - raise Exception(f"Error occurred while computing benchmark on device {job}") from exception - elif "cpu" in devices: - compute_on_cpu(config, folder=runs_folder) - elif "gpu" in devices: - distribute_over_gpus(config, folder=runs_folder) - if "wandb" in config.writer: - upload_to_wandb(team="anomalib", folder=runs_folder) - if "comet" in config.writer: - upload_to_comet(folder=runs_folder) - - -def sweep( - run_config: DictConfig | ListConfig, device: int = 0, seed: int = 42, convert_openvino: bool = False -) -> dict[str, str | float]: - """Go over all the values mentioned in `grid_search` parameter of the benchmarking config. - - Args: - run_config: (DictConfig | ListConfig, optional): Configuration for current run. - device (int, optional): Name of the device on which the model is trained. Defaults to 0 "cpu". - convert_openvino (bool, optional): Whether to convert the model to openvino format. Defaults to False. - - Returns: - dict[str, str | float]: Dictionary containing the metrics gathered from the sweep. - """ - seed_everything(seed, workers=True) - # This assumes that `model_name` is always present in the sweep config. - model_config = get_configurable_parameters(model_name=run_config.model_name) - model_config.project.seed = seed - - model_config = cast(DictConfig, model_config) # placate mypy - for param in run_config.keys(): - # grid search keys are always assumed to be strings - param = cast(str, param) # placate mypy - set_in_nested_config(model_config, param.split("."), run_config[param]) # type: ignore - - # convert image size to tuple in case it was updated by run config - model_config = update_input_size_config(model_config) - - # Set device in config. 0 - cpu, [0], [1].. 
- gpu id - if device != 0: - model_config.trainer.devices = [device - 1] - model_config.trainer.accelerator = "gpu" - else: - model_config.trainer.accelerator = "cpu" - - # Remove legacy flags - for legacy_device in ["num_processes", "gpus", "ipus", "tpu_cores"]: - if legacy_device in model_config.trainer: - model_config.trainer[legacy_device] = None - - if run_config.model_name in ["patchcore", "cflow"]: - convert_openvino = False # `torch.cdist` is not supported by onnx version 11 - # TODO Remove this line when issue #40 is fixed https://github.com/openvinotoolkit/anomalib/issues/40 - if model_config.model.input_size != (224, 224): - return {} # go to next run - - # Run benchmarking for current config - model_metrics = get_single_model_metrics(model_config=model_config, openvino_metrics=convert_openvino) - output = f"One sweep run complete for model {model_config.model.name}" - output += f" On category {model_config.dataset.category}" if model_config.dataset.category is not None else "" - output += str(model_metrics) - logger.info(output) - - # Append configuration of current run to the collected metrics - for key, value in run_config.items(): - # Skip adding model name to the dataframe - if key != "model_name": - model_metrics[key] = value - - # Add device name to list - model_metrics["device"] = "gpu" if device > 0 else "cpu" - model_metrics["model_name"] = run_config.model_name - - return model_metrics + logger.info("Benchmarking complete \n%s", "-" * 120) if __name__ == "__main__": - # Benchmarking entry point. - # Spawn multiple processes one for cpu and rest for the number of gpus available in the system. - # The idea is to distribute metrics collection over all the available devices. - parser = ArgumentParser() parser.add_argument("--config", type=Path, help="Path to sweep configuration") _args = parser.parse_args() - print("Benchmarking started 🏃‍♂️. This will take a while ⏲ depending on your configuration.") + print("[royal_blue1]Benchmarking started. 
This will take a while depending on your configuration.[/royal_blue1]") + _sweep_config = OmegaConf.load(_args.config) - distribute(_sweep_config) - print("Finished gathering results ⚡") + if "wandb" in _sweep_config.writer: + wandb.setup() # this is required when using multiprocessing otherwise wandb hangs + runner = Benchmark(_sweep_config, n_gpus=0) + runner.run() + print("[royal_blue1]Finished gathering results[/royal_blue1] ⚡") diff --git a/tools/benchmarking/utils/__init__.py b/tools/benchmarking/utils/__init__.py index b9eebfed78..3b5007afcc 100644 --- a/tools/benchmarking/utils/__init__.py +++ b/tools/benchmarking/utils/__init__.py @@ -3,6 +3,6 @@ # Copyright (C) 2022 Intel Corporation # SPDX-License-Identifier: Apache-2.0 -from .metrics import upload_to_comet, upload_to_wandb, write_metrics +from .metrics import write_metrics, write_to_comet, write_to_wandb -__all__ = ["write_metrics", "upload_to_comet", "upload_to_wandb"] +__all__ = ["write_metrics", "write_to_comet", "write_to_wandb"] diff --git a/tools/benchmarking/utils/metrics.py b/tools/benchmarking/utils/metrics.py index b1b7373d65..161c8c4ebb 100644 --- a/tools/benchmarking/utils/metrics.py +++ b/tools/benchmarking/utils/metrics.py @@ -5,9 +5,9 @@ from __future__ import annotations +import logging import random import string -from glob import glob from pathlib import Path import pandas as pd @@ -15,11 +15,13 @@ from comet_ml import Experiment from torch.utils.tensorboard.writer import SummaryWriter +logger = logging.getLogger(__name__) + def write_metrics( model_metrics: dict[str, str | float], writers: list[str], - folder: str | None = None, + folder: str, ): """Writes metrics to destination provided in the sweep config. @@ -32,22 +34,36 @@ def write_metrics( if model_metrics == {} or model_metrics is None: return + result_folder = Path(folder) # Write to CSV - metrics_df = pd.DataFrame(model_metrics, index=[0]) - result_folder = Path("runs") if folder is None else Path(f"runs/{folder}") - result_path = result_folder / f"{model_metrics['model_name']}_{model_metrics['device']}.csv" - Path.mkdir(result_path.parent, parents=True, exist_ok=True) - if not result_path.is_file(): - metrics_df.to_csv(result_path) - else: - metrics_df.to_csv(result_path, mode="a", header=False) + try: + metrics_df = pd.DataFrame(model_metrics, index=[0]) + result_path = result_folder / f"{model_metrics['model_name']}_{model_metrics['device']}.csv" + Path.mkdir(result_path.parent, parents=True, exist_ok=True) + if not result_path.is_file(): + metrics_df.to_csv(result_path) + else: + metrics_df.to_csv(result_path, mode="a", header=False) + except Exception as exception: + logger.exception(f"Could not write to csv. Exception: {exception}") + + project_name = f"benchmarking_{result_folder.name}" + tags = [] + for key, value in model_metrics.items(): + if all(name not in key.lower() for name in ["time", "image", "pixel", "throughput"]): + tags.append(str(value)) if "tensorboard" in writers: - write_to_tensorboard(model_metrics) + write_to_tensorboard(model_metrics, result_folder) + if "wandb" in writers: + write_to_wandb(model_metrics, project_name, tags) + if "comet" in writers: + write_to_comet(model_metrics, project_name, tags) def write_to_tensorboard( model_metrics: dict[str, str | float], + folder: Path, ): """Write model_metrics to tensorboard. 
@@ -63,7 +79,7 @@
         else:
             string_metrics[key] = metric
             scalar_prefixes.append(metric)
-    writer = SummaryWriter(f"runs/{model_metrics['model_name']}_{model_metrics['device']}")
+    writer = SummaryWriter(folder / "tfevents")
     for key, metric in model_metrics.items():
         if isinstance(metric, (int, float, bool)):
             scalar_metrics[key.replace(".", "/")] = metric  # need to join by / for tensorboard grouping
@@ -90,56 +106,57 @@ def get_unique_key(str_len: int) -> str:
     return "".join([random.choice(string.ascii_lowercase) for _ in range(str_len)])  # nosec: B311
 
 
-def upload_to_wandb(
+def write_to_wandb(
+    model_metrics: dict[str, str | float],
+    project_name: str,
+    tags: list[str],
     team: str = "anomalib",
-    folder: str | None = None,
 ):
-    """Upload the data in csv files to wandb.
+    """Write model_metrics to wandb.
 
-    Creates a project named benchmarking_[two random characters]. This is so that the project names are unique.
-    One issue is that it does not check for collision
+    > _Note:_ Any failure in wandb has been observed to hang the run. Use the wandb writer with caution.
 
     Args:
+        model_metrics (dict[str, str | float]): Dictionary containing collected results.
+        project_name (str): Name of the project on wandb.
+        tags (list[str]): List of tags for the run.
         team (str, optional): Name of the team on wandb. This can also be the id of your personal account.
-        Defaults to "anomalib".
-        folder (optional, str): Sub-directory from which runs are picked up. Defaults to None. If none picks from runs.
+            Defaults to "anomalib".
     """
-    project = f"benchmarking_{get_unique_key(2)}"
-    tag_list = ["dataset.category", "model_name", "dataset.image_size", "model.backbone", "device"]
-    search_path = "runs/*.csv" if folder is None else f"runs/{folder}/*.csv"
-    for csv_file in glob(search_path):
-        table = pd.read_csv(csv_file)
-        for index, row in table.iterrows():
-            row = dict(row[1:])  # remove index column
-            tags = [str(row[column]) for column in tag_list if column in row.keys()]
-            wandb.init(
-                entity=team, project=project, name=f"{row['model_name']}_{row['dataset.category']}_{index}", tags=tags
-            )
-            wandb.log(row)
-            wandb.finish()
-
-
-def upload_to_comet(
-    folder: str | None = None,
+    for key, value in model_metrics.items():
+        if all(name not in key.lower() for name in ["time", "image", "pixel", "throughput"]):
+            tags.append(str(value))
+    run = wandb.init(
+        entity=team,
+        project=project_name,
+        name=f"{'_'.join(tags)}",
+        tags=tags,
+        settings={"silent": True, "show_info": False, "show_warnings": False, "show_errors": False},
+    )
+    run.log(model_metrics)
+    logger.info(f"Run logged at {run.url}")
+    run.finish(quiet=True)
+
+
+def write_to_comet(
+    model_metrics: dict[str, str | float],
+    project_name: str,
+    tags: list[str],
+    team: str = "anomalib",
 ):
-    """Upload the data in csv files to comet.
+    """Write model_metrics to comet.
 
-    Creates a project named benchmarking_[two random characters]. This is so that the project names are unique.
-    One issue is that it does not check for collision
-
     Args:
-        folder (optional, str): Sub-directory from which runs are picked up. Defaults to None. If none picks from runs.
+        model_metrics (dict[str, str | float]): Dictionary containing collected results.
+        project_name (str): Name of the project on comet.
+        tags (list[str]): List of tags for the run.
+        team (str, optional): Name of the workspace on comet. This can also be the id of your personal account.
+            Defaults to "anomalib".
""" - project = f"benchmarking_{get_unique_key(2)}" - tag_list = ["dataset.category", "model_name", "dataset.image_size", "model.backbone", "device"] - search_path = "runs/*.csv" if folder is None else f"runs/{folder}/*.csv" - for csv_file in glob(search_path): - table = pd.read_csv(csv_file) - for index, row in table.iterrows(): - row = dict(row[1:]) # remove index column - tags = [str(row[column]) for column in tag_list if column in row.keys()] - experiment = Experiment(project_name=project) - experiment.set_name(f"{row['model_name']}_{row['dataset.category']}_{index}") - experiment.log_metrics(row, step=1, epoch=1) # populates auto-generated charts on panel view - experiment.add_tags(tags) - experiment.log_table(filename=csv_file) + experiment = Experiment(project_name=project_name, workspace=team) + experiment.set_name(f"{'_'.join(tags)}") + experiment.log_metrics(model_metrics, step=1, epoch=1) # populates auto-generated charts on panel view + experiment.add_tags(tags) + logger.info(f"Run logged at {experiment.url}") + experiment.end()