diff --git a/autoPyTorch/__version__.py b/autoPyTorch/__version__.py index 94b9a71f5..36509b4a7 100644 --- a/autoPyTorch/__version__.py +++ b/autoPyTorch/__version__.py @@ -1,4 +1,4 @@ """Version information.""" # The following line *must* be the last in the module, exactly as formatted: -__version__ = "0.2" +__version__ = "0.2.1" diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index c5468eae7..db048f00a 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -52,7 +52,6 @@ ) from autoPyTorch.ensemble.ensemble_builder import EnsembleBuilderManager from autoPyTorch.ensemble.singlebest_ensemble import SingleBest -from autoPyTorch.evaluation.abstract_evaluator import fit_and_suppress_warnings from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash from autoPyTorch.evaluation.utils import DisableFileOutputParameters from autoPyTorch.optimizer.smbo import AutoMLSMBO @@ -69,6 +68,7 @@ start_log_server, ) from autoPyTorch.utils.parallel import preload_modules +from autoPyTorch.utils.parallel_model_runner import run_models_on_dataset from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements from autoPyTorch.utils.results_manager import MetricResults, ResultsManager, SearchResults from autoPyTorch.utils.results_visualizer import ColorLabelSettings, PlotSettingParams, ResultsVisualizer @@ -443,14 +443,14 @@ def ensemble_performance_history(self) -> List[Dict[str, Any]]: def trajectory(self) -> Optional[List]: return self._results_manager.trajectory - def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: + def set_pipeline_options(self, **pipeline_options_kwargs: Any) -> None: """ Check whether arguments are valid and then sets them to the current pipeline configuration. Args: - **pipeline_config_kwargs: Valid config options include "num_run", + **pipeline_options_kwargs: Valid config options include "num_run", "device", "budget_type", "epochs", "runtime", "torch_num_threads", "early_stopping", "use_tensorboard_logger", "metrics_during_training" @@ -459,7 +459,7 @@ def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: None """ unknown_keys = [] - for option, value in pipeline_config_kwargs.items(): + for option, value in pipeline_options_kwargs.items(): if option in self.pipeline_options.keys(): pass else: @@ -470,7 +470,7 @@ def set_pipeline_config(self, **pipeline_config_kwargs: Any) -> None: " expected arguments to be in {}". 
format(unknown_keys, self.pipeline_options.keys())) - self.pipeline_options.update(pipeline_config_kwargs) + self.pipeline_options.update(pipeline_options_kwargs) def get_pipeline_options(self) -> dict: """ @@ -634,7 +634,9 @@ def _close_dask_client(self) -> None: self._is_dask_client_internally_created = False del self._is_dask_client_internally_created - def _load_models(self) -> bool: + def _load_models( + self, + ) -> bool: """ Loads the models saved in the temporary directory @@ -645,6 +647,7 @@ def _load_models(self) -> bool: """ if self.resampling_strategy is None: raise ValueError("Resampling strategy is needed to determine what models to load") + self.ensemble_ = self._backend.load_ensemble(self.seed) # If no ensemble is loaded, try to get the best performing model @@ -799,113 +802,37 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: assert self._dask_client is not None self._logger.info("Starting to create traditional classifier predictions.") - starttime = time.time() # Initialise run history for the traditional classifiers - run_history = RunHistory() memory_limit = self._memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) available_classifiers = get_available_traditional_learners() - dask_futures = [] - - total_number_classifiers = len(available_classifiers) - for n_r, classifier in enumerate(available_classifiers): - - # Only launch a task if there is time - start_time = time.time() - if time_left >= func_eval_time_limit_secs: - self._logger.info(f"{n_r}: Started fitting {classifier} with cutoff={func_eval_time_limit_secs}") - scenario_mock = unittest.mock.Mock() - scenario_mock.wallclock_limit = time_left - # This stats object is a hack - maybe the SMAC stats object should - # already be generated here! - stats = Stats(scenario_mock) - stats.start_timing() - ta = ExecuteTaFuncWithQueue( - pynisher_context=self._multiprocessing_context, - backend=self._backend, - seed=self.seed, - multi_objectives=["cost"], - metric=self._metric, - logger_port=self._logger_port, - cost_for_crash=get_cost_of_crash(self._metric), - abort_on_first_run_crash=False, - initial_num_run=self._backend.get_next_num_run(), - stats=stats, - memory_limit=memory_limit, - disable_file_output=self._disable_file_output, - all_supported_metrics=self._all_supported_metrics, - ) - dask_futures.append([ - classifier, - self._dask_client.submit( - ta.run, config=classifier, - cutoff=func_eval_time_limit_secs, - ) - ]) - - # When managing time, we need to take into account the allocated time resources, - # which are dependent on the number of cores. 'dask_futures' is a proxy to the number - # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most - # 4 task in parallel. Every 'cutoff' seconds, we generate up to 4 tasks. - # If we only have 4 workers and there are 4 futures in dask_futures, it means that every - # worker has a task. We would not like to launch another job until a worker is available. To this - # end, the following if-statement queries the number of active jobs, and forces to wait for a job - # completion via future.result(), so that a new worker is available for the next iteration. 
- if len(dask_futures) >= self.n_jobs: - - # How many workers to wait before starting fitting the next iteration - workers_to_wait = 1 - if n_r >= total_number_classifiers - 1 or time_left <= func_eval_time_limit_secs: - # If on the last iteration, flush out all tasks - workers_to_wait = len(dask_futures) - - while workers_to_wait >= 1: - workers_to_wait -= 1 - # We launch dask jobs only when there are resources available. - # This allow us to control time allocation properly, and early terminate - # the traditional machine learning pipeline - cls, future = dask_futures.pop(0) - status, cost, runtime, additional_info = future.result() - if status == StatusType.SUCCESS: - self._logger.info( - "Fitting {} took {} [sec] and got performance: {}.\n" - "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info)) - ) - configuration = additional_info['pipeline_configuration'] - origin = additional_info['configuration_origin'] - additional_info.pop('pipeline_configuration') - run_history.add(config=configuration, cost=cost, - time=runtime, status=status, seed=self.seed, - starttime=starttime, endtime=starttime + runtime, - origin=origin, additional_info=additional_info) - else: - if additional_info.get('exitcode') == -6: - self._logger.error( - "Traditional prediction for {} failed with run state {},\n" - "because the provided memory limits were too tight.\n" - "Please increase the 'ml_memory_limit' and try again.\n" - "If you still get the problem, please open an issue\n" - "and paste the additional info.\n" - "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info)) - ) - else: - self._logger.error( - "Traditional prediction for {} failed with run state {}.\nAdditional info:\n{}".format( - cls, str(status), dict_repr(additional_info) - ) - ) - - # In the case of a serial execution, calling submit halts the run for a resource - # dynamically adjust time in this case - time_left -= int(time.time() - start_time) - - # Exit if no more time is available for a new classifier - if time_left < func_eval_time_limit_secs: - self._logger.warning("Not enough time to fit all traditional machine learning models." 
- "Please consider increasing the run time to further improve performance.") - break + model_configs = [(classifier, self.pipeline_options[self.pipeline_options['budget_type']]) + for classifier in available_classifiers] + + run_history = run_models_on_dataset( + time_left=time_left, + func_eval_time_limit_secs=func_eval_time_limit_secs, + model_configs=model_configs, + logger=self._logger, + logger_port=self._logger_port, + metric=self._metric, + dask_client=self._dask_client, + backend=self._backend, + memory_limit=memory_limit, + disable_file_output=self._disable_file_output, + all_supported_metrics=self._all_supported_metrics, + include=self.include_components, + exclude=self.exclude_components, + search_space_updates=self.search_space_updates, + pipeline_options=self.pipeline_options, + seed=self.seed, + multiprocessing_context=self._multiprocessing_context, + n_jobs=self.n_jobs, + current_search_space=self.search_space, + initial_num_run=self._backend.get_next_num_run() + ) self._logger.debug("Run history traditional: {}".format(run_history)) # add run history of traditional to api run history @@ -1272,7 +1199,7 @@ def _search( all_supported_metrics=self._all_supported_metrics, smac_scenario_args=smac_scenario_args, get_smac_object_callback=get_smac_object_callback, - pipeline_config=self.pipeline_options, + pipeline_options=self.pipeline_options, min_budget=min_budget, max_budget=max_budget, ensemble_callback=proc_ensemble, @@ -1378,38 +1305,144 @@ def _get_fit_dictionary( def refit( self, - dataset: BaseDataset, - split_id: int = 0 + dataset: Optional[BaseDataset] = None, + X_train: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + y_train: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + X_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + y_test: Optional[Union[List, pd.DataFrame, np.ndarray]] = None, + dataset_name: Optional[str] = None, + resampling_strategy: ResamplingStrategies = NoResamplingStrategyTypes.no_resampling, + resampling_strategy_args: Optional[Dict[str, Any]] = None, + total_walltime_limit: int = 120, + run_time_limit_secs: int = 60, + memory_limit: Optional[int] = None, + eval_metric: Optional[str] = None, + all_supported_metrics: bool = False, + budget_type: Optional[str] = None, + budget: Optional[float] = None, + pipeline_options: Optional[Dict] = None, + disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, ) -> "BaseTask": """ - Refit all models found with fit to new data. - - Necessary when using cross-validation. During training, autoPyTorch - fits each model k times on the dataset, but does not keep any trained - model and can therefore not be used to predict for new data points. - This methods fits all models found during a call to fit on the data - given. This method may also be used together with holdout to avoid - only using 66% of the training data to fit the final model. + Fit all the models found in the ensemble on the whole training set X_train. + Therefore, we recommend using `NoResamplingStrategy` to be able to do that. Nevertheless, it + is still able to fit using other splitting techniques such as hold out or cross validation. - Refit uses the estimator pipeline_config attribute, which the user - can interact via the get_pipeline_config()/set_pipeline_config() + Refit uses the estimator pipeline_options attribute, which the user + can interact via the get_pipeline_options()/set_pipeline_options() methods. 
Args: - dataset (Dataset): - The argument that will provide the dataset splits. It can either - be a dictionary with the splits, or the dataset object which can - generate the splits based on different restrictions. - split_id (int): - split id to fit on. + dataset (BaseDataset): + An object of the appropriate child class of `BaseDataset`, + that will be used to fit the pipeline + X_train, y_train, X_test, y_test: Union[np.ndarray, List, pd.DataFrame] + A pair of features (X_train) and targets (y_train) used to fit a + pipeline. Additionally, a holdout of this pairs (X_test, y_test) can + be provided to track the generalization performance of each stage. + dataset_name (Optional[str]): + Name of the dataset, if None, random value is used. + resampling_strategy (ResamplingStrategies): + Strategy to split the training data. Defaults to + NoResamplingStrategyTypes.no_resampling. + resampling_strategy_args (Optional[Dict[str, Any]]): + Arguments required for the chosen resampling strategy. If None, uses + the default values provided in DEFAULT_RESAMPLING_PARAMETERS + in ```datasets/resampling_strategy.py```. + dataset_name (Optional[str]): + name of the dataset, used as experiment name. + total_walltime_limit (int): + Total time that can be used by all the models to be refitted. Defaults to 120. + run_time_limit_secs (int: default=60): + Time limit for a single call to the machine learning model. + Model fitting will be terminated if the machine learning algorithm + runs over the time limit. Set this value high enough so that + typical machine learning algorithms can be fit on the training + data. + memory_limit (Optional[int]): + Memory limit in MB for the machine learning algorithm. autopytorch + will stop fitting the machine learning algorithm if it tries + to allocate more than memory_limit MB. If None is provided, + no memory limit is set. In case of multi-processing, memory_limit + will be per job. This memory limit also applies to the ensemble + creation process. + eval_metric (Optional[str]): + Name of the metric that is used to evaluate a pipeline. + all_supported_metrics (bool: default=True): + if True, all metrics supporting current task will be calculated + for each pipeline and results will be available via cv_results + budget_type (str): + Type of budget to be used when fitting the pipeline. + It can be one of: + + + `epochs`: The training of each pipeline will be terminated after + a number of epochs have passed. This number of epochs is determined by the + budget argument of this method. + + `runtime`: The training of each pipeline will be terminated after + a number of seconds have passed. This number of seconds is determined by the + budget argument of this method. The overall fitting time of a pipeline is + controlled by func_eval_time_limit_secs. 'runtime' only controls the allocated + time to train a pipeline, but it does not consider the overall time it takes + to create a pipeline (data loading and preprocessing, other i/o operations, etc.). + budget (Optional[float]): + Budget to fit a single run of the pipeline. If not + provided, uses the default in the pipeline config + pipeline_options (Optional[Dict]): + Valid config options include "device", + "torch_num_threads", "early_stopping", "use_tensorboard_logger", + "metrics_during_training" + disable_file_output (Optional[List[Union[str, DisableFileOutputParameters]]]): + Used as a list to pass more fine-grained + information on what to save. Must be a member of `DisableFileOutputParameters`. 
+ Allowed elements in the list are: + + + `y_optimization`: + do not save the predictions for the optimization set, + which would later on be used to build an ensemble. Note that SMAC + optimizes a metric evaluated on the optimization set. + + `pipeline`: + do not save any individual pipeline files + + `pipelines`: + In case of cross validation, disables saving the joint model of the + pipelines fit on each fold. + + `y_test`: + do not save the predictions for the test set. + + `all`: + do not save any of the above. + For more information check `autoPyTorch.evaluation.utils.DisableFileOutputParameters`. + Returns: self """ + if dataset is None: + if ( + X_train is None + and y_train is None + ): + raise ValueError("No dataset provided, must provide X_train, y_train tensors") + dataset = self.get_dataset(X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + resampling_strategy=resampling_strategy, + resampling_strategy_args=resampling_strategy_args, + dataset_name=dataset_name + ) + self.dataset_name = dataset.dataset_name - if self._logger is None: - self._logger = self._get_logger(str(self.dataset_name)) + # Used when loading models + self.resampling_strategy = resampling_strategy + + self._logger = self._get_logger("RefitLogger") + + self._logger.debug("Starting refit") + + if self.n_jobs == 1: + self._dask_client = SingleThreadedClient() + else: + self._create_dask_client() dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), @@ -1417,29 +1450,102 @@ def refit( exclude=self.exclude_components, search_space_updates=self.search_space_updates) dataset_properties = dataset.get_dataset_properties(dataset_requirements) + self._backend.save_datamanager(dataset) + scenario_mock = unittest.mock.Mock() + scenario_mock.wallclock_limit = run_time_limit_secs + # This stats object is a hack - maybe the SMAC stats object should + # already be generated here! + stats = Stats(scenario_mock) + + if memory_limit is None and getattr(self, '_memory_limit', None) is not None: + memory_limit = self._memory_limit + + metric = get_metrics(dataset_properties=dataset_properties, + names=[eval_metric] if eval_metric is not None else None, + all_supported_metrics=False).pop() + + pipeline_options = self.pipeline_options.copy().update(pipeline_options) if pipeline_options is not None \ + else self.pipeline_options.copy() + + assert pipeline_options is not None + + if budget_type is not None: + pipeline_options.update({'budget_type': budget_type}) + else: + budget_type = pipeline_options['budget_type'] + + budget = budget if budget is not None else pipeline_options[budget_type] + + if disable_file_output is None: + disable_file_output = getattr(self, '_disable_file_output', []) + + stats.start_timing() + if self.models_ is None or len(self.models_) == 0 or self.ensemble_ is None: self._load_models() - # Refit is not applicable when ensemble_size is set to zero. - if self.ensemble_ is None: - raise ValueError("Refit can only be called if 'ensemble_size != 0'") - + model_configs = [] for identifier in self.models_: model = self.models_[identifier] - # this updates the model inplace, it can then later be used in - # predict method - - # try to fit the model. If it fails, shuffle the data. This - # could alleviate the problem in algorithms that depend on - # the ordering of the data. 
- X = self._get_fit_dictionary( - dataset_properties=copy.copy(dataset_properties), - dataset=dataset, - split_id=split_id) - fit_and_suppress_warnings(self._logger, model, X, y=None) + budget = identifier[-1] # identifier is seed, num_run, budget + model_configs.append((model.config, budget)) + + self._logger.debug(f"Refitting {model_configs}") + run_history = run_models_on_dataset( + time_left=total_walltime_limit, + func_eval_time_limit_secs=run_time_limit_secs, + model_configs=model_configs, + logger=self._logger, + logger_port=self._logger_port, + metric=metric, + dask_client=self._dask_client, + backend=self._backend, + memory_limit=memory_limit, + disable_file_output=disable_file_output, + all_supported_metrics=all_supported_metrics, + include=self.include_components, + exclude=self.exclude_components, + search_space_updates=self.search_space_updates, + pipeline_options=pipeline_options, + seed=self.seed, + multiprocessing_context=self._multiprocessing_context, + n_jobs=self.n_jobs, + current_search_space=self.search_space, + initial_num_run=self._backend.get_next_num_run() + ) + + replace_old_identifiers_to_refit_identifiers = {} + + self._logger.debug("Finished refit training") + old_identifier_index = None + for _, run_value in run_history.data.items(): + config = run_value.additional_info['configuration'] + for i, (configuration, _) in enumerate(model_configs): + if isinstance(configuration, Configuration): + configuration = configuration.get_dictionary() + self._logger.debug(f"Matching {config} with {configuration}") + if config == configuration: + old_identifier_index = i + break + if old_identifier_index is not None: + old_identifier = list(self.models_.keys())[old_identifier_index] + refit_identifier = (self.seed, run_value.additional_info['num_run'], old_identifier[2]) + replace_old_identifiers_to_refit_identifiers[old_identifier] = refit_identifier + else: + warnings.warn(f"Refit for {config} failed. Model fitted during search will be used instead.") + old_identifier_index = None + self.ensemble_.update_identifiers(replace_old_identifiers_to_refit_identifiers) + self.run_history.update(run_history, DataOrigin.EXTERNAL_SAME_INSTANCES) + run_history.save_json(os.path.join(self._backend.internals_directory, 'refit_run_history.json'), + save_external=True) + + # store ensemble for later use, with large iteration + self._backend.save_ensemble(self.ensemble_, 10**8, self.seed) + + self._load_models() self._clean_logger() return self @@ -1473,8 +1579,8 @@ def fit_pipeline( A pipeline configuration can be specified if None, uses default - Fit uses the estimator pipeline_config attribute, which the user - can interact via the get_pipeline_config()/set_pipeline_config() + Fit uses the estimator pipeline_options attribute, which the user + can interact via the get_pipeline_options()/set_pipeline_options() methods. 
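An illustrative sketch of fitting a single configuration through the renamed interface; `api`, `X_train` and `y_train` are assumed to exist, the search space is assumed to be built already (for example after a `search()` call), and the four-value return unpacking (pipeline, run info, run value, dataset) is assumed unchanged from the existing fit_pipeline API:

configuration = api.search_space.sample_configuration()
pipeline, run_info, run_value, dataset = api.fit_pipeline(
    X_train=X_train,
    y_train=y_train,
    configuration=configuration,
    run_time_limit_secs=60,
)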
Args: @@ -1581,8 +1687,8 @@ def fit_pipeline( if dataset is None: if ( - X_train is not None - and y_train is not None + X_train is None + and y_train is None ): raise ValueError("No dataset provided, must provide X_train, y_train tensors") dataset = self.get_dataset(X_train=X_train, @@ -1603,22 +1709,22 @@ def fit_pipeline( # search process, it makes sense to set it to 0 configuration.__setattr__('config_id', 0) + include_components = self.include_components if include_components is None else include_components + exclude_components = self.exclude_components if exclude_components is None else exclude_components + search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates + # get dataset properties dataset_requirements = get_dataset_requirements( info=dataset.get_required_dataset_info(), - include=self.include_components, - exclude=self.exclude_components, - search_space_updates=self.search_space_updates) + include=include_components, + exclude=exclude_components, + search_space_updates=search_space_updates) dataset_properties = dataset.get_dataset_properties(dataset_requirements) self._backend.save_datamanager(dataset) if self._logger is None: self._logger = self._get_logger(dataset.dataset_name) - include_components = self.include_components if include_components is None else include_components - exclude_components = self.exclude_components if exclude_components is None else exclude_components - search_space_updates = self.search_space_updates if search_space_updates is None else search_space_updates - scenario_mock = unittest.mock.Mock() scenario_mock.wallclock_limit = run_time_limit_secs # This stats object is a hack - maybe the SMAC stats object should @@ -1666,7 +1772,7 @@ def fit_pipeline( include=include_components, exclude=exclude_components, search_space_updates=search_space_updates, - pipeline_config=pipeline_options, + pipeline_options=pipeline_options, pynisher_context=self._multiprocessing_context, ) @@ -1742,8 +1848,7 @@ def predict( # Parallelize predictions across models with n_jobs processes. # Each process computes predictions in chunks of batch_size rows. - if self._logger is None: - self._logger = self._get_logger("Predict-Logger") + self._logger = self._get_logger("Predict-Logger") if self.ensemble_ is None and not self._load_models(): raise ValueError("No ensemble found. 
Either fit has not yet " diff --git a/autoPyTorch/ensemble/abstract_ensemble.py b/autoPyTorch/ensemble/abstract_ensemble.py index 072b6d260..0f04fe38a 100644 --- a/autoPyTorch/ensemble/abstract_ensemble.py +++ b/autoPyTorch/ensemble/abstract_ensemble.py @@ -9,6 +9,9 @@ class AbstractEnsemble(object): __metaclass__ = ABCMeta + def __init__(self): + self.identifiers_: List[Tuple[int, int, float]] = [] + @abstractmethod def fit( self, @@ -76,3 +79,12 @@ def get_validation_performance(self) -> float: Returns: Score """ + + def update_identifiers( + self, + replace_identifiers_mapping: Dict[Tuple[int, int, float], Tuple[int, int, float]] + ) -> None: + identifiers = self.identifiers_.copy() + for i, identifier in enumerate(self.identifiers_): + identifiers[i] = replace_identifiers_mapping.get(identifier, identifier) + self.identifiers_ = identifiers diff --git a/autoPyTorch/evaluation/abstract_evaluator.py b/autoPyTorch/evaluation/abstract_evaluator.py index d20a96b75..069228726 100644 --- a/autoPyTorch/evaluation/abstract_evaluator.py +++ b/autoPyTorch/evaluation/abstract_evaluator.py @@ -195,7 +195,8 @@ def get_additional_run_info(self) -> Dict[str, Any]: Can be found in autoPyTorch/pipeline/components/setup/traditional_ml/estimator_configs """ return {'pipeline_configuration': self.configuration, - 'trainer_configuration': self.pipeline.named_steps['model_trainer'].choice.model.get_config()} + 'trainer_configuration': self.pipeline.named_steps['model_trainer'].choice.model.get_config(), + 'configuration_origin': 'traditional'} def get_pipeline_representation(self) -> Dict[str, str]: return self.pipeline.get_pipeline_representation() @@ -347,7 +348,7 @@ class AbstractEvaluator(object): An evaluator is an object that: + constructs a pipeline (i.e. a classification or regression estimator) for a given - pipeline_config and run settings (budget, seed) + pipeline_options and run settings (budget, seed) + Fits and trains this pipeline (TrainEvaluator) or tests a given configuration (TestEvaluator) @@ -369,7 +370,7 @@ class AbstractEvaluator(object): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type. Currently, only epoch and time are allowed. - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
@@ -430,7 +431,7 @@ def __init__(self, backend: Backend, budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = True, num_run: Optional[int] = None, @@ -523,10 +524,10 @@ def __init__(self, backend: Backend, self._init_params = init_params assert self.pipeline_class is not None, "Could not infer pipeline class" - pipeline_config = pipeline_config if pipeline_config is not None \ + pipeline_options = pipeline_options if pipeline_options is not None \ else self.pipeline_class.get_default_pipeline_options() - self.budget_type = pipeline_config['budget_type'] if budget_type is None else budget_type - self.budget = pipeline_config[self.budget_type] if budget == 0 else budget + self.budget_type = pipeline_options['budget_type'] if budget_type is None else budget_type + self.budget = pipeline_options[self.budget_type] if budget == 0 else budget self.num_run = 0 if num_run is None else num_run @@ -539,7 +540,7 @@ def __init__(self, backend: Backend, port=logger_port, ) - self._init_fit_dictionary(logger_port=logger_port, pipeline_config=pipeline_config, metrics_dict=metrics_dict) + self._init_fit_dictionary(logger_port=logger_port, pipeline_options=pipeline_options, metrics_dict=metrics_dict) self.Y_optimization: Optional[np.ndarray] = None self.Y_actual_train: Optional[np.ndarray] = None self.pipelines: Optional[List[BaseEstimator]] = None @@ -597,7 +598,7 @@ def _init_datamanager_info( def _init_fit_dictionary( self, logger_port: int, - pipeline_config: Dict[str, Any], + pipeline_options: Dict[str, Any], metrics_dict: Optional[Dict[str, List[str]]] = None, ) -> None: """ @@ -608,7 +609,7 @@ def _init_fit_dictionary( Logging is performed using a socket-server scheme to be robust against many parallel entities that want to write to the same file. This integer states the socket port for the communication channel. - pipeline_config (Dict[str, Any]): + pipeline_options (Dict[str, Any]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
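The budget fallback above, taken in isolation (the option values are hypothetical, not the shipped defaults):

pipeline_options = {"budget_type": "epochs", "epochs": 50, "runtime": 3600}
budget_type, budget = None, 0
budget_type = pipeline_options["budget_type"] if budget_type is None else budget_type  # "epochs"
budget = pipeline_options[budget_type] if budget == 0 else budget                      # 50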
@@ -634,7 +635,7 @@ def _init_fit_dictionary( 'optimize_metric': self.metric.name }) - self.fit_dictionary.update(pipeline_config) + self.fit_dictionary.update(pipeline_options) # If the budget is epochs, we want to limit that in the fit dictionary if self.budget_type == 'epochs': self.fit_dictionary['epochs'] = self.budget @@ -805,6 +806,11 @@ def finish_up(self, loss: Dict[str, float], train_loss: Dict[str, float], if test_loss is not None: additional_run_info['test_loss'] = test_loss + # Add information to additional info that can be useful for other functionalities + additional_run_info['configuration'] = self.configuration \ + if not isinstance(self.configuration, Configuration) else self.configuration.get_dictionary() + additional_run_info['budget'] = self.budget + rval_dict = {'loss': cost, 'additional_run_info': additional_run_info, 'status': status} diff --git a/autoPyTorch/evaluation/tae.py b/autoPyTorch/evaluation/tae.py index b1650113c..3eaea6720 100644 --- a/autoPyTorch/evaluation/tae.py +++ b/autoPyTorch/evaluation/tae.py @@ -123,7 +123,7 @@ def __init__( abort_on_first_run_crash: bool, pynisher_context: str, multi_objectives: List[str], - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, initial_num_run: int = 1, stats: Optional[Stats] = None, run_obj: str = 'quality', @@ -198,13 +198,13 @@ def __init__( self.disable_file_output = disable_file_output self.init_params = init_params - self.budget_type = pipeline_config['budget_type'] if pipeline_config is not None else budget_type + self.budget_type = pipeline_options['budget_type'] if pipeline_options is not None else budget_type - self.pipeline_config: Dict[str, Union[int, str, float]] = dict() - if pipeline_config is None: - pipeline_config = replace_string_bool_to_bool(json.load(open( + self.pipeline_options: Dict[str, Union[int, str, float]] = dict() + if pipeline_options is None: + pipeline_options = replace_string_bool_to_bool(json.load(open( os.path.join(os.path.dirname(__file__), '../configs/default_pipeline_options.json')))) - self.pipeline_config.update(pipeline_config) + self.pipeline_options.update(pipeline_options) self.logger_port = logger_port if self.logger_port is None: @@ -225,7 +225,7 @@ def __init__( def _check_and_get_default_budget(self) -> float: budget_type_choices_tabular = ('epochs', 'runtime') budget_choices = { - budget_type: float(self.pipeline_config.get(budget_type, np.inf)) + budget_type: float(self.pipeline_options.get(budget_type, np.inf)) for budget_type in budget_type_choices_tabular } @@ -234,7 +234,7 @@ def _check_and_get_default_budget(self) -> float: budget_type_choices = budget_type_choices_tabular + FORECASTING_BUDGET_TYPE # budget is defined by epochs by default - budget_type = str(self.pipeline_config.get('budget_type', 'epochs')) + budget_type = str(self.pipeline_options.get('budget_type', 'epochs')) if self.budget_type is not None: budget_type = self.budget_type @@ -361,7 +361,7 @@ def run( init_params=init_params, budget=budget, budget_type=self.budget_type, - pipeline_config=self.pipeline_config, + pipeline_options=self.pipeline_options, logger_port=self.logger_port, all_supported_metrics=self.all_supported_metrics, search_space_updates=self.search_space_updates diff --git a/autoPyTorch/evaluation/test_evaluator.py b/autoPyTorch/evaluation/test_evaluator.py index 4d5b0ae91..12b7bc31d 100644 --- a/autoPyTorch/evaluation/test_evaluator.py +++ b/autoPyTorch/evaluation/test_evaluator.py @@ -51,7 +51,7 @@ class 
TestEvaluator(AbstractEvaluator): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -113,7 +113,7 @@ def __init__( budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = False, num_run: Optional[int] = None, @@ -141,7 +141,7 @@ def __init__( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) @@ -206,7 +206,7 @@ def eval_test_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -230,7 +230,7 @@ def eval_test_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates) evaluator.fit_predict_and_loss() diff --git a/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py b/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py index 0940d1e9a..07a87ede4 100644 --- a/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py +++ b/autoPyTorch/evaluation/time_series_forecasting_train_evaluator.py @@ -40,7 +40,7 @@ class TimeSeriesForecastingTrainEvaluator(TrainEvaluator): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
@@ -106,7 +106,7 @@ def __init__(self, backend: Backend, queue: Queue, metric: autoPyTorchMetric, budget: float, budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, configuration: Optional[Configuration] = None, seed: int = 1, output_y_hat_optimization: bool = True, @@ -138,7 +138,7 @@ def __init__(self, backend: Backend, queue: Queue, logger_port=logger_port, keep_models=keep_models, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) self.datamanager = backend.load_datamanager() @@ -456,7 +456,7 @@ def forecasting_eval_train_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -490,7 +490,7 @@ def forecasting_eval_train_function( The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -550,7 +550,7 @@ def forecasting_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, max_budget=max_budget, min_num_test_instances=min_num_test_instances, diff --git a/autoPyTorch/evaluation/train_evaluator.py b/autoPyTorch/evaluation/train_evaluator.py index 142af6bcc..e88c8eaca 100644 --- a/autoPyTorch/evaluation/train_evaluator.py +++ b/autoPyTorch/evaluation/train_evaluator.py @@ -60,7 +60,7 @@ class TrainEvaluator(AbstractEvaluator): The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. 
@@ -121,7 +121,7 @@ def __init__(self, backend: Backend, queue: Queue, budget: float, configuration: Union[int, str, Configuration], budget_type: str = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, seed: int = 1, output_y_hat_optimization: bool = True, num_run: Optional[int] = None, @@ -149,7 +149,7 @@ def __init__(self, backend: Backend, queue: Queue, budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates ) @@ -420,7 +420,7 @@ def eval_train_function( include: Optional[Dict[str, Any]], exclude: Optional[Dict[str, Any]], disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, - pipeline_config: Optional[Dict[str, Any]] = None, + pipeline_options: Optional[Dict[str, Any]] = None, budget_type: str = None, init_params: Optional[Dict[str, Any]] = None, logger_port: Optional[int] = None, @@ -452,7 +452,7 @@ def eval_train_function( The amount of epochs/time a configuration is allowed to run. budget_type (str): The budget type, which can be epochs or time - pipeline_config (Optional[Dict[str, Any]]): + pipeline_options (Optional[Dict[str, Any]]): Defines the content of the pipeline being evaluated. For example, it contains pipeline specific settings like logging name, or whether or not to use tensorboard. @@ -506,7 +506,7 @@ def eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, ) evaluator.fit_predict_and_loss() diff --git a/autoPyTorch/optimizer/smbo.py b/autoPyTorch/optimizer/smbo.py index 53eae4696..92bf7bb87 100644 --- a/autoPyTorch/optimizer/smbo.py +++ b/autoPyTorch/optimizer/smbo.py @@ -111,7 +111,7 @@ def __init__(self, watcher: StopWatch, n_jobs: int, dask_client: Optional[dask.distributed.Client], - pipeline_config: Dict[str, Any], + pipeline_options: Dict[str, Any], start_num_run: int = 1, seed: int = 1, resampling_strategy: Union[HoldoutValTypes, @@ -227,7 +227,7 @@ def __init__(self, self.backend = backend self.all_supported_metrics = all_supported_metrics - self.pipeline_config = pipeline_config + self.pipeline_options = pipeline_options # the configuration space self.config_space = config_space @@ -326,7 +326,7 @@ def run_smbo(self, func: Optional[Callable] = None ta=func, logger_port=self.logger_port, all_supported_metrics=self.all_supported_metrics, - pipeline_config=self.pipeline_config, + pipeline_options=self.pipeline_options, search_space_updates=self.search_space_updates, pynisher_context=self.pynisher_context, ) @@ -376,7 +376,7 @@ def run_smbo(self, func: Optional[Callable] = None ) scenario_dict.update(self.smac_scenario_args) - budget_type = self.pipeline_config['budget_type'] + budget_type = self.pipeline_options['budget_type'] if budget_type in FORECASTING_BUDGET_TYPE: if STRING_TO_TASK_TYPES.get(self.task_type, -1) != TIMESERIES_FORECASTING: raise ValueError('Forecasting Budget type is only available for forecasting task!') diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py b/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py index 7d26c5481..8b4723066 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/base_model.py 
@@ -52,8 +52,7 @@ def __init__( self.add_fit_requirements([ FitRequirement('X_train', (np.ndarray, list, pd.DataFrame), user_defined=False, dataset_property=False), FitRequirement('y_train', (np.ndarray, list, pd.Series,), user_defined=False, dataset_property=False), - FitRequirement('train_indices', (np.ndarray, list), user_defined=False, dataset_property=False), - FitRequirement('val_indices', (np.ndarray, list), user_defined=False, dataset_property=False)]) + FitRequirement('train_indices', (np.ndarray, list), user_defined=False, dataset_property=False)]) def fit(self, X: Dict[str, Any], y: Any = None) -> autoPyTorchSetupComponent: """ @@ -90,8 +89,14 @@ def fit(self, X: Dict[str, Any], y: Any = None) -> autoPyTorchSetupComponent: # train model blockPrint() + val_indices = X.get('val_indices', None) + X_val = None + y_val = None + if val_indices is not None: + X_val = X['X_train'][val_indices] + y_val = X['y_train'][val_indices] self.fit_output = self.model.fit(X['X_train'][X['train_indices']], X['y_train'][X['train_indices']], - X['X_train'][X['val_indices']], X['y_train'][X['val_indices']]) + X_val, y_val) enablePrint() # infer diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py index 9c0166a9f..eaf40feb3 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/base_traditional_learner.py @@ -68,6 +68,8 @@ def __init__(self, self.is_classification = STRING_TO_TASK_TYPES[task_type] not in REGRESSION_TASKS + self.has_val_set = False + self.metric = get_metrics(dataset_properties={'task_type': task_type, 'output_type': output_type}, names=[optimize_metric] if optimize_metric is not None else None)[0] @@ -132,8 +134,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: """ Method that fits the underlying estimator Args: @@ -152,8 +154,8 @@ def _fit(self, def fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> Dict[str, Any]: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> Dict[str, Any]: """ Fit the model (possible using the validation set for early stopping) and return the results on the training and validation set. @@ -172,7 +174,10 @@ def fit(self, X_train: np.ndarray, Dictionary containing the results. 
see _get_results() """ X_train = self._preprocess(X_train) - X_val = self._preprocess(X_val) + + if X_val is not None: + self.has_val_set = True + X_val = self._preprocess(X_val) self._prepare_model(X_train, y_train) @@ -253,14 +258,14 @@ def _get_results(self, Dictionary containing the results """ pred_train = self.predict(X_train, predict_proba=self.is_classification, preprocess=False) - pred_val = self.predict(X_val, predict_proba=self.is_classification, preprocess=False) results = dict() - - results["val_preds"] = pred_val.tolist() - results["labels"] = y_val.tolist() - results["train_score"] = self.metric(y_train, pred_train) - results["val_score"] = self.metric(y_val, pred_val) + + if self.has_val_set: + pred_val = self.predict(X_val, predict_proba=self.is_classification, preprocess=False) + results["labels"] = y_val.tolist() + results["val_preds"] = pred_val.tolist() + results["val_score"] = self.metric(y_val, pred_val) return results diff --git a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py index 220c52dcd..fca02aa32 100644 --- a/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py +++ b/autoPyTorch/pipeline/components/setup/traditional_ml/traditional_learner/learners.py @@ -45,8 +45,10 @@ def _prepare_model(self, X_train: np.ndarray, y_train: np.ndarray ) -> None: - early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) - self.config["early_stopping_rounds"] = early_stopping + + if self.has_val_set: + early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) + self.config["early_stopping_rounds"] = early_stopping if not self.is_classification: self.model = LGBMRegressor(**self.config, random_state=self.random_state) else: @@ -57,11 +59,14 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None ) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" - self.model.fit(X_train, y_train, eval_set=[(X_val, y_val)]) + eval_set = None + if self.has_val_set: + eval_set = [(X_val, y_val)] + self.model.fit(X_train, y_train, eval_set=eval_set) def predict(self, X_test: np.ndarray, predict_proba: bool = False, @@ -125,15 +130,21 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None + ) -> None: assert self.model is not None, "No model found. 
Can't fit without preparing the model" - early_stopping = 150 if X_train.shape[0] > 10000 else max(round(150 * 10000 / X_train.shape[0]), 10) categoricals = [ind for ind in range(X_train.shape[1]) if isinstance(X_train[0, ind], str)] X_train_pooled = Pool(data=X_train, label=y_train, cat_features=categoricals) - X_val_pooled = Pool(data=X_val, label=y_val, cat_features=categoricals) + X_val_pooled = None + if self.has_val_set: + X_val_pooled = Pool(data=X_val, label=y_val, cat_features=categoricals) + early_stopping: Optional[int] = 150 if X_train.shape[0] > 10000 else max( + round(150 * 10000 / X_train.shape[0]), 10) + else: + early_stopping = None self.model.fit(X_train_pooled, eval_set=X_val_pooled, @@ -189,8 +200,9 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None + ) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) @@ -244,8 +256,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) if self.config["warm_start"]: @@ -303,8 +315,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. Can't fit without preparing the model" self.model.fit(X_train, y_train) @@ -346,8 +358,8 @@ def _prepare_model(self, def _fit(self, X_train: np.ndarray, y_train: np.ndarray, - X_val: np.ndarray, - y_val: np.ndarray) -> None: + X_val: Optional[np.ndarray] = None, + y_val: Optional[np.ndarray] = None) -> None: assert self.model is not None, "No model found. 
Can't fit without preparing the model" self.model.fit(X_train, y_train) diff --git a/autoPyTorch/utils/parallel_model_runner.py b/autoPyTorch/utils/parallel_model_runner.py new file mode 100644 index 000000000..d4237f683 --- /dev/null +++ b/autoPyTorch/utils/parallel_model_runner.py @@ -0,0 +1,300 @@ +import logging +import math +import time +import unittest +from typing import Dict, List, Optional, Tuple, Union + +from ConfigSpace.configuration_space import Configuration, ConfigurationSpace + +import dask.distributed + +from smac.runhistory.runhistory import RunHistory +from smac.stats.stats import Stats +from smac.tae import StatusType + +from autoPyTorch.automl_common.common.utils.backend import Backend +from autoPyTorch.evaluation.tae import ExecuteTaFuncWithQueue, get_cost_of_crash +from autoPyTorch.evaluation.utils import DisableFileOutputParameters +from autoPyTorch.pipeline.components.training.metrics.base import autoPyTorchMetric +from autoPyTorch.utils.common import dict_repr +from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates +from autoPyTorch.utils.logging_ import PicklableClientLogger + + +def run_models_on_dataset( + time_left: int, + func_eval_time_limit_secs: int, + model_configs: List[Tuple[str, Configuration]], + logger: PicklableClientLogger, + metric: autoPyTorchMetric, + dask_client: dask.distributed.Client, + backend: Backend, + seed: int, + multiprocessing_context: str, + current_search_space: ConfigurationSpace, + n_jobs: int = 1, + initial_num_run: int = 1, + all_supported_metrics: bool = False, + include: Optional[Dict[str, List[str]]] = None, + exclude: Optional[Dict[str, List[str]]] = None, + search_space_updates: Optional[HyperparameterSearchSpaceUpdates] = None, + logger_port: Optional[int] = logging.handlers.DEFAULT_TCP_LOGGING_PORT, + memory_limit: Optional[int] = None, + disable_file_output: Optional[List[Union[str, DisableFileOutputParameters]]] = None, + pipeline_options: Optional[Dict] = None, +) -> RunHistory: + """ + Runs models specified by `model_configs` on dask parallel infrastructure. + + Args: + time_left (int): + Time limit in seconds for the search of appropriate models. + By increasing this value, autopytorch has a higher + chance of finding better models. + func_eval_time_limit_secs (int): + Time limit for a single call to the machine learning model. + Model fitting will be terminated if the machine + learning algorithm runs over the time limit. Set + this value high enough so that typical machine + learning algorithms can be fit on the training + data. + Set to np.inf in case no time limit is desired. + model_configs (List[Tuple[str, Configuration]]): + List containing the configuration and the budget for the model to be evaluated. + logger (PicklableClientLogger): + Logger + metric (autoPyTorchMetric): + autoPyTorchMetric to be used for evaluation. + dask_client (dask.distributed.Client): + dask client where the function evaluation jobs are submitted. + backend (Backend): + Current backend object where the data is stored. The backend + is used to interact with the disk. + all_supported_metrics (bool): + If True, all metrics supporting current task will be calculated + for each pipeline. + seed (int): + Seed to be used for reproducibility. + multiprocessing_context (str): + context used for spawning child processes. + n_jobs (int): + Number of consecutive processes to spawn. 
+ current_search_space (ConfigurationSpace): + The search space of the neural networks which will be used to instantiate Configuration objects. + initial_num_run (int): + Initial num run for running the models. + include (Optional[Dict[str, List[str]]]): + Dictionary containing components to include. Key is the node + name and Value is an Iterable of the names of the components + to include. Only these components will be present in the + search space. Defaults to None. + exclude (Optional[Dict[str, List[str]]]): + Dictionary containing components to exclude. Key is the node + name and Value is an Iterable of the names of the components + to exclude. All except these components will be present in + the search space. Defaults to None. + search_space_updates (Optional[HyperparameterSearchSpaceUpdates]): + Search space updates that can be used to modify the search + space of particular components or choice modules of the pipeline. + Defaults to None. + logger_port (Optional[int]): + Port used to create the logging server. Defaults to logging.handlers.DEFAULT_TCP_LOGGING_PORT. + memory_limit (Optional[int]): + Memory limit in MB for the machine learning algorithm. + Autopytorch will stop fitting the machine learning algorithm + if it tries to allocate more than memory_limit MB. If None + is provided, no memory limit is set. In case of multi-processing, + memory_limit will be per job. This memory limit also applies to + the ensemble creation process. Defaults to None. + disable_file_output (Optional[List[Union[str, DisableFileOutputParameters]]]): + Used as a list to pass more fine-grained + information on what to save. Must be a member of `DisableFileOutputParameters`. + Allowed elements in the list are: + + + `y_optimization`: + do not save the predictions for the optimization set, + which would later on be used to build an ensemble. Note that SMAC + optimizes a metric evaluated on the optimization set. + + `pipeline`: + do not save any individual pipeline files + + `pipelines`: + In case of cross validation, disables saving the joint model of the + pipelines fit on each fold. + + `y_test`: + do not save the predictions for the test set. + + `all`: + do not save any of the above. + For more information check `autoPyTorch.evaluation.utils.DisableFileOutputParameters`. + Defaults to None. + pipeline_options (Optional[Dict]): + Valid config options include "device", + "torch_num_threads", "early_stopping", "use_tensorboard_logger", + "metrics_during_training". + + Returns: + RunHistory: + run_history: + Run History of training all the models in model_configs + """ + starttime = time.time() + run_history = RunHistory() + if memory_limit is not None: + memory_limit = int(math.ceil(memory_limit)) + total_models = len(model_configs) + dask_futures: List[dask.distributed.Future] = [] + for n_r, (config, budget) in enumerate(model_configs): + + # Only launch a task if there is time + start_time = time.time() + if time_left >= func_eval_time_limit_secs: + logger.info(f"{n_r}: Started fitting {config} with cutoff={func_eval_time_limit_secs}") + scenario_mock = unittest.mock.Mock() + scenario_mock.wallclock_limit = time_left + # This stats object is a hack - maybe the SMAC stats object should + # already be generated here! 
+ stats = Stats(scenario_mock) + stats.start_timing() + + if isinstance(config, Configuration): + config.config_id = n_r + init_num_run = initial_num_run + else: + init_num_run = initial_num_run + n_r + + ta = ExecuteTaFuncWithQueue( + pynisher_context=multiprocessing_context, + backend=backend, + seed=seed, + metric=metric, + multi_objectives=["cost"], + logger_port=logger_port, + pipeline_options=pipeline_options, + cost_for_crash=get_cost_of_crash(metric), + abort_on_first_run_crash=False, + initial_num_run=init_num_run, + stats=stats, + memory_limit=memory_limit, + disable_file_output=disable_file_output, + all_supported_metrics=all_supported_metrics, + include=include, + exclude=exclude, + search_space_updates=search_space_updates + ) + dask_futures.append([ + config, + dask_client.submit( + ta.run, config=config, + cutoff=func_eval_time_limit_secs, + budget=budget + ) + ]) + + # When managing time, we need to take into account the allocated time resources, + # which are dependent on the number of cores. 'dask_futures' is a proxy to the number + # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most + # 4 task in parallel. Every 'cutoff' seconds, we generate up to 4 tasks. + # If we only have 4 workers and there are 4 futures in dask_futures, it means that every + # worker has a task. We would not like to launch another job until a worker is available. To this + # end, the following if-statement queries the number of active jobs, and forces to wait for a job + # completion via future.result(), so that a new worker is available for the next iteration. + if len(dask_futures) >= n_jobs: + + # How many workers to wait before starting fitting the next iteration + workers_to_wait = 1 + if n_r >= total_models - 1 or time_left <= func_eval_time_limit_secs: + # If on the last iteration, flush out all tasks + workers_to_wait = len(dask_futures) + + while workers_to_wait >= 1: + workers_to_wait -= 1 + # We launch dask jobs only when there are resources available. + # This allow us to control time allocation properly, and early terminate + # the traditional machine learning pipeline + _process_result( + current_search_space=current_search_space, + dask_futures=dask_futures, + run_history=run_history, + seed=seed, + starttime=starttime, + logger=logger) + + # In the case of a serial execution, calling submit halts the run for a resource + # dynamically adjust time in this case + time_left -= int(time.time() - start_time) + + # Exit if no more time is available for a new classifier + if time_left < func_eval_time_limit_secs: + logger.warning("Not enough time to fit all machine learning models." + "Please consider increasing the run time to further improve performance.") + break + + return run_history + + +def _process_result( + dask_futures: List[dask.distributed.Future], + current_search_space: ConfigurationSpace, + run_history: RunHistory, + seed: int, + starttime: float, + logger: PicklableClientLogger +) -> None: + """ + Update run_history in-place using results of the + latest finishing model. + + Args: + dask_futures (List[dask.distributed.Future]): + List of dask futures which are used to get the results of a finished run. + run_history (RunHistory): + RunHistory object to be appended with the finished run + seed (int): + Seed used for reproducibility. + starttime (float): + starttime of the runs. + logger (PicklableClientLogger): + Logger. 
+    """
+    cls, future = dask_futures.pop(0)
+    status, cost, runtime, additional_info = future.result()
+    if status == StatusType.SUCCESS:
+        logger.info(
+            "Fitting {} took {} [sec] and got performance: {}.\n"
+            "additional info:\n{}".format(cls, runtime, cost, dict_repr(additional_info))
+        )
+        origin: str = additional_info['configuration_origin']
+        current_config: Union[str, dict] = additional_info['configuration']
+
+        # Indicates that the finished model is part of the AutoPyTorch search space.
+        if isinstance(current_config, dict):
+            configuration = Configuration(current_search_space, current_config)  # type: ignore[misc]
+        else:
+            # We assume that it is a traditional model and `pipeline_configuration`
+            # specifies the configuration.
+            configuration = additional_info.pop('pipeline_configuration', None)
+
+        if configuration is not None:
+            run_history.add(config=configuration, cost=cost,
+                            time=runtime, status=status, seed=seed,
+                            starttime=starttime, endtime=starttime + runtime,
+                            origin=origin, additional_info=additional_info)
+        else:
+            logger.warning(f"Something went wrong while processing the results of {current_config} "
+                           f"with additional_info: {additional_info} and status_type: {status}. "
+                           f"Refer to the log file for more information.\nSkipping for now.")
+    else:
+        if additional_info.get('exitcode') == -6:
+            logger.error(
+                "Prediction for {} failed with run state {},\n"
+                "because the provided memory limits were too tight.\n"
+                "Please increase the 'ml_memory_limit' and try again.\n"
+                "If you still get the problem, please open an issue\n"
+                "and paste the additional info.\n"
+                "Additional info:\n{}".format(cls, str(status), dict_repr(additional_info))
+            )
+        else:
+            logger.error(
+                "Prediction for {} failed with run state {}.\nAdditional info:\n{}".format(
+                    cls, str(status), dict_repr(additional_info)
+                )
+            )
diff --git a/examples/20_basics/example_tabular_classification.py b/examples/20_basics/example_tabular_classification.py
index 636281eff..291e017ac 100644
--- a/examples/20_basics/example_tabular_classification.py
+++ b/examples/20_basics/example_tabular_classification.py
@@ -3,13 +3,15 @@
 Tabular Classification
 ======================
 
-The following example shows how to fit a sample classification model
-with AutoPyTorch
+The following example shows how to fit a simple classification ensemble
+with AutoPyTorch and refit the found ensemble.
 """
 import os
 import tempfile as tmp
 import warnings
 
+from autoPyTorch.datasets.resampling_strategy import CrossValTypes
+
 os.environ['JOBLIB_TEMP_FOLDER'] = tmp.gettempdir()
 os.environ['OMP_NUM_THREADS'] = '1'
 os.environ['OPENBLAS_NUM_THREADS'] = '1'
@@ -62,13 +64,39 @@
 )
 
 ############################################################################
-# Print the final ensemble performance
-# ====================================
+# Print the final ensemble performance before refit
+# =================================================
+
 y_pred = api.predict(X_test)
 score = api.score(y_pred, y_test)
 print(score)
-# Print the final ensemble built by AutoPyTorch
-print(api.show_models())
 
 # Print statistics from search
 print(api.sprint_statistics())
+
+###########################################################################
+# Refit the models on the full dataset.
+# ===================================== + +api.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + dataset_name="Australian", + # you can change the resampling strategy to + # for example, CrossValTypes.k_fold_cross_validation + # to fit k fold models and have a voting classifier + # resampling_strategy=CrossValTypes.k_fold_cross_validation +) + +############################################################################ +# Print the final ensemble performance after refit +# ================================================ + +y_pred = api.predict(X_test) +score = api.score(y_pred, y_test) +print(score) + +# Print the final ensemble built by AutoPyTorch +print(api.show_models()) diff --git a/examples/20_basics/example_tabular_regression.py b/examples/20_basics/example_tabular_regression.py index 127f26829..6357d23e1 100644 --- a/examples/20_basics/example_tabular_regression.py +++ b/examples/20_basics/example_tabular_regression.py @@ -50,19 +50,44 @@ optimize_metric='r2', total_walltime_limit=300, func_eval_time_limit_secs=50, + dataset_name="Boston" ) ############################################################################ -# Print the final ensemble performance -# ==================================== +# Print the final ensemble performance before refit +# ================================================= y_pred = api.predict(X_test) - -# Rescale the Neural Network predictions into the original target range score = api.score(y_pred, y_test) - print(score) -# Print the final ensemble built by AutoPyTorch -print(api.show_models()) # Print statistics from search print(api.sprint_statistics()) + +########################################################################### +# Refit the models on the full dataset. +# ===================================== + +api.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test, + dataset_name="Boston", + total_walltime_limit=500, + run_time_limit_secs=50 + # you can change the resampling strategy to + # for example, CrossValTypes.k_fold_cross_validation + # to fit k fold models and have a voting classifier + # resampling_strategy=CrossValTypes.k_fold_cross_validation +) + +############################################################################ +# Print the final ensemble performance after refit +# ================================================ + +y_pred = api.predict(X_test) +score = api.score(y_pred, y_test) +print(score) + +# Print the final ensemble built by AutoPyTorch +print(api.show_models()) diff --git a/setup.py b/setup.py index 7a8e7bac6..422c6f24d 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # noinspection PyInterpreter setuptools.setup( name="autoPyTorch", - version="0.2", + version="0.2.1", author="AutoML Freiburg", author_email="eddiebergmanhs@gmail.com", description=("Auto-PyTorch searches neural architectures using smac"), diff --git a/test/test_api/test_api.py b/test/test_api/test_api.py index 465d74c6b..157496138 100644 --- a/test/test_api/test_api.py +++ b/test/test_api/test_api.py @@ -46,6 +46,43 @@ HOLDOUT_NUM_SPLITS = 1 +def refit_test_estimator( + estimator, + X_train, + y_train, + X_test, + y_test, +): + estimator.refit( + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + + # Check if the refit models are actually used in the new ensemble. 
+ refit_ensemble_identifiers = estimator.ensemble_.get_selected_model_identifiers() + refit_run_history_path = os.path.join(estimator._backend.internals_directory, 'refit_run_history.json') + assert os.path.exists(refit_run_history_path) + + refit_run_history: RunHistory = RunHistory() + refit_run_history.update_from_json(refit_run_history_path, estimator.search_space) + + all_refit_runs_in_new_ensemble = [] + model_num_runs = [] + for run_key, run_value in refit_run_history.data.items(): + any_refit_run_in_new_ensemble = False + num_run = run_value.additional_info["num_run"] + model_num_runs.append(num_run) + for identifier in refit_ensemble_identifiers: + if num_run == identifier[1]: + any_refit_run_in_new_ensemble = True + break + all_refit_runs_in_new_ensemble.append(any_refit_run_in_new_ensemble) + + assert all(all_refit_runs_in_new_ensemble), "All successful runs in the refit should be a part of the new ensemble" + + # Test # ==== @unittest.mock.patch('autoPyTorch.evaluation.tae.eval_train_function', @@ -186,8 +223,9 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl # Ensemble Builder produced an ensemble estimator.ensemble_ is not None + ensemble_identifiers = estimator.ensemble_.identifiers_ # There should be a weight for each element of the ensemble - assert len(estimator.ensemble_.identifiers_) == len(estimator.ensemble_.weights_) + assert len(ensemble_identifiers) == len(estimator.ensemble_.weights_) y_pred = estimator.predict(X_test) assert np.shape(y_pred)[0] == np.shape(X_test)[0] @@ -207,6 +245,15 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl successful_num_run) assert 'train_loss' in incumbent_results + # Test refit on dummy data + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + # Check that we can pickle dump_file = os.path.join(estimator._backend.temporary_directory, 'dump.pkl') @@ -217,9 +264,6 @@ def test_tabular_classification(openml_id, resampling_strategy, backend, resampl restored_estimator = pickle.load(f) restored_estimator.predict(X_test) - # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) - # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) @@ -387,6 +431,15 @@ def test_tabular_regression(openml_name, resampling_strategy, backend, resamplin successful_num_run) assert 'train_loss' in incumbent_results, estimator.run_history.data + # Test refit on dummy data + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test + ) + # Check that we can pickle dump_file = os.path.join(estimator._backend.temporary_directory, 'dump.pkl') @@ -397,9 +450,6 @@ def test_tabular_regression(openml_name, resampling_strategy, backend, resamplin restored_estimator = pickle.load(f) restored_estimator.predict(X_test) - # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) - # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) @@ -580,7 +630,14 @@ def test_time_series_forecasting(forecasting_toy_dataset, resampling_strategy, b assert np.shape(y_pred) == np.shape(y_test) # Test refit on dummy data - estimator.refit(dataset=backend.load_datamanager()) + refit_test_estimator( + estimator=estimator, + X_train=X_train, + y_train=y_train, + X_test=X_test, + y_test=y_test 
+ ) + # Make sure that a configuration space is stored in the estimator assert isinstance(estimator.get_search_space(), CS.ConfigurationSpace) diff --git a/test/test_api/test_base_api.py b/test/test_api/test_base_api.py index bb8f9c061..ac01af25e 100644 --- a/test/test_api/test_base_api.py +++ b/test/test_api/test_base_api.py @@ -27,7 +27,7 @@ def test_nonsupported_arguments(fit_dictionary_tabular): api = BaseTask() with pytest.raises(ValueError, match=r".*Invalid configuration arguments given.*"): - api.set_pipeline_config(unsupported=True) + api.set_pipeline_options(unsupported=True) with pytest.raises(ValueError, match=r".*No search space initialised and no dataset.*"): api.get_search_space() api.resampling_strategy = None @@ -95,7 +95,7 @@ def test_show_models(fit_dictionary_tabular): assert re.search(expected, api.show_models()) is not None -def test_set_pipeline_config(): +def test_set_pipeline_options(): # checks if we can correctly change the pipeline options BaseTask.__abstractmethods__ = set() estimator = BaseTask() @@ -103,7 +103,7 @@ def test_set_pipeline_config(): "budget_type": "epochs", "epochs": 51, "runtime": 360} - estimator.set_pipeline_config(**pipeline_options) + estimator.set_pipeline_options(**pipeline_options) assert pipeline_options.items() <= estimator.get_pipeline_options().items() @@ -118,12 +118,12 @@ def test_pipeline_get_budget(fit_dictionary_tabular, min_budget, max_budget, bud estimator = BaseTask(task_type='tabular_classification', ensemble_size=0) # Fixture pipeline config - default_pipeline_config = { + default_pipeline_options = { 'device': 'cpu', 'budget_type': 'epochs', 'epochs': 50, 'runtime': 3600, 'torch_num_threads': 1, 'early_stopping': 20, 'use_tensorboard_logger': False, 'metrics_during_training': True, 'optimize_metric': 'accuracy' } - default_pipeline_config.update(expected) + default_pipeline_options.update(expected) # Create pre-requisites dataset = fit_dictionary_tabular['backend'].load_datamanager() @@ -141,7 +141,7 @@ def test_pipeline_get_budget(fit_dictionary_tabular, min_budget, max_budget, bud enable_traditional_pipeline=False, total_walltime_limit=20, func_eval_time_limit_secs=10, load_models=False) - assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_config'] == default_pipeline_config + assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_options'] == default_pipeline_options assert list(smac_mock.call_args)[1]['max_budget'] == max_budget assert list(smac_mock.call_args)[1]['initial_budget'] == min_budget @@ -174,12 +174,12 @@ def test_pipeline_get_budget_forecasting(fit_dictionary_forecasting, min_budget, BaseTask.__abstractmethods__ = set() estimator = BaseTask(task_type='time_series_forecasting', ensemble_size=0) # Fixture pipeline config - default_pipeline_config = { + default_pipeline_options = { 'device': 'cpu', 'budget_type': 'epochs', 'epochs': 50, 'runtime': 3600, 'torch_num_threads': 1, 'early_stopping': 20, 'use_tensorboard_logger': False, 'metrics_during_training': True, 'optimize_metric': 'mean_MASE_forecasting' } - default_pipeline_config.update(expected) + default_pipeline_options.update(expected) # Create pre-requisites dataset = fit_dictionary_forecasting['backend'].load_datamanager() @@ -198,6 +198,6 @@ def test_pipeline_get_budget_forecasting(fit_dictionary_forecasting, min_budget, total_walltime_limit=20, func_eval_time_limit_secs=10, memory_limit=8192, load_models=False) - assert list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_config'] == default_pipeline_config + assert 
list(smac_mock.call_args)[1]['ta_kwargs']['pipeline_options'] == default_pipeline_options assert list(smac_mock.call_args)[1]['max_budget'] == max_budget assert list(smac_mock.call_args)[1]['initial_budget'] == min_budget diff --git a/test/test_api/utils.py b/test/test_api/utils.py index bbee9a3c4..701c455c1 100644 --- a/test/test_api/utils.py +++ b/test/test_api/utils.py @@ -63,6 +63,11 @@ def _fit_and_predict(self, pipeline, fold: int, train_indices, test_indices=test_indices, ) + # the configuration is used in refit where + # pipeline.config is used to retrieve the + # original configuration. + pipeline.config = self.configuration + if add_pipeline_to_self: self.pipeline = pipeline else: @@ -94,7 +99,7 @@ def dummy_eval_train_function( include, exclude, disable_file_output, - pipeline_config=None, + pipeline_options=None, budget_type=None, init_params=None, logger_port=None, @@ -118,7 +123,7 @@ def dummy_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, ) evaluator.fit_predict_and_loss() @@ -137,7 +142,7 @@ def dummy_forecasting_eval_train_function( include, exclude, disable_file_output, - pipeline_config=None, + pipeline_options=None, budget_type=None, init_params=None, logger_port=None, @@ -163,7 +168,7 @@ def dummy_forecasting_eval_train_function( budget_type=budget_type, logger_port=logger_port, all_supported_metrics=all_supported_metrics, - pipeline_config=pipeline_config, + pipeline_options=pipeline_options, search_space_updates=search_space_updates, max_budget=max_budget, min_num_test_instances=min_num_test_instances, diff --git a/test/test_evaluation/test_abstract_evaluator.py b/test/test_evaluation/test_abstract_evaluator.py index a0be2c3f3..bb9df88e7 100644 --- a/test/test_evaluation/test_abstract_evaluator.py +++ b/test/test_evaluation/test_abstract_evaluator.py @@ -307,7 +307,7 @@ def test_error_unsupported_budget_type(self): backend=backend, output_y_hat_optimization=False, queue=queue_mock, - pipeline_config={'budget_type': "error", 'error': 0}, + pipeline_options={'budget_type': "error", 'error': 0}, metric=accuracy, budget=0, configuration=1) diff --git a/test/test_evaluation/test_evaluators.py b/test/test_evaluation/test_evaluators.py index 2ca32af10..0f0f15cdc 100644 --- a/test/test_evaluation/test_evaluators.py +++ b/test/test_evaluation/test_evaluators.py @@ -97,12 +97,13 @@ def test_holdout(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -136,12 +137,13 @@ def test_cv(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = 
multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -246,7 +248,7 @@ def test_predict_proba_binary_classification(self, mock): queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(self.backend_mock, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.fit_predict_and_loss() Y_optimization_pred = self.backend_mock.save_numrun_to_dir.call_args_list[0][1][ @@ -278,12 +280,13 @@ def test_additional_metrics_during_training(self, pipeline_mock): D = get_binary_classification_datamanager() configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() evaluator = TrainEvaluator(backend_api, queue_, configuration=configuration, metric=accuracy, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, all_supported_metrics=True) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, all_supported_metrics=True) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -339,6 +342,7 @@ def test_no_resampling(self, pipeline_mock): pipeline_mock.get_default_pipeline_options.return_value = {'budget_type': 'epochs', 'epochs': 10} configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, 'autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() diff --git a/test/test_evaluation/test_forecasting_evaluators.py b/test/test_evaluation/test_forecasting_evaluators.py index 580402d5c..5eea055df 100644 --- a/test/test_evaluation/test_forecasting_evaluators.py +++ b/test/test_evaluation/test_forecasting_evaluators.py @@ -60,8 +60,8 @@ def test_budget_type_choices(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': budget_type, - budget_type: 0.1}, + pipeline_options={'budget_type': budget_type, + budget_type: 0.1}, min_num_test_instances=100) self.assertTrue('epochs' not in evaluator.fit_dictionary) if budget_type == 'resolution': @@ -85,6 +85,7 @@ def test_holdout(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -93,7 +94,7 @@ def test_holdout(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=100) self.assertTrue('epochs' in evaluator.fit_dictionary) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) @@ -140,6 +141,7 @@ def test_cv(self, pipeline_mock): 
pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -148,7 +150,7 @@ def test_cv(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}) + pipeline_options={'budget_type': 'epochs', 'epochs': 50}) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -189,6 +191,7 @@ def test_proxy_val_set(self, pipeline_mock): pipeline_mock.get_additional_run_info.return_value = None configuration = unittest.mock.Mock(spec=Configuration) + configuration.get_dictionary.return_value = {} backend_api = create(self.tmp_dir, self.output_dir, prefix='autoPyTorch') backend_api.load_datamanager = lambda: D queue_ = multiprocessing.Queue() @@ -197,7 +200,7 @@ def test_proxy_val_set(self, pipeline_mock): queue_, configuration=configuration, metric=mean_MASE_forecasting, budget=0.3, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=1) evaluator.file_output = unittest.mock.Mock(spec=evaluator.file_output) evaluator.file_output.return_value = (None, {}) @@ -247,7 +250,7 @@ def test_finish_up(self, pipeline_mock, queue_mock): queue_mock, configuration=configuration, metric=mean_MASE_forecasting, budget=0.3, - pipeline_config={'budget_type': 'epochs', 'epochs': 50}, + pipeline_options={'budget_type': 'epochs', 'epochs': 50}, min_num_test_instances=1) val_splits = D.splits[0][1] diff --git a/test/test_utils/test_parallel_model_runner.py b/test/test_utils/test_parallel_model_runner.py new file mode 100644 index 000000000..a0a163f6e --- /dev/null +++ b/test/test_utils/test_parallel_model_runner.py @@ -0,0 +1,66 @@ +import unittest.mock +from test.test_api.utils import dummy_eval_train_function +from test.test_evaluation.evaluation_util import get_binary_classification_datamanager + +from ConfigSpace import Configuration + +from smac.tae import StatusType + +from autoPyTorch.pipeline.components.training.metrics.utils import get_metrics +from autoPyTorch.utils.logging_ import PicklableClientLogger +from autoPyTorch.utils.parallel_model_runner import run_models_on_dataset +from autoPyTorch.utils.pipeline import get_configuration_space, get_dataset_requirements +from autoPyTorch.utils.single_thread_client import SingleThreadedClient + + +@unittest.mock.patch('autoPyTorch.evaluation.tae.eval_train_function', + new=dummy_eval_train_function) +def test_run_models_on_dataset(backend): + dataset = get_binary_classification_datamanager() + backend.save_datamanager(dataset) + # Search for a good configuration + dataset_requirements = get_dataset_requirements( + info=dataset.get_required_dataset_info() + ) + dataset_properties = dataset.get_dataset_properties(dataset_requirements) + search_space = get_configuration_space(info=dataset_properties) + num_random_configs = 5 + model_configurations = [(search_space.sample_configuration(), 1) for _ in range(num_random_configs)] + # Add a traditional model + model_configurations.append(('lgb', 1)) + + metric = get_metrics(dataset_properties=dataset_properties, + names=["accuracy"], + all_supported_metrics=False).pop() + logger = unittest.mock.Mock(spec=PicklableClientLogger) + + 
dask_client = SingleThreadedClient()
+
+    runhistory = run_models_on_dataset(
+        time_left=15,
+        func_eval_time_limit_secs=5,
+        model_configs=model_configurations,
+        logger=logger,
+        metric=metric,
+        dask_client=dask_client,
+        backend=backend,
+        seed=1,
+        multiprocessing_context="fork",
+        current_search_space=search_space,
+    )
+
+    has_successful_model = False
+    has_matching_config = False
+    # Assert that at least one model was fitted successfully
+    for run_key, run_value in runhistory.data.items():
+        if run_value.status == StatusType.SUCCESS:
+            has_successful_model = True
+            configuration = run_value.additional_info['configuration']
+            for (config, _) in model_configurations:
+                if isinstance(config, Configuration):
+                    config = config.get_dictionary()
+                if config == configuration:
+                    has_matching_config = True
+
+    assert has_successful_model, "At least one model should be trained successfully"
+    assert has_matching_config, "Configurations should match the passed model configurations"
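
The refit examples above only hint, in comments, at switching the refit resampling strategy to cross-validation. Below is a minimal sketch of that variant, assuming the TabularClassificationTask API used in the examples and that refit accepts a resampling_strategy argument as those comments suggest; the OpenML dataset id (40981, "Australian"), seed, and time limits are illustrative assumptions, not values taken from this patch.

# Sketch only: search, then refit the found configurations with k-fold
# cross-validation, as suggested by the commented-out resampling_strategy
# lines in the examples above. Dataset id, seed and time limits are
# illustrative assumptions.
import sklearn.datasets
import sklearn.model_selection

from autoPyTorch.api.tabular_classification import TabularClassificationTask
from autoPyTorch.datasets.resampling_strategy import CrossValTypes

# Load the (assumed) "Australian" dataset from OpenML and split it.
X, y = sklearn.datasets.fetch_openml(data_id=40981, return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X, y, random_state=1
)

api = TabularClassificationTask(seed=42)
api.search(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
    optimize_metric='accuracy',
    total_walltime_limit=300,
    func_eval_time_limit_secs=50,
)

# Refit on the full training data; with k-fold cross-validation each
# configuration is fit once per fold and the folds are combined into a
# voting model, as described in the example comments.
api.refit(
    X_train=X_train,
    y_train=y_train,
    X_test=X_test,
    y_test=y_test,
    dataset_name="Australian",
    resampling_strategy=CrossValTypes.k_fold_cross_validation,
)

print(api.score(api.predict(X_test), y_test))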