diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 270693c81..d3668d0a7 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -2,6 +2,10 @@ name: Tests

 on: [push, pull_request]

+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
 jobs:
   test:
     name: (${{ matrix.os }}, Py${{ matrix.python-version }}, sk${{ matrix.scikit-learn }}, sk-only:${{ matrix.sklearn-only }})
diff --git a/openml/__init__.py b/openml/__init__.py
index ce5a01575..ab670c1db 100644
--- a/openml/__init__.py
+++ b/openml/__init__.py
@@ -117,4 +117,5 @@ def populate_cache(task_ids=None, dataset_ids=None, flow_ids=None, run_ids=None)
 ]

 # Load the scikit-learn extension by default
-import openml.extensions.sklearn  # noqa: F401
+# TODO(eddiebergman): Not sure why this is at the bottom of the file
+import openml.extensions.sklearn  # noqa: E402, F401
diff --git a/openml/config.py b/openml/config.py
index 5d0d6c612..1dc07828b 100644
--- a/openml/config.py
+++ b/openml/config.py
@@ -12,17 +12,18 @@
 from io import StringIO
 from pathlib import Path
 from typing import Dict, Union, cast
+from typing_extensions import Literal
 from urllib.parse import urlparse

 logger = logging.getLogger(__name__)
 openml_logger = logging.getLogger("openml")
-console_handler = None
-file_handler = None  # type: Optional[logging.Handler]
+console_handler: logging.StreamHandler | None = None
+file_handler: logging.handlers.RotatingFileHandler | None = None


-def _create_log_handlers(create_file_handler: bool = True) -> None:
+def _create_log_handlers(create_file_handler: bool = True) -> None:  # noqa: FBT
    """Creates but does not attach the log handlers."""
-    global console_handler, file_handler
+    global console_handler, file_handler  # noqa: PLW0603
    if console_handler is not None or file_handler is not None:
        logger.debug("Requested to create log handlers, but they are already created.")
        return
@@ -35,7 +36,7 @@ def _create_log_handlers(create_file_handler: bool = True) -> None:
     if create_file_handler:
         one_mb = 2**20
-        log_path = os.path.join(_root_cache_directory, "openml_python.log")
+        log_path = _root_cache_directory / "openml_python.log"
         file_handler = logging.handlers.RotatingFileHandler(
             log_path,
             maxBytes=one_mb,
@@ -64,7 +65,7 @@ def _convert_log_levels(log_level: int) -> tuple[int, int]:

 def _set_level_register_and_store(handler: logging.Handler, log_level: int) -> None:
     """Set handler log level, register it if needed, save setting to config file if specified."""
-    oml_level, py_level = _convert_log_levels(log_level)
+    _oml_level, py_level = _convert_log_levels(log_level)
     handler.setLevel(py_level)

     if openml_logger.level > py_level or openml_logger.level == logging.NOTSET:
@@ -76,31 +77,27 @@ def _set_level_register_and_store(handler: logging.Handler, log_level: int) -> N

 def set_console_log_level(console_output_level: int) -> None:
     """Set console output to the desired level and register it with openml logger if needed."""
-    global console_handler
-    _set_level_register_and_store(cast(logging.Handler, console_handler), console_output_level)
+    global console_handler  # noqa: PLW0602
+    assert console_handler is not None
+    _set_level_register_and_store(console_handler, console_output_level)


 def set_file_log_level(file_output_level: int) -> None:
     """Set file output to the desired level and register it with openml logger if needed."""
-    global file_handler
-    _set_level_register_and_store(cast(logging.Handler, file_handler), file_output_level)
+    global file_handler  # noqa: PLW0602
+    assert file_handler is not None
+    _set_level_register_and_store(file_handler, file_output_level)


 # Default values (see also https://github.com/openml/OpenML/wiki/Client-API-Standards)
+_user_path = Path("~").expanduser().absolute()
 _defaults = {
     "apikey": "",
     "server": "https://www.openml.org/api/v1/xml",
     "cachedir": (
-        os.environ.get(
-            "XDG_CACHE_HOME",
-            os.path.join(
-                "~",
-                ".cache",
-                "openml",
-            ),
-        )
+        os.environ.get("XDG_CACHE_HOME", _user_path / ".cache" / "openml")
         if platform.system() == "Linux"
-        else os.path.join("~", ".openml")
+        else _user_path / ".openml"
     ),
     "avoid_duplicate_runs": "True",
     "retry_policy": "human",
@@ -124,18 +121,18 @@ def get_server_base_url() -> str:
     return server.split("/api")[0]


-apikey = _defaults["apikey"]
+apikey: str = _defaults["apikey"]
 # The current cache directory (without the server name)
-_root_cache_directory = str(_defaults["cachedir"])  # so mypy knows it is a string
-avoid_duplicate_runs = _defaults["avoid_duplicate_runs"] == "True"
+_root_cache_directory = Path(_defaults["cachedir"])
+avoid_duplicate_runs: bool = _defaults["avoid_duplicate_runs"] == "True"

 retry_policy = _defaults["retry_policy"]
 connection_n_retries = int(_defaults["connection_n_retries"])


-def set_retry_policy(value: str, n_retries: int | None = None) -> None:
-    global retry_policy
-    global connection_n_retries
+def set_retry_policy(value: Literal["human", "robot"], n_retries: int | None = None) -> None:
+    global retry_policy  # noqa: PLW0603
+    global connection_n_retries  # noqa: PLW0603
     default_retries_by_policy = {"human": 5, "robot": 50}

     if value not in default_retries_by_policy:
@@ -145,6 +142,7 @@ def set_retry_policy(value: str, n_retries: int | None = None) -> None:
         )
     if n_retries is not None and not isinstance(n_retries, int):
         raise TypeError(f"`n_retries` must be of type `int` or `None` but is `{type(n_retries)}`.")
+
     if isinstance(n_retries, int) and n_retries < 1:
         raise ValueError(f"`n_retries` is '{n_retries}' but must be positive.")

@@ -168,8 +166,8 @@ def start_using_configuration_for_example(cls) -> None:
         The configuration as it was before this call is stored, and can be recovered
         by using the `stop_use_example_configuration` method.
         """
-        global server
-        global apikey
+        global server  # noqa: PLW0603
+        global apikey  # noqa: PLW0603

         if cls._start_last_called and server == cls._test_server and apikey == cls._test_apikey:
             # Method is called more than once in a row without modifying the server or apikey.
@@ -186,6 +184,7 @@ def start_using_configuration_for_example(cls) -> None:
         warnings.warn(
             f"Switching to the test server {server} to not upload results to the live server. "
" "Using the test server may result in reduced performance of the API!", + stacklevel=2, ) @classmethod @@ -199,8 +198,8 @@ def stop_using_configuration_for_example(cls) -> None: "`start_use_example_configuration` must be called first.", ) - global server - global apikey + global server # noqa: PLW0603 + global apikey # noqa: PLW0603 server = cast(str, cls._last_used_server) apikey = cast(str, cls._last_used_key) @@ -213,7 +212,7 @@ def determine_config_file_path() -> Path: else: config_dir = Path("~") / ".openml" # Still use os.path.expanduser to trigger the mock in the unit test - config_dir = Path(os.path.expanduser(config_dir)) + config_dir = Path(config_dir).expanduser().resolve() return config_dir / "config" @@ -226,18 +225,18 @@ def _setup(config: dict[str, str | int | bool] | None = None) -> None: openml.config.server = SOMESERVER We could also make it a property but that's less clear. """ - global apikey - global server - global _root_cache_directory - global avoid_duplicate_runs + global apikey # noqa: PLW0603 + global server # noqa: PLW0603 + global _root_cache_directory # noqa: PLW0603 + global avoid_duplicate_runs # noqa: PLW0603 config_file = determine_config_file_path() config_dir = config_file.parent # read config file, create directory for config file - if not os.path.exists(config_dir): + if not config_dir.exists(): try: - os.makedirs(config_dir, exist_ok=True) + config_dir.mkdir(exist_ok=True, parents=True) cache_exists = True except PermissionError: cache_exists = False @@ -250,20 +249,20 @@ def _setup(config: dict[str, str | int | bool] | None = None) -> None: avoid_duplicate_runs = bool(config.get("avoid_duplicate_runs")) - apikey = cast(str, config["apikey"]) - server = cast(str, config["server"]) - short_cache_dir = cast(str, config["cachedir"]) + apikey = str(config["apikey"]) + server = str(config["server"]) + short_cache_dir = Path(config["cachedir"]) tmp_n_retries = config["connection_n_retries"] n_retries = int(tmp_n_retries) if tmp_n_retries is not None else None - set_retry_policy(cast(str, config["retry_policy"]), n_retries) + set_retry_policy(config["retry_policy"], n_retries) - _root_cache_directory = os.path.expanduser(short_cache_dir) + _root_cache_directory = short_cache_dir.expanduser().resolve() # create the cache subdirectory - if not os.path.exists(_root_cache_directory): + if not _root_cache_directory.exists(): try: - os.makedirs(_root_cache_directory, exist_ok=True) + _root_cache_directory.mkdir(exist_ok=True, parents=True) except PermissionError: openml_logger.warning( "No permission to create openml cache directory at %s! This can result in " @@ -288,7 +287,7 @@ def set_field_in_config_file(field: str, value: str) -> None: globals()[field] = value config_file = determine_config_file_path() config = _parse_config(str(config_file)) - with open(config_file, "w") as fh: + with config_file.open("w") as fh: for f in _defaults: # We can't blindly set all values based on globals() because when the user # sets it through config.FIELD it should not be stored to file. @@ -303,6 +302,7 @@ def set_field_in_config_file(field: str, value: str) -> None: def _parse_config(config_file: str | Path) -> dict[str, str]: """Parse the config file, set up defaults.""" + config_file = Path(config_file) config = configparser.RawConfigParser(defaults=_defaults) # The ConfigParser requires a [SECTION_HEADER], which we do not expect in our config file. 
@@ -310,7 +310,7 @@ def _parse_config(config_file: str | Path) -> dict[str, str]:
     config_file_ = StringIO()
     config_file_.write("[FAKE_SECTION]\n")
     try:
-        with open(config_file) as fh:
+        with config_file.open() as fh:
             for line in fh:
                 config_file_.write(line)
     except FileNotFoundError:
@@ -326,13 +326,14 @@ def get_config_as_dict() -> dict[str, str | int | bool]:
     config = {}  # type: Dict[str, Union[str, int, bool]]
     config["apikey"] = apikey
     config["server"] = server
-    config["cachedir"] = _root_cache_directory
+    config["cachedir"] = str(_root_cache_directory)
     config["avoid_duplicate_runs"] = avoid_duplicate_runs
     config["connection_n_retries"] = connection_n_retries
     config["retry_policy"] = retry_policy

     return config


+# NOTE: For backwards compatibility, we keep the `str` return type here.
 def get_cache_directory() -> str:
     """Get the current cache directory.
@@ -354,11 +355,11 @@ def get_cache_directory() -> str:
     """
     url_suffix = urlparse(server).netloc
-    reversed_url_suffix = os.sep.join(url_suffix.split(".")[::-1])
-    return os.path.join(_root_cache_directory, reversed_url_suffix)
+    reversed_url_suffix = os.sep.join(url_suffix.split(".")[::-1])  # noqa: PTH118
+    return os.path.join(_root_cache_directory, reversed_url_suffix)  # noqa: PTH118


-def set_root_cache_directory(root_cache_directory: str) -> None:
+def set_root_cache_directory(root_cache_directory: str | Path) -> None:
     """Set module-wide base cache directory.

     Sets the root cache directory, wherein the cache directories are
@@ -377,8 +378,8 @@ def set_root_cache_directory(root_cache_directory: str) -> None:
     --------
     get_cache_directory
     """
-    global _root_cache_directory
-    _root_cache_directory = root_cache_directory
+    global _root_cache_directory  # noqa: PLW0603
+    _root_cache_directory = Path(root_cache_directory)


 start_using_configuration_for_example = (
diff --git a/openml/extensions/sklearn/__init__.py b/openml/extensions/sklearn/__init__.py
index e10b069ba..9c1c6cba6 100644
--- a/openml/extensions/sklearn/__init__.py
+++ b/openml/extensions/sklearn/__init__.py
@@ -1,15 +1,21 @@
 # License: BSD 3-Clause
+from __future__ import annotations
+
+from typing import TYPE_CHECKING

 from openml.extensions import register_extension

 from .extension import SklearnExtension

+if TYPE_CHECKING:
+    import pandas as pd
+
 __all__ = ["SklearnExtension"]

 register_extension(SklearnExtension)


-def cont(X):
+def cont(X: pd.DataFrame) -> pd.Series:
     """Returns True for all non-categorical columns, False for the rest.

     This is a helper function for OpenML datasets encoded as DataFrames simplifying the handling
@@ -23,7 +29,7 @@ def cont(X):
     return X.dtypes != "category"


-def cat(X):
+def cat(X: pd.DataFrame) -> pd.Series:
     """Returns True for all categorical columns, False for the rest.

     This is a helper function for OpenML datasets encoded as DataFrames simplifying the handling
diff --git a/openml/runs/functions.py b/openml/runs/functions.py
index 37f79110d..28cf2d1d3 100644
--- a/openml/runs/functions.py
+++ b/openml/runs/functions.py
@@ -6,7 +6,8 @@
 import time
 import warnings
 from collections import OrderedDict
-from typing import TYPE_CHECKING, Any, cast  # F401
+from pathlib import Path
+from typing import TYPE_CHECKING, Any

 import numpy as np
 import pandas as pd
@@ -49,6 +50,7 @@
 # get_dict is in run.py to avoid circular imports

 RUNS_CACHE_DIR_NAME = "runs"
+ERROR_CODE = 512


 def run_model_on_task(
@@ -444,18 +446,18 @@ def run_exists(task_id: int, setup_id: int) -> set[int]:
         return set()

     try:
-        result = cast(
-            pd.DataFrame,
-            list_runs(task=[task_id], setup=[setup_id], output_format="dataframe"),
-        )
+        result = list_runs(task=[task_id], setup=[setup_id], output_format="dataframe")
+        assert isinstance(result, pd.DataFrame)  # TODO(eddiebergman): Remove once #1299
         return set() if result.empty else set(result["run_id"])
     except OpenMLServerException as exception:
-        # error code 512 implies no results. The run does not exist yet
-        assert exception.code == 512
+        # error code implies no results. The run does not exist yet
+        if exception.code != ERROR_CODE:
+            raise exception
         return set()


-def _run_task_get_arffcontent(
+def _run_task_get_arffcontent(  # noqa: PLR0915, PLR0912, PLR0913, C901
+    *,
     model: Any,
     task: OpenMLTask,
     extension: Extension,
@@ -494,8 +496,8 @@ def _run_task_get_arffcontent(
         A tuple containing the arfftrace content,
         the OpenML run trace, the global and local evaluation measures.
     """
-    arff_datacontent = []  # type: List[List]
-    traces = []  # type: List[OpenMLRunTrace]
+    arff_datacontent = []  # type: list[list]
+    traces = []  # type: list[OpenMLRunTrace]
     # stores fold-based evaluation measures. In case of a sample based task,
     # this information is multiple times overwritten, but due to the ordering
     # of the loops, eventually it contains the information based on the full
@@ -527,7 +529,18 @@ def _run_task_get_arffcontent(
     # Execute runs in parallel
     # assuming the same number of tasks as workers (n_jobs), the total compute time for this
     # statement will be similar to the slowest run
-    job_rvals = Parallel(verbose=0, n_jobs=n_jobs)(
+    # TODO(eddiebergman): Simplify this
+    job_rvals: list[
+        tuple[
+            np.ndarray,
+            pd.DataFrame | None,
+            np.ndarray,
+            pd.DataFrame | None,
+            OpenMLRunTrace | None,
+            OrderedDict[str, float],
+        ],
+    ]
+    job_rvals = Parallel(verbose=0, n_jobs=n_jobs)(  # type: ignore
         delayed(_run_task_get_arffcontent_parallel_helper)(
             extension=extension,
             fold_no=fold_no,
@@ -538,7 +551,7 @@ def _run_task_get_arffcontent(
             dataset_format=dataset_format,
             configuration=_config,
         )
-        for n_fit, rep_no, fold_no, sample_no in jobs
+        for _n_fit, rep_no, fold_no, sample_no in jobs
     )
     # job_rvals contain the output of all the runs with one-to-one correspondence with `jobs`
     for n_fit, rep_no, fold_no, sample_no in jobs:
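# NOTE (editor): a minimal sketch, not part of the diff, of the late-binding
# pitfall that the keyword defaults added to the helper in the next hunk guard
# against: closures capture variables, not values, so a function defined inside
# a loop sees the last loop value unless it is bound via a default argument.

fns = [lambda: i for i in range(3)]
assert [f() for f in fns] == [2, 2, 2]  # late binding: all see the final i

fns = [lambda i=i: i for i in range(3)]
assert [f() for f in fns] == [0, 1, 2]  # defaults bind the value at definition time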
@@ -550,7 +563,13 @@ def _run_task_get_arffcontent(
             # add client-side calculated metrics. These are used on the server as a
             # consistency check, only useful for supervised tasks
-            def _calculate_local_measure(sklearn_fn, openml_name):
+            def _calculate_local_measure(
+                sklearn_fn,
+                openml_name,
+                test_y=test_y,
+                pred_y=pred_y,
+                user_defined_measures_fold=user_defined_measures_fold,
+            ):
                 user_defined_measures_fold[openml_name] = sklearn_fn(test_y, pred_y)

             if isinstance(task, (OpenMLClassificationTask, OpenMLLearningCurveTask)):
@@ -644,15 +663,14 @@ def _calculate_local_measure(sklearn_fn, openml_name):
                         sample_no
                     ] = user_defined_measures_fold[measure]

+    trace: OpenMLRunTrace | None = None
     if len(traces) > 0:
-        if len(traces) != n_fit:
+        if len(traces) != len(jobs):
             raise ValueError(
-                f"Did not find enough traces (expected {n_fit}, found {len(traces)})",
+                f"Did not find enough traces (expected {len(jobs)}, found {len(traces)})",
             )
-        else:
-            trace = OpenMLRunTrace.merge_traces(traces)
-    else:
-        trace = None
+
+        trace = OpenMLRunTrace.merge_traces(traces)

     return (
         arff_datacontent,
@@ -662,7 +680,7 @@ def _calculate_local_measure(sklearn_fn, openml_name):
     )


-def _run_task_get_arffcontent_parallel_helper(
+def _run_task_get_arffcontent_parallel_helper(  # noqa: PLR0913
     extension: Extension,
     fold_no: int,
     model: Any,
@@ -721,24 +739,28 @@ def _run_task_get_arffcontent_parallel_helper(
     if isinstance(task, OpenMLSupervisedTask):
         x, y = task.get_X_and_y(dataset_format=dataset_format)
-        if dataset_format == "dataframe":
+        if isinstance(x, pd.DataFrame):
+            assert isinstance(y, (pd.Series, pd.DataFrame))
             train_x = x.iloc[train_indices]
             train_y = y.iloc[train_indices]
             test_x = x.iloc[test_indices]
             test_y = y.iloc[test_indices]
         else:
+            # TODO(eddiebergman): Complains spmatrix doesn't support __getitem__ for typing
             train_x = x[train_indices]
             train_y = y[train_indices]
             test_x = x[test_indices]
             test_y = y[test_indices]
     elif isinstance(task, OpenMLClusteringTask):
         x = task.get_X(dataset_format=dataset_format)
-        train_x = x.iloc[train_indices] if dataset_format == "dataframe" else x[train_indices]
+        # TODO(eddiebergman): Complains spmatrix doesn't support __getitem__ for typing
+        train_x = x.iloc[train_indices] if isinstance(x, pd.DataFrame) else x[train_indices]
         train_y = None
         test_x = None
         test_y = None
     else:
         raise NotImplementedError(task.task_type)
+
     config.logger.info(
         "Going to run model {} on dataset {} for repeat {} fold {} sample {}".format(
             str(model),
@@ -784,7 +806,7 @@ def get_runs(run_ids):


 @openml.utils.thread_safe_if_oslo_installed
-def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun:
+def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun:  # noqa: FBT002, FBT001
     """Gets run corresponding to run_id.

     Parameters
@@ -802,27 +824,26 @@ def get_run(run_id: int, ignore_cache: bool = False) -> OpenMLRun:
     run : OpenMLRun
         Run corresponding to ID, fetched from the server.
""" - run_dir = openml.utils._create_cache_directory_for_id(RUNS_CACHE_DIR_NAME, run_id) - run_file = os.path.join(run_dir, "description.xml") + run_dir = Path(openml.utils._create_cache_directory_for_id(RUNS_CACHE_DIR_NAME, run_id)) + run_file = run_dir / "description.xml" - if not os.path.exists(run_dir): - os.makedirs(run_dir) + run_dir.mkdir(parents=True, exist_ok=True) try: if not ignore_cache: return _get_cached_run(run_id) - else: - raise OpenMLCacheException(message="dummy") + + raise OpenMLCacheException(message="dummy") except OpenMLCacheException: run_xml = openml._api_calls._perform_api_call("run/%d" % run_id, "get") - with open(run_file, "w", encoding="utf8") as fh: + with run_file.open("w", encoding="utf8") as fh: fh.write(run_xml) return _create_run_from_xml(run_xml) -def _create_run_from_xml(xml, from_server=True): +def _create_run_from_xml(xml: str, from_server: bool = True) -> OpenMLRun: # noqa: PLR0915, PLR0912, C901, FBT """Create a run object from xml returned from server. Parameters @@ -840,6 +861,7 @@ def _create_run_from_xml(xml, from_server=True): New run object representing run_xml. """ + # TODO(eddiebergman): type this def obtain_field(xml_obj, fieldname, from_server, cast=None): # this function can be used to check whether a field is present in an # object. if it is not present, either returns None or throws an error @@ -848,10 +870,11 @@ def obtain_field(xml_obj, fieldname, from_server, cast=None): if cast is not None: return cast(xml_obj[fieldname]) return xml_obj[fieldname] - elif not from_server: + + if not from_server: return None - else: - raise AttributeError("Run XML does not contain required (server) " "field: ", fieldname) + + raise AttributeError("Run XML does not contain required (server) " "field: ", fieldname) run = xmltodict.parse(xml, force_list=["oml:file", "oml:evaluation", "oml:parameter_setting"])[ "oml:run" @@ -968,12 +991,12 @@ def obtain_field(xml_obj, fieldname, from_server, cast=None): task = openml.tasks.get_task(task_id) if task.task_type_id == TaskType.SUBGROUP_DISCOVERY: raise NotImplementedError("Subgroup discovery tasks are not yet supported.") - else: - # JvR: actually, I am not sure whether this error should be raised. - # a run can consist without predictions. But for now let's keep it - # Matthias: yes, it should stay as long as we do not really handle - # this stuff - raise ValueError("No prediction files for run %d in run " "description XML" % run_id) + + # JvR: actually, I am not sure whether this error should be raised. + # a run can consist without predictions. But for now let's keep it + # Matthias: yes, it should stay as long as we do not really handle + # this stuff + raise ValueError("No prediction files for run %d in run description XML" % run_id) tags = openml.utils.extract_xml_tags("oml:tag", run) diff --git a/openml/tasks/functions.py b/openml/tasks/functions.py index e85abf060..5764a9c86 100644 --- a/openml/tasks/functions.py +++ b/openml/tasks/functions.py @@ -4,7 +4,8 @@ import os import re import warnings -from collections import OrderedDict +from typing import Any +from typing_extensions import Literal import pandas as pd import xmltodict @@ -27,7 +28,7 @@ TASKS_CACHE_DIR_NAME = "tasks" -def _get_cached_tasks(): +def _get_cached_tasks() -> dict[int, OpenMLTask]: """Return a dict of all the tasks which are cached locally. Returns @@ -36,22 +37,14 @@ def _get_cached_tasks(): A dict of all the cached tasks. Each task is an instance of OpenMLTask. 
""" - tasks = OrderedDict() - task_cache_dir = openml.utils._create_cache_directory(TASKS_CACHE_DIR_NAME) directory_content = os.listdir(task_cache_dir) directory_content.sort() + # Find all dataset ids for which we have downloaded the dataset # description - - for filename in directory_content: - if not re.match(r"[0-9]*", filename): - continue - - tid = int(filename) - tasks[tid] = _get_cached_task(tid) - - return tasks + tids = (int(did) for did in directory_content if re.match(r"[0-9]*", did)) + return {tid: _get_cached_task(tid) for tid in tids} def _get_cached_task(tid: int) -> OpenMLTask: @@ -68,12 +61,14 @@ def _get_cached_task(tid: int) -> OpenMLTask: """ tid_cache_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, tid) + task_xml_path = tid_cache_dir / "task.xml" try: - with open(os.path.join(tid_cache_dir, "task.xml"), encoding="utf8") as fh: + with task_xml_path.open(encoding="utf8") as fh: return _create_task_from_xml(fh.read()) - except OSError: + except OSError as e: + raise OpenMLCacheException(f"Task file for tid {tid} not cached") from e + finally: openml.utils._remove_cache_dir_for_id(TASKS_CACHE_DIR_NAME, tid_cache_dir) - raise OpenMLCacheException("Task file for tid %d not " "cached" % tid) def _get_estimation_procedure_list(): @@ -93,12 +88,14 @@ def _get_estimation_procedure_list(): # Minimalistic check if the XML is useful if "oml:estimationprocedures" not in procs_dict: raise ValueError("Error in return XML, does not contain tag oml:estimationprocedures.") - elif "@xmlns:oml" not in procs_dict["oml:estimationprocedures"]: + + if "@xmlns:oml" not in procs_dict["oml:estimationprocedures"]: raise ValueError( "Error in return XML, does not contain tag " "@xmlns:oml as a child of oml:estimationprocedures.", ) - elif procs_dict["oml:estimationprocedures"]["@xmlns:oml"] != "http://openml.org/openml": + + if procs_dict["oml:estimationprocedures"]["@xmlns:oml"] != "http://openml.org/openml": raise ValueError( "Error in return XML, value of " "oml:estimationprocedures/@xmlns:oml is not " @@ -106,25 +103,25 @@ def _get_estimation_procedure_list(): % str(procs_dict["oml:estimationprocedures"]["@xmlns:oml"]), ) - procs = [] + procs: list[dict[str, Any]] = [] for proc_ in procs_dict["oml:estimationprocedures"]["oml:estimationprocedure"]: task_type_int = int(proc_["oml:ttid"]) try: task_type_id = TaskType(task_type_int) + procs.append( + { + "id": int(proc_["oml:id"]), + "task_type_id": task_type_id, + "name": proc_["oml:name"], + "type": proc_["oml:type"], + }, + ) except ValueError as e: warnings.warn( f"Could not create task type id for {task_type_int} due to error {e}", RuntimeWarning, + stacklevel=2, ) - continue - procs.append( - { - "id": int(proc_["oml:id"]), - "task_type_id": task_type_id, - "name": proc_["oml:name"], - "type": proc_["oml:type"], - }, - ) return procs @@ -230,10 +227,15 @@ def _list_tasks(task_type=None, output_format="dict", **kwargs): if operator == "task_id": value = ",".join([str(int(i)) for i in value]) api_call += f"/{operator}/{value}" + return __list_tasks(api_call=api_call, output_format=output_format) -def __list_tasks(api_call, output_format="dict"): +# TODO(eddiebergman): overload todefine type returned +def __list_tasks( + api_call: str, + output_format: Literal["dict", "dataframe"] = "dict", +) -> dict | pd.DataFrame: """Returns a dictionary or a Pandas DataFrame with information about OpenML tasks. 

     Parameters
@@ -260,12 +262,14 @@ def __list_tasks(api_call, output_format="dict"):
     tasks_dict = xmltodict.parse(xml_string, force_list=("oml:task", "oml:input"))
     # Minimalistic check if the XML is useful
     if "oml:tasks" not in tasks_dict:
-        raise ValueError('Error in return XML, does not contain "oml:runs": %s' % str(tasks_dict))
-    elif "@xmlns:oml" not in tasks_dict["oml:tasks"]:
+        raise ValueError(f'Error in return XML, does not contain "oml:tasks": {tasks_dict}')
+
+    if "@xmlns:oml" not in tasks_dict["oml:tasks"]:
         raise ValueError(
-            "Error in return XML, does not contain " '"oml:runs"/@xmlns:oml: %s' % str(tasks_dict),
+            f'Error in return XML, does not contain "oml:tasks"/@xmlns:oml: {tasks_dict}'
         )
-    elif tasks_dict["oml:tasks"]["@xmlns:oml"] != "http://openml.org/openml":
+
+    if tasks_dict["oml:tasks"]["@xmlns:oml"] != "http://openml.org/openml":
         raise ValueError(
             "Error in return XML, value of "
             '"oml:tasks"/@xmlns:oml is not '
@@ -289,8 +293,10 @@ def __list_tasks(api_call, output_format="dict"):
             warnings.warn(
                 f"Could not create task type id for {task_type_int} due to error {e}",
                 RuntimeWarning,
+                stacklevel=2,
             )
             continue
+
         task = {
             "tid": tid,
             "ttid": task_type_id,
@@ -301,12 +307,12 @@ def __list_tasks(api_call, output_format="dict"):
         }

         # Other task inputs
-        for input in task_.get("oml:input", []):
-            if input["@name"] == "estimation_procedure":
-                task[input["@name"]] = proc_dict[int(input["#text"])]["name"]
+        for _input in task_.get("oml:input", []):
+            if _input["@name"] == "estimation_procedure":
+                task[_input["@name"]] = proc_dict[int(_input["#text"])]["name"]
             else:
-                value = input.get("#text")
-                task[input["@name"]] = value
+                value = _input.get("#text")
+                task[_input["@name"]] = value

         # The number of qualities can range from 0 to infinity
         for quality in task_.get("oml:quality", []):
@@ -321,10 +327,13 @@ def __list_tasks(api_call, output_format="dict"):
             tasks[tid] = task
         except KeyError as e:
             if tid is not None:
-                warnings.warn("Invalid xml for task %d: %s\nFrom %s" % (tid, e, task_))
+                warnings.warn(
+                    "Invalid xml for task %d: %s\nFrom %s" % (tid, e, task_),
+                    RuntimeWarning,
+                    stacklevel=2,
+                )
             else:
-                warnings.warn(f"Could not find key {e} in {task_}!")
-                continue
+                warnings.warn(f"Could not find key {e} in {task_}!", RuntimeWarning, stacklevel=2)

     if output_format == "dataframe":
         tasks = pd.DataFrame.from_dict(tasks, orient="index")
@@ -332,10 +341,11 @@ def __list_tasks(api_call, output_format="dict"):
     return tasks


+# TODO(eddiebergman): Maybe since this isn't public api, we can make it keyword only?
 def get_tasks(
     task_ids: list[int],
-    download_data: bool = True,
-    download_qualities: bool = True,
+    download_data: bool = True,  # noqa: FBT001, FBT002
+    download_qualities: bool = True,  # noqa: FBT001, FBT002
 ) -> list[OpenMLTask]:
     """Download tasks.
@@ -405,6 +415,7 @@ def get_task(
             "of ``True`` and be independent from `download_data`. To disable this message until "
             "version 0.15 explicitly set `download_splits` to a bool.",
             FutureWarning,
+            stacklevel=3,
         )
         download_splits = get_dataset_kwargs.get("download_data", True)

@@ -413,17 +424,15 @@ def get_task(
         warnings.warn(
             "Task id must be specified as `int` from 0.14.0 onwards.",
             FutureWarning,
+            stacklevel=3,
         )

     try:
         task_id = int(task_id)
-    except (ValueError, TypeError):
-        raise ValueError("Dataset ID is neither an Integer nor can be cast to an Integer.")
+    except (ValueError, TypeError) as e:
+        raise ValueError("Task ID is neither an Integer nor can be cast to an Integer.") from e

-    tid_cache_dir = openml.utils._create_cache_directory_for_id(
-        TASKS_CACHE_DIR_NAME,
-        task_id,
-    )
+    tid_cache_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, task_id)

     try:
         task = _get_task_description(task_id)
@@ -438,34 +447,26 @@ def get_task(
         if download_splits and isinstance(task, OpenMLSupervisedTask):
             task.download_split()
     except Exception as e:
-        openml.utils._remove_cache_dir_for_id(
-            TASKS_CACHE_DIR_NAME,
-            tid_cache_dir,
-        )
+        openml.utils._remove_cache_dir_for_id(TASKS_CACHE_DIR_NAME, tid_cache_dir)
         raise e

     return task


-def _get_task_description(task_id):
+def _get_task_description(task_id: int):
     try:
         return _get_cached_task(task_id)
     except OpenMLCacheException:
-        xml_file = os.path.join(
-            openml.utils._create_cache_directory_for_id(
-                TASKS_CACHE_DIR_NAME,
-                task_id,
-            ),
-            "task.xml",
-        )
+        _cache_dir = openml.utils._create_cache_directory_for_id(TASKS_CACHE_DIR_NAME, task_id)
+        xml_file = _cache_dir / "task.xml"
         task_xml = openml._api_calls._perform_api_call("task/%d" % task_id, "get")

-        with open(xml_file, "w", encoding="utf8") as fh:
+        with xml_file.open("w", encoding="utf8") as fh:
             fh.write(task_xml)
         return _create_task_from_xml(task_xml)


-def _create_task_from_xml(xml):
+def _create_task_from_xml(xml: str) -> OpenMLTask:
     """Create a task given a xml string.

     Parameters
@@ -541,6 +542,7 @@ def _create_task_from_xml(xml):
     return cls(**common_kwargs)


+# TODO(eddiebergman): overload on `task_type`
 def create_task(
     task_type: TaskType,
     dataset_id: int,
@@ -592,16 +594,16 @@ def create_task(
     if task_cls is None:
         raise NotImplementedError(f"Task type {task_type:d} not supported.")
-    else:
-        return task_cls(
-            task_type_id=task_type,
-            task_type=None,
-            data_set_id=dataset_id,
-            target_name=target_name,
-            estimation_procedure_id=estimation_procedure_id,
-            evaluation_measure=evaluation_measure,
-            **kwargs,
-        )
+
+    return task_cls(
+        task_type_id=task_type,
+        task_type=None,
+        data_set_id=dataset_id,
+        target_name=target_name,
+        estimation_procedure_id=estimation_procedure_id,
+        evaluation_measure=evaluation_measure,
+        **kwargs,
+    )


 def delete_task(task_id: int) -> bool:
diff --git a/openml/tasks/split.py b/openml/tasks/split.py
index f90ddc7cd..82a44216b 100644
--- a/openml/tasks/split.py
+++ b/openml/tasks/split.py
@@ -1,14 +1,20 @@
 # License: BSD 3-Clause
 from __future__ import annotations

-import os
 import pickle
-from collections import OrderedDict, namedtuple
+from collections import OrderedDict
+from pathlib import Path
+from typing_extensions import NamedTuple

 import arff
 import numpy as np

-Split = namedtuple("Split", ["train", "test"])
+
+class Split(NamedTuple):
+    """A single split of a dataset."""
+
+    train: np.ndarray
+    test: np.ndarray


 class OpenMLSplit:
@@ -21,7 +27,12 @@ class OpenMLSplit:
     split : dict
     """

-    def __init__(self, name, description, split):
+    def __init__(
+        self,
+        name: int | str,
+        description: str,
+        split: dict[int, dict[int, dict[int, np.ndarray]]],
+    ):
         self.description = description
         self.name = name
         self.split = {}
@@ -36,8 +47,11 @@ def __init__(self, name, description, split):
                     self.split[repetition][fold][sample] = split[repetition][fold][sample]

         self.repeats = len(self.split)
+
+        # TODO(eddiebergman): Better error message
         if any(len(self.split[0]) != len(self.split[i]) for i in range(self.repeats)):
             raise ValueError("")
+
         self.folds = len(self.split[0])
         self.samples = len(self.split[0][0])

@@ -69,22 +83,25 @@ def __eq__(self, other):
         return True

     @classmethod
-    def _from_arff_file(cls, filename: str) -> OpenMLSplit:
+    def _from_arff_file(cls, filename: Path) -> OpenMLSplit:  # noqa: C901, PLR0912
         repetitions = None
+        name = None

-        pkl_filename = filename.replace(".arff", ".pkl.py3")
+        pkl_filename = filename.with_suffix(".pkl.py3")

-        if os.path.exists(pkl_filename):
-            with open(pkl_filename, "rb") as fh:
-                _ = pickle.load(fh)
-            repetitions = _["repetitions"]
-            name = _["name"]
+        if pkl_filename.exists():
+            with pkl_filename.open("rb") as fh:
+                # TODO(eddiebergman): Would be good to figure out what _split is and assert it is
+                _split = pickle.load(fh)  # noqa: S301
+            repetitions = _split["repetitions"]
+            name = _split["name"]

         # Cache miss
         if repetitions is None:
             # Faster than liac-arff and sufficient in this situation!
-            if not os.path.exists(filename):
-                raise FileNotFoundError("Split arff %s does not exist!" % filename)
+            if not filename.exists():
+                raise FileNotFoundError(f"Split arff {filename} does not exist!")
+
             file_data = arff.load(open(filename), return_type=arff.DENSE_GEN)
             splits = file_data["data"]
             name = file_data["relation"]
@@ -130,12 +147,13 @@ def _from_arff_file(cls, filename: str) -> OpenMLSplit:
                     np.array(repetitions[repetition][fold][sample][1], dtype=np.int32),
                 )

-        with open(pkl_filename, "wb") as fh:
+        with pkl_filename.open("wb") as fh:
             pickle.dump({"name": name, "repetitions": repetitions}, fh, protocol=2)

+        assert name is not None
         return cls(name, "", repetitions)

-    def from_dataset(self, X, Y, folds, repeats):
+    def from_dataset(self, X, Y, folds: int, repeats: int):
         """Generates a new OpenML dataset object from input data and cross-validation settings.

         Parameters
@@ -156,7 +174,7 @@ def from_dataset(self, X, Y, folds, repeats):
         """
         raise NotImplementedError()

-    def get(self, repeat=0, fold=0, sample=0):
+    def get(self, repeat: int = 0, fold: int = 0, sample: int = 0) -> Split:
         """Returns the specified data split from the CrossValidationSplit object.

         Parameters
diff --git a/openml/tasks/task.py b/openml/tasks/task.py
index 5a39cea11..a6c672a0a 100644
--- a/openml/tasks/task.py
+++ b/openml/tasks/task.py
@@ -1,15 +1,17 @@
 # License: BSD 3-Clause
+# TODO(eddiebergman): Seems like a lot of the subclasses could just get away with setting
+# a `ClassVar` for whatever changes as their `__init__` defaults, less duplicated code.
 from __future__ import annotations

-import os
 import warnings
 from abc import ABC
 from collections import OrderedDict
 from enum import Enum
+from pathlib import Path
 from typing import TYPE_CHECKING, Any
-from warnings import warn

 import openml._api_calls
+import openml.config
 from openml import datasets
 from openml.base import OpenMLBase
 from openml.utils import _create_cache_directory_for_id
@@ -22,7 +24,11 @@
     import scipy.sparse


+# TODO(eddiebergman): Should use `auto()` but might be too late if these numbers are used
+# and stored on server.
 class TaskType(Enum):
+    """Possible task types as defined in OpenML."""
+
     SUPERVISED_CLASSIFICATION = 1
     SUPERVISED_REGRESSION = 2
     LEARNING_CURVE = 3
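# NOTE (editor): a small illustration -- not code from this PR -- of why the TODO
# in the hunk above avoids `auto()`: the enum values are stable ids shared with
# the server, so values must round-trip by number and never be re-numbered.

from enum import Enum

class DemoTaskType(Enum):  # hypothetical stand-in for TaskType
    SUPERVISED_CLASSIFICATION = 1
    SUPERVISED_REGRESSION = 2

assert DemoTaskType(1) is DemoTaskType.SUPERVISED_CLASSIFICATION
assert DemoTaskType.SUPERVISED_REGRESSION.value == 2  # value stored server-side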
""" - def __init__( + def __init__( # noqa: PLR0913 self, task_id: int | None, task_type_id: TaskType, @@ -76,25 +82,27 @@ def __init__( self.task_type = task_type self.dataset_id = int(data_set_id) self.evaluation_measure = evaluation_measure - self.estimation_procedure = {} # type: Dict[str, Optional[Union[str, Dict]]] # E501 + self.estimation_procedure: dict[str, str | dict | None] = {} self.estimation_procedure["type"] = estimation_procedure_type self.estimation_procedure["parameters"] = estimation_parameters self.estimation_procedure["data_splits_url"] = data_splits_url self.estimation_procedure_id = estimation_procedure_id - self.split = None # type: Optional[OpenMLSplit] + self.split: OpenMLSplit | None = None @classmethod def _entity_letter(cls) -> str: return "t" @property - def id(self) -> int | None: + def id(self) -> int | None: # noqa: A003 + """Return the OpenML ID of this task.""" return self.task_id def _get_repr_body_fields(self) -> list[tuple[str, str | int | list[str]]]: """Collect all information to display in the __repr__ body.""" + base_server_url = openml.config.get_server_base_url() fields: dict[str, Any] = { - "Task Type Description": f"{openml.config.get_server_base_url()}/tt/{self.task_type_id}", + "Task Type Description": f"{base_server_url}/tt/{self.task_type_id}" } if self.task_id is not None: fields["Task ID"] = self.task_id @@ -103,10 +111,17 @@ def _get_repr_body_fields(self) -> list[tuple[str, str | int | list[str]]]: fields["Evaluation Measure"] = self.evaluation_measure if self.estimation_procedure is not None: fields["Estimation Procedure"] = self.estimation_procedure["type"] - if getattr(self, "target_name", None) is not None: - fields["Target Feature"] = self.target_name - if hasattr(self, "class_labels") and self.class_labels is not None: - fields["# of Classes"] = len(self.class_labels) + + # TODO(eddiebergman): Subclasses could advertise/provide this, instead of having to + # have the base class know about it's subclasses. 
+        target_name = getattr(self, "target_name", None)
+        if target_name is not None:
+            fields["Target Feature"] = target_name
+
+        class_labels = getattr(self, "class_labels", None)
+        if class_labels is not None:
+            fields["# of Classes"] = len(class_labels)
+
         if hasattr(self, "cost_matrix"):
             fields["Cost Matrix"] = "Available"
@@ -124,7 +139,7 @@ def _get_repr_body_fields(self) -> list[tuple[str, str | int | list[str]]]:
         return [(key, fields[key]) for key in order if key in fields]

     def get_dataset(self) -> datasets.OpenMLDataset:
-        """Download dataset associated with task"""
+        """Download dataset associated with task."""
         return datasets.get_dataset(self.dataset_id)

     def get_train_test_split_indices(
@@ -133,34 +148,31 @@ def get_train_test_split_indices(
         repeat: int = 0,
         sample: int = 0,
     ) -> tuple[np.ndarray, np.ndarray]:
+        """Get the indices of the train and test splits for a given task."""
         # Replace with retrieve from cache
         if self.split is None:
             self.split = self.download_split()

-        train_indices, test_indices = self.split.get(
-            repeat=repeat,
-            fold=fold,
-            sample=sample,
-        )
-        return train_indices, test_indices
+        return self.split.get(repeat=repeat, fold=fold, sample=sample)

-    def _download_split(self, cache_file: str):
+    def _download_split(self, cache_file: Path) -> None:
+        # TODO(eddiebergman): Not sure about this try-to-read-and-error approach
         try:
-            with open(cache_file, encoding="utf8"):
+            with cache_file.open(encoding="utf8"):
                 pass
         except OSError:
             split_url = self.estimation_procedure["data_splits_url"]
             openml._api_calls._download_text_file(
                 source=str(split_url),
-                output_path=cache_file,
+                output_path=str(cache_file),
             )

     def download_split(self) -> OpenMLSplit:
         """Download the OpenML split for a given task."""
-        cached_split_file = os.path.join(
-            _create_cache_directory_for_id("tasks", self.task_id),
-            "datasplits.arff",
-        )
+        # TODO(eddiebergman): Can this ever be `None`?
+        assert self.task_id is not None
+        cache_dir = _create_cache_directory_for_id("tasks", self.task_id)
+        cached_split_file = cache_dir / "datasplits.arff"

         try:
             split = OpenMLSplit._from_arff_file(cached_split_file)
@@ -172,6 +184,7 @@ def download_split(self) -> OpenMLSplit:
         return split

     def get_split_dimensions(self) -> tuple[int, int, int]:
+        """Get the (repeats, folds, samples) of the split for a given task."""
         if self.split is None:
             self.split = self.download_split()

@@ -180,21 +193,21 @@ def get_split_dimensions(self) -> tuple[int, int, int]:
     def _to_dict(self) -> OrderedDict[str, OrderedDict]:
         """Creates a dictionary representation of self."""
         task_container = OrderedDict()  # type: OrderedDict[str, OrderedDict]
-        task_dict = OrderedDict(
+        task_dict: OrderedDict[str, list | str | int] = OrderedDict(
             [("@xmlns:oml", "http://openml.org/openml")],
-        )  # type: OrderedDict[str, Union[List, str, int]]
+        )

         task_container["oml:task_inputs"] = task_dict
         task_dict["oml:task_type_id"] = self.task_type_id.value

         # having task_inputs and adding a type annotation
         # solves wrong warnings
-        task_inputs = [
+        task_inputs: list[OrderedDict] = [
             OrderedDict([("@name", "source_data"), ("#text", str(self.dataset_id))]),
             OrderedDict(
                 [("@name", "estimation_procedure"), ("#text", str(self.estimation_procedure_id))],
             ),
-        ]  # type: List[OrderedDict]
+        ]

         if self.evaluation_measure is not None:
             task_inputs.append(
@@ -237,7 +250,7 @@ class OpenMLSupervisedTask(OpenMLTask, ABC):
         Refers to the unique identifier of task.
""" - def __init__( + def __init__( # noqa: PLR0913 self, task_type_id: TaskType, task_type: str, @@ -264,6 +277,7 @@ def __init__( self.target_name = target_name + # TODO(eddiebergman): type with overload? def get_X_and_y( self, dataset_format: str = "array", @@ -319,11 +333,13 @@ def _to_dict(self) -> OrderedDict[str, OrderedDict]: @property def estimation_parameters(self): - warn( + """Return the estimation parameters for the task.""" + warnings.warn( "The estimation_parameters attribute will be " "deprecated in the future, please use " "estimation_procedure['parameters'] instead", PendingDeprecationWarning, + stacklevel=2, ) return self.estimation_procedure["parameters"] @@ -363,7 +379,7 @@ class OpenMLClassificationTask(OpenMLSupervisedTask): A cost matrix (for classification tasks). """ - def __init__( + def __init__( # noqa: PLR0913 self, task_type_id: TaskType, task_type: str, @@ -424,7 +440,7 @@ class OpenMLRegressionTask(OpenMLSupervisedTask): Evaluation measure used in the Regression task. """ - def __init__( + def __init__( # noqa: PLR0913 self, task_type_id: TaskType, task_type: str, @@ -479,7 +495,7 @@ class OpenMLClusteringTask(OpenMLTask): feature set for the clustering task. """ - def __init__( + def __init__( # noqa: PLR0913 self, task_type_id: TaskType, task_type: str, @@ -581,7 +597,7 @@ class OpenMLLearningCurveTask(OpenMLClassificationTask): Cost matrix for Learning Curve tasks. """ - def __init__( + def __init__( # noqa: PLR0913 self, task_type_id: TaskType, task_type: str, diff --git a/openml/testing.py b/openml/testing.py index 1db868967..5db8d6bb7 100644 --- a/openml/testing.py +++ b/openml/testing.py @@ -9,7 +9,8 @@ import shutil import time import unittest -from typing import Dict, List, Optional, Tuple, Union, cast # noqa: F401 +from pathlib import Path +from typing import ClassVar import pandas as pd import requests @@ -37,15 +38,16 @@ class TestBase(unittest.TestCase): Hopefully soon allows using a test server, not the production server. 
""" - publish_tracker = { + # TODO: This could be made more explcit with a TypedDict instead of list[str | int] + publish_tracker: ClassVar[dict[str, list[str | int]]] = { "run": [], "data": [], "flow": [], "task": [], "study": [], "user": [], - } # type: Dict[str, List[int]] - flow_name_tracker = [] # type: List[str] + } + flow_name_tracker: ClassVar[list[str]] = [] test_server = "https://test.openml.org/api/v1/xml" # amueller's read/write key that he will throw away later apikey = "610344db6388d9ba34f6db45a3cf71de" @@ -75,26 +77,26 @@ def setUp(self, n_levels: int = 1) -> None: # cache self.maxDiff = None self.static_cache_dir = None - abspath_this_file = os.path.abspath(inspect.getfile(self.__class__)) - static_cache_dir = os.path.dirname(abspath_this_file) + abspath_this_file = Path(inspect.getfile(self.__class__)).absolute() + static_cache_dir = abspath_this_file.parent for _ in range(n_levels): - static_cache_dir = os.path.abspath(os.path.join(static_cache_dir, "..")) + static_cache_dir = static_cache_dir.parent.absolute() content = os.listdir(static_cache_dir) if "files" in content: - self.static_cache_dir = os.path.join(static_cache_dir, "files") + self.static_cache_dir = static_cache_dir / "files" if self.static_cache_dir is None: raise ValueError( f"Cannot find test cache dir, expected it to be {static_cache_dir}!", ) - self.cwd = os.getcwd() - workdir = os.path.dirname(os.path.abspath(__file__)) + self.cwd = Path.cwd() + workdir = Path(__file__).parent.absolute() tmp_dir_name = self.id() - self.workdir = os.path.join(workdir, tmp_dir_name) + self.workdir = workdir / tmp_dir_name shutil.rmtree(self.workdir, ignore_errors=True) - os.mkdir(self.workdir) + self.workdir.mkdir(exist_ok=True) os.chdir(self.workdir) self.cached = True @@ -102,7 +104,7 @@ def setUp(self, n_levels: int = 1) -> None: self.production_server = "https://openml.org/api/v1/xml" openml.config.server = TestBase.test_server openml.config.avoid_duplicate_runs = False - openml.config.set_root_cache_directory(self.workdir) + openml.config.set_root_cache_directory(str(self.workdir)) # Increase the number of retries to avoid spurious server failures self.retry_policy = openml.config.retry_policy @@ -110,22 +112,22 @@ def setUp(self, n_levels: int = 1) -> None: openml.config.set_retry_policy("robot", n_retries=20) def tearDown(self) -> None: + """Tear down the test""" os.chdir(self.cwd) try: shutil.rmtree(self.workdir) - except PermissionError: - if os.name == "nt": + except PermissionError as e: + if os.name != "nt": # one of the files may still be used by another process - pass - else: - raise + raise e + openml.config.server = self.production_server openml.config.connection_n_retries = self.connection_n_retries openml.config.retry_policy = self.retry_policy @classmethod def _mark_entity_for_removal( - self, + cls, entity_type: str, entity_id: int, entity_name: str | None = None, @@ -143,10 +145,10 @@ def _mark_entity_for_removal( TestBase.publish_tracker[entity_type].append(entity_id) if isinstance(entity_type, openml.flows.OpenMLFlow): assert entity_name is not None - self.flow_name_tracker.append(entity_name) + cls.flow_name_tracker.append(entity_name) @classmethod - def _delete_entity_from_tracker(self, entity_type: str, entity: int) -> None: + def _delete_entity_from_tracker(cls, entity_type: str, entity: int) -> None: """Deletes entity records from the static file_tracker Given an entity type and corresponding ID, deletes all entries, including @@ -176,7 +178,7 @@ def _get_sentinel(self, sentinel: str | None = 
         # Create a unique prefix for the flow. Necessary because the flow
         # is identified by its name and external version online. Having a
         # unique name allows us to publish the same flow in each test run.
-        md5 = hashlib.md5()
+        md5 = hashlib.md5()  # noqa: S324
         md5.update(str(time.time()).encode("utf-8"))
         md5.update(str(os.getpid()).encode("utf-8"))
         sentinel = md5.hexdigest()[:10]
@@ -201,7 +203,7 @@ def _add_sentinel_to_flow_name(

     def _check_dataset(self, dataset: dict[str, str | int]) -> None:
         _check_dataset(dataset)
-        assert type(dataset) == dict
+        assert isinstance(dataset, dict)
         assert len(dataset) >= 2
         assert "did" in dataset
         assert isinstance(dataset["did"], int)
@@ -209,11 +211,12 @@ def _check_dataset(self, dataset: dict[str, str | int]) -> None:
         assert isinstance(dataset["status"], str)
         assert dataset["status"] in ["in_preparation", "active", "deactivated"]

-    def _check_fold_timing_evaluations(
+    def _check_fold_timing_evaluations(  # noqa: PLR0913
         self,
         fold_evaluations: dict[str, dict[int, dict[int, float]]],
         num_repeats: int,
         num_folds: int,
+        *,
         max_time_allowed: float = 60000.0,
         task_type: TaskType = TaskType.SUPERVISED_CLASSIFICATION,
         check_scores: bool = True,
@@ -284,9 +287,10 @@ def check_task_existence(
         """
         return_val = None
         tasks = openml.tasks.list_tasks(task_type=task_type, output_format="dataframe")
+        assert isinstance(tasks, pd.DataFrame)
         if len(tasks) == 0:
             return None
-        tasks = cast(pd.DataFrame, tasks).loc[tasks["did"] == dataset_id]
+        tasks = tasks.loc[tasks["did"] == dataset_id]
         if len(tasks) == 0:
             return None
         tasks = tasks.loc[tasks["target_feature"] == target_name]
@@ -334,7 +338,7 @@ def create_request_response(
     status_code: int,
     content_filepath: pathlib.Path,
 ) -> requests.Response:
-    with open(content_filepath) as xml_response:
+    with content_filepath.open("r") as xml_response:
         response_body = xml_response.read()

     response = requests.Response()
diff --git a/openml/utils.py b/openml/utils.py
index d3fafe460..a838cb00b 100644
--- a/openml/utils.py
+++ b/openml/utils.py
@@ -1,13 +1,14 @@
 # License: BSD 3-Clause
 from __future__ import annotations

-import collections
 import contextlib
-import os
 import shutil
 import warnings
+from collections import OrderedDict
 from functools import wraps
-from typing import TYPE_CHECKING
+from pathlib import Path
+from typing import TYPE_CHECKING, Any, Callable, Mapping, TypeVar
+from typing_extensions import Literal, ParamSpec

 import pandas as pd
 import xmltodict
@@ -22,6 +23,9 @@
 if TYPE_CHECKING:
     from openml.base import OpenMLBase

+    P = ParamSpec("P")
+    R = TypeVar("R")
+
 oslo_installed = False
 try:
     # Currently, importing oslo raises a lot of warning that it will stop working
@@ -35,7 +39,12 @@
     pass


-def extract_xml_tags(xml_tag_name, node, allow_none=True):
+def extract_xml_tags(
+    xml_tag_name: str,
+    node: Mapping[str, Any],
+    *,
+    allow_none: bool = True,
+) -> Any | None:
     """Helper to extract xml tags from xmltodict.

     Parameters
     ----------
     xml_tag_name : str
         Name of the xml tag to extract from the node.

-    node : object
+    node : Mapping[str, Any]
         Node object returned by ``xmltodict`` from which ``xml_tag_name``
         should be extracted.
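# NOTE (editor): a small, self-contained illustration -- assumed behaviour of
# xmltodict rather than code from this PR -- of why `extract_xml_tags`
# (rewritten in the next hunk) has to normalise its input: xmltodict maps one
# child element to a plain value but repeated children to a list.

import xmltodict

single = xmltodict.parse("<r><tag>a</tag></r>")["r"]
many = xmltodict.parse("<r><tag>a</tag><tag>b</tag></r>")["r"]
assert single["tag"] == "a"  # one child: plain str
assert many["tag"] == ["a", "b"]  # repeated children: list
# extract_xml_tags("tag", ...) returns ["a"] and ["a", "b"] respectively.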
@@ -56,21 +65,17 @@ def extract_xml_tags(xml_tag_name, node, allow_none=True):
     object
     """
     if xml_tag_name in node and node[xml_tag_name] is not None:
-        if isinstance(node[xml_tag_name], dict):
-            rval = [node[xml_tag_name]]
-        elif isinstance(node[xml_tag_name], str):
-            rval = [node[xml_tag_name]]
-        elif isinstance(node[xml_tag_name], list):
-            rval = node[xml_tag_name]
-        else:
-            raise ValueError("Received not string and non list as tag item")
+        if isinstance(node[xml_tag_name], (dict, str)):
+            return [node[xml_tag_name]]
+        if isinstance(node[xml_tag_name], list):
+            return node[xml_tag_name]

-        return rval
-    else:
-        if allow_none:
-            return None
-        else:
-            raise ValueError(f"Could not find tag '{xml_tag_name}' in node '{node!s}'")
+        raise ValueError("Received neither string nor list as tag item")
+
+    if allow_none:
+        return None
+
+    raise ValueError(f"Could not find tag '{xml_tag_name}' in node '{node!s}'")


 def _get_rest_api_type_alias(oml_object: OpenMLBase) -> str:
@@ -90,12 +95,12 @@ def _get_rest_api_type_alias(oml_object: OpenMLBase) -> str:
     return api_type_alias


-def _tag_openml_base(oml_object: OpenMLBase, tag: str, untag: bool = False):
+def _tag_openml_base(oml_object: OpenMLBase, tag: str, untag: bool = False) -> None:  # noqa: FBT
     api_type_alias = _get_rest_api_type_alias(oml_object)
-    _tag_entity(api_type_alias, oml_object.id, tag, untag)
+    _tag_entity(api_type_alias, oml_object.id, tag, untag=untag)


-def _tag_entity(entity_type, entity_id, tag, untag=False) -> list[str]:
+def _tag_entity(entity_type, entity_id, tag, *, untag: bool = False) -> list[str]:
     """
     Function that tags or untags a given entity on OpenML. As the OpenML
     API tag functions all consist of the same format, this function covers
@@ -138,12 +143,13 @@ def _tag_entity(entity_type, entity_id, tag, untag=False) -> list[str]:

     if "oml:tag" in result:
         return result["oml:tag"]
-    else:
-        # no tags, return empty list
-        return []
+
+    # no tags, return empty list
+    return []


-def _delete_entity(entity_type, entity_id):
+# TODO(eddiebergman): Maybe this can be made more specific with a Literal
+def _delete_entity(entity_type: str, entity_id: int) -> bool:
     """
     Function that deletes a given entity on OpenML. As the OpenML
     API tag functions all consist of the same format, this function covers
@@ -213,7 +219,17 @@ def _delete_entity(entity_type, entity_id):
         raise


-def _list_all(listing_call, output_format="dict", *args, **filters):
+# TODO(eddiebergman): Add `@overload` typing for output_format
+# NOTE: Impossible to type `listing_call` properly on account of the output format,
+#   might be better to use an iterator here instead and concatenate at the use point
+# NOTE: For the "object" output_format, the return type of `listing_call` is expected
+#   to be `Sized` so that `len()` can be called on it.
+def _list_all(  # noqa: C901, PLR0912
+    listing_call: Callable[P, Any],
+    output_format: Literal["dict", "dataframe", "object"] = "dict",
+    *args: P.args,
+    **filters: P.kwargs,
+) -> OrderedDict | pd.DataFrame:
     """Helper to handle paged listing requests.

     Example usage:
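# NOTE (editor): a runnable sketch of the paging contract that `_list_all`
# implements in the next hunk, reduced to the plain-dict case; `fetch` is a
# hypothetical stand-in for `listing_call`, not part of the OpenML API.

def fetch_all(fetch, *, batch_size=10_000, size=None, offset=0, **filters):
    results = {}
    stride = batch_size if size is None else min(batch_size, size)
    batch_size = stride
    page = 0
    while True:
        batch = fetch(limit=batch_size, offset=offset + stride * page, **filters)
        results.update(batch)
        if len(batch) < batch_size:  # short page: the server ran out of results
            break
        page += 1
        if size is not None:
            if len(results) >= size:  # requested maximum reached
                break
            batch_size = min(stride, size - len(results))
    return results

# e.g. fetch_all(lambda limit, offset: {i: i for i in range(offset, min(offset + limit, 120))},
#                batch_size=50) issues three calls and returns 120 items.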
@@ -242,31 +258,28 @@ def _list_all(listing_call, output_format="dict", *args, **filters):
     # eliminate filters that have a None value
     active_filters = {key: value for key, value in filters.items() if value is not None}
     page = 0
-    result = collections.OrderedDict()
+    result = OrderedDict()
     if output_format == "dataframe":
         result = pd.DataFrame()

     # Default batch size per paging.
     # This one can be set in filters (batch_size), but should not be
     # changed afterwards. The derived batch_size can be changed.
-    BATCH_SIZE_ORIG = 10000
-    if "batch_size" in active_filters:
-        BATCH_SIZE_ORIG = active_filters["batch_size"]
-        del active_filters["batch_size"]
+    BATCH_SIZE_ORIG = active_filters.pop("batch_size", 10000)
+    if not isinstance(BATCH_SIZE_ORIG, int):
+        raise ValueError(f"'batch_size' should be an integer but got {BATCH_SIZE_ORIG}")

     # max number of results to be shown
-    LIMIT = None
-    offset = 0
-    if "size" in active_filters:
-        LIMIT = active_filters["size"]
-        del active_filters["size"]
+    LIMIT = active_filters.pop("size", None)
+    if LIMIT is not None and not isinstance(LIMIT, int):
+        raise ValueError(f"'size' should be an integer but got {LIMIT}")

     if LIMIT is not None and BATCH_SIZE_ORIG > LIMIT:
         BATCH_SIZE_ORIG = LIMIT

-    if "offset" in active_filters:
-        offset = active_filters["offset"]
-        del active_filters["offset"]
+    offset = active_filters.pop("offset", 0)
+    if not isinstance(offset, int):
+        raise ValueError(f"'offset' should be an integer but got {offset}")

     batch_size = BATCH_SIZE_ORIG
     while True:
@@ -274,14 +287,14 @@ def _list_all(listing_call, output_format="dict", *args, **filters):
         try:
             current_offset = offset + BATCH_SIZE_ORIG * page
             new_batch = listing_call(
                 *args,
-                limit=batch_size,
-                offset=current_offset,
-                output_format=output_format,
-                **active_filters,
+                output_format=output_format,  # type: ignore
+                **{**active_filters, "limit": batch_size, "offset": current_offset},
             )
         except openml.exceptions.OpenMLServerNoResult:
             # we want to return an empty dict in this case
+            # NOTE: This may not actually happen, but we could just return here to enforce it...
             break
+
         if output_format == "dataframe":
             if len(result) == 0:
                 result = new_batch
@@ -290,8 +303,10 @@ def _list_all(listing_call, output_format="dict", *args, **filters):
         else:
             # For output_format = 'dict' or 'object'
             result.update(new_batch)
+
         if len(new_batch) < batch_size:
             break
+
         page += 1
         if LIMIT is not None:
             # check if the number of required results has been achieved
@@ -299,6 +314,7 @@ def _list_all(listing_call, output_format="dict", *args, **filters):
             # in case of bugs to prevent infinite loops
             if len(result) >= LIMIT:
                 break
+
             # check if there are enough results to fulfill a batch
             if LIMIT - len(result) < BATCH_SIZE_ORIG:
                 batch_size = LIMIT - len(result)
@@ -306,31 +322,29 @@ def _list_all(listing_call, output_format="dict", *args, **filters):
     return result


-def _get_cache_dir_for_key(key):
-    cache = config.get_cache_directory()
-    return os.path.join(cache, key)
+def _get_cache_dir_for_key(key: str) -> Path:
+    return Path(config.get_cache_directory()) / key


 def _create_cache_directory(key):
     cache_dir = _get_cache_dir_for_key(key)
     try:
-        os.makedirs(cache_dir, exist_ok=True)
-    except Exception as e:
+        cache_dir.mkdir(exist_ok=True, parents=True)
+    except Exception as e:  # noqa: BLE001
         raise openml.exceptions.OpenMLCacheException(
-            f"Cannot create cache directory {cache_dir}.",
+            f"Cannot create cache directory {cache_dir}."
         ) from e
     return cache_dir


-def _get_cache_dir_for_id(key, id_, create=False):
+def _get_cache_dir_for_id(key: str, id_: int, create: bool = False) -> Path:  # noqa: FBT
     cache_dir = _create_cache_directory(key) if create else _get_cache_dir_for_key(key)
-
-    return os.path.join(cache_dir, str(id_))
+    return Path(cache_dir) / str(id_)


-def _create_cache_directory_for_id(key, id_):
+def _create_cache_directory_for_id(key: str, id_: int) -> Path:
     """Create the cache directory for a specific ID

     In order to have a clearer cache structure and because every task
@@ -348,20 +362,18 @@ def _create_cache_directory_for_id(key, id_):
     Returns
     -------
-    str
+    cache_dir : Path
         Path of the created dataset cache directory.
     """
     cache_dir = _get_cache_dir_for_id(key, id_, create=True)
-    if os.path.isdir(cache_dir):
-        pass
-    elif os.path.exists(cache_dir):
+    if cache_dir.exists() and not cache_dir.is_dir():
         raise ValueError("%s cache dir exists but is not a directory!" % key)
-    else:
-        os.makedirs(cache_dir)
+
+    cache_dir.mkdir(exist_ok=True, parents=True)
     return cache_dir


-def _remove_cache_dir_for_id(key, cache_dir):
+def _remove_cache_dir_for_id(key: str, cache_dir: Path) -> None:
     """Remove the task cache directory

     This function is NOT thread/multiprocessing safe.
@@ -374,10 +386,10 @@ def _remove_cache_dir_for_id(key, cache_dir):
     try:
         shutil.rmtree(cache_dir)
-    except OSError:
+    except OSError as e:
         raise ValueError(
-            f"Cannot remove faulty {key} cache directory {cache_dir}." "Please do this manually!",
-        )
+            f"Cannot remove faulty {key} cache directory {cache_dir}. Please do this manually!",
+        ) from e


 def thread_safe_if_oslo_installed(func):
@@ -401,12 +413,13 @@ def safe_func(*args, **kwargs):
                 return func(*args, **kwargs)

         return safe_func
-    else:
-        return func
+
+    return func


-def _create_lockfiles_dir():
-    dir = os.path.join(config.get_cache_directory(), "locks")
+def _create_lockfiles_dir() -> Path:
+    path = Path(config.get_cache_directory()) / "locks"
+    # TODO(eddiebergman): Not sure why this is allowed to error and ignore???
     with contextlib.suppress(OSError):
-        os.makedirs(dir)
-    return dir
+        path.mkdir(exist_ok=True, parents=True)
+    return path
diff --git a/pyproject.toml b/pyproject.toml
index becc1e57c..ed854e5b5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -129,6 +129,7 @@
 target-version = "py37"
 line-length = 100
 show-source = true
 src = ["openml", "tests", "examples"]
+unsafe-fixes = true

 # Allow unused variables when underscore-prefixed.
diff --git a/pyproject.toml b/pyproject.toml
index becc1e57c..ed854e5b5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -129,6 +129,7 @@ target-version = "py37"
 line-length = 100
 show-source = true
 src = ["openml", "tests", "examples"]
+unsafe-fixes = true

 # Allow unused variables when underscore-prefixed.
 dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
@@ -192,6 +193,7 @@ ignore = [
     "PLC1901",  # "" can be simplified to be falsey
     "TCH003",   # Move stdlib import into TYPE_CHECKING
     "COM812",   # Trailing comma missing (handled by linter, ruff recommend disabling if using formatter)
+    "N803",     # Argument should be lowercase (but we accept things like `X`)

     # TODO(@eddibergman): These should be enabled
     "D100",     # Missing docstring in public module
@@ -209,6 +211,9 @@ ignore = [
 ]

 exclude = [
+    # TODO(eddiebergman): Tests should be re-enabled after the refactor
+    "tests",
+    #
     ".bzr",
     ".direnv",
     ".eggs",
@@ -306,7 +311,10 @@ warn_return_any = true

 [[tool.mypy.overrides]]
 module = ["tests.*"]
-disallow_untyped_defs = false  # Sometimes we just want to ignore verbose types
-disallow_untyped_decorators = false  # Test decorators are not properly typed
-disallow_incomplete_defs = false  # Sometimes we just want to ignore verbose types
-disable_error_code = ["var-annotated"]
+
+# TODO(eddiebergman): This should be re-enabled after tests get refactored
+ignore_errors = true
+#disallow_untyped_defs = false  # Sometimes we just want to ignore verbose types
+#disallow_untyped_decorators = false  # Test decorators are not properly typed
+#disallow_incomplete_defs = false  # Sometimes we just want to ignore verbose types
+#disable_error_code = ["var-annotated"]
diff --git a/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py b/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py
index 664076239..44612ca61 100644
--- a/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py
+++ b/tests/test_extensions/test_sklearn_extension/test_sklearn_extension.py
@@ -1728,7 +1728,7 @@ def test_run_model_on_fold_classification_1_array(self):
             assert np.any(y_hat_proba.iloc[:, i].to_numpy() != np.zeros(y_test.shape))

         # check user defined measures
-        fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict))
+        fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
         for measure in user_defined_measures:
             fold_evaluations[measure][0][0] = user_defined_measures[measure]
@@ -1801,7 +1801,7 @@ def test_run_model_on_fold_classification_1_dataframe(self):
             assert np.any(y_hat_proba.iloc[:, i].to_numpy() != np.zeros(y_test.shape))

         # check user defined measures
-        fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict))
+        fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
         for measure in user_defined_measures:
             fold_evaluations[measure][0][0] = user_defined_measures[measure]
@@ -1854,7 +1854,7 @@ def test_run_model_on_fold_classification_2(self):
             assert np.any(y_hat_proba.to_numpy()[:, i] != np.zeros(y_test.shape))

         # check user defined measures
-        fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict))
+        fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
         for measure in user_defined_measures:
             fold_evaluations[measure][0][0] = user_defined_measures[measure]
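The repeated test change above swaps an untyped nested `defaultdict` for one with an explicit annotation, which is what lets mypy check the assignments that follow. A small self-contained illustration of why `dict[str, dict[int, dict[int, float]]]` is a workable annotation for that construction (the measure name here is made up):

    from __future__ import annotations

    import collections

    # measure name -> repeat -> fold -> value. The runtime object is a
    # defaultdict, but annotating it as a plain dict keeps call sites simple.
    fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(
        lambda: collections.defaultdict(dict),
    )

    # Missing keys are materialized on first access, so no setdefault is needed.
    fold_evaluations["predictive_accuracy"][0][0] = 0.93
    assert fold_evaluations["predictive_accuracy"][0][0] == 0.93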
@@ -1976,7 +1976,7 @@ def test_run_model_on_fold_regression(self):
         assert y_hat_proba is None

         # check user defined measures
-        fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict))
+        fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
         for measure in user_defined_measures:
             fold_evaluations[measure][0][0] = user_defined_measures[measure]
@@ -2019,7 +2019,7 @@ def test_run_model_on_fold_clustering(self):
         assert y_hat_proba is None

         # check user defined measures
-        fold_evaluations = collections.defaultdict(lambda: collections.defaultdict(dict))
+        fold_evaluations: dict[str, dict[int, dict[int, float]]] = collections.defaultdict(lambda: collections.defaultdict(dict))
         for measure in user_defined_measures:
             fold_evaluations[measure][0][0] = user_defined_measures[measure]
diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py
index 4a730a611..d36935b17 100644
--- a/tests/test_runs/test_run_functions.py
+++ b/tests/test_runs/test_run_functions.py
@@ -552,7 +552,12 @@ def determine_grid_size(param_grid):
         )

         # todo: check if runtime is present
-        self._check_fold_timing_evaluations(run.fold_evaluations, 1, num_folds, task_type=task_type)
+        self._check_fold_timing_evaluations(
+            fold_evaluations=run.fold_evaluations,
+            num_repeats=1,
+            num_folds=num_folds,
+            task_type=task_type
+        )

         # Check if run string and print representation do not run into an error
         # The above check already verifies that all columns needed for supported
@@ -1353,9 +1358,9 @@ def test__run_task_get_arffcontent(self):
         task_type = TaskType.SUPERVISED_CLASSIFICATION

         self._check_fold_timing_evaluations(
-            fold_evaluations,
-            num_repeats,
-            num_folds,
+            fold_evaluations=fold_evaluations,
+            num_repeats=num_repeats,
+            num_folds=num_folds,
             task_type=task_type,
         )
diff --git a/tests/test_tasks/test_split.py b/tests/test_tasks/test_split.py
index b49dd77af..12cb632d9 100644
--- a/tests/test_tasks/test_split.py
+++ b/tests/test_tasks/test_split.py
@@ -3,6 +3,7 @@
 import inspect
 import os
+from pathlib import Path

 import numpy as np
@@ -18,18 +19,17 @@ def setUp(self):
         __file__ = inspect.getfile(OpenMLSplitTest)
         self.directory = os.path.dirname(__file__)
         # This is for dataset
-        self.arff_filename = os.path.join(
-            self.directory,
-            "..",
-            "files",
-            "org",
-            "openml",
-            "test",
-            "tasks",
-            "1882",
-            "datasplits.arff",
+        self.arff_filepath = (
+            Path(self.directory).parent
+            / "files"
+            / "org"
+            / "openml"
+            / "test"
+            / "tasks"
+            / "1882"
+            / "datasplits.arff"
         )
-        self.pd_filename = self.arff_filename.replace(".arff", ".pkl.py3")
+        self.pd_filename = self.arff_filepath.with_suffix(".pkl.py3")

     def tearDown(self):
         try:
@@ -39,27 +39,27 @@ def tearDown(self):
             pass

     def test_eq(self):
-        split = OpenMLSplit._from_arff_file(self.arff_filename)
+        split = OpenMLSplit._from_arff_file(self.arff_filepath)
         assert split == split

-        split2 = OpenMLSplit._from_arff_file(self.arff_filename)
+        split2 = OpenMLSplit._from_arff_file(self.arff_filepath)
         split2.name = "a"
         assert split != split2

-        split2 = OpenMLSplit._from_arff_file(self.arff_filename)
+        split2 = OpenMLSplit._from_arff_file(self.arff_filepath)
         split2.description = "a"
         assert split != split2

-        split2 = OpenMLSplit._from_arff_file(self.arff_filename)
+        split2 = OpenMLSplit._from_arff_file(self.arff_filepath)
         split2.split[10] = {}
         assert split != split2

-        split2 = OpenMLSplit._from_arff_file(self.arff_filename)
+        split2 = OpenMLSplit._from_arff_file(self.arff_filepath)
         split2.split[0][10] = {}
         assert split != split2

     def test_from_arff_file(self):
-        split = OpenMLSplit._from_arff_file(self.arff_filename)
+        split = OpenMLSplit._from_arff_file(self.arff_filepath)
         assert isinstance(split.split, dict)
         assert isinstance(split.split[0], dict)
         assert isinstance(split.split[0][0], dict)
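One subtlety in the `setUp` hunk above: `Path.with_suffix` replaces only the final suffix, so it reproduces the old `str.replace(".arff", ".pkl.py3")` result here. A quick standalone check (the path is illustrative, mirroring the fixture layout in the test):

    from pathlib import Path

    arff = Path("files") / "org" / "openml" / "test" / "tasks" / "1882" / "datasplits.arff"

    # ".arff" is swapped for the multi-dot suffix ".pkl.py3".
    assert arff.with_suffix(".pkl.py3").name == "datasplits.pkl.py3"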
@@ -78,7 +78,7 @@ def test_from_arff_file(self):
         )

     def test_get_split(self):
-        split = OpenMLSplit._from_arff_file(self.arff_filename)
+        split = OpenMLSplit._from_arff_file(self.arff_filepath)
         train_split, test_split = split.get(fold=5, repeat=2)
         assert train_split.shape[0] == 808
         assert test_split.shape[0] == 90
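For readers who have not met the object under test: an `OpenMLSplit` holds train/test index arrays grouped by repeat and fold. A hedged usage sketch based only on what the tests above exercise; `_from_arff_file` is a private helper, the import path is assumed, and the file path is hypothetical:

    from pathlib import Path

    from openml.tasks.split import OpenMLSplit  # import path assumed

    split = OpenMLSplit._from_arff_file(Path("datasplits.arff"))  # hypothetical local file
    # split.split is a nested dict keyed by repeat, then fold.
    train_indices, test_indices = split.get(fold=5, repeat=2)
    print(train_indices.shape, test_indices.shape)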