From 10cb6ae07c13876dc4b014c768b583d3d9474fe2 Mon Sep 17 00:00:00 2001
From: Daniel Cohen
Date: Thu, 8 Aug 2024 11:05:22 -0700
Subject: [PATCH] Create Scheduler.generate_candidates() function (#2640)

Summary:
Pull Request resolved: https://github.com/facebook/Ax/pull/2640

Add `Scheduler.generate_candidates()` method which calls
- poll and fetch
- get next trial
- eventually gen report
- save new trials

Differential Revision: D59606488
---
 ax/exceptions/generation_strategy.py     |   7 +
 ax/modelbridge/torch.py                  |   3 +-
 ax/service/scheduler.py                  |  43 ++++
 ax/service/tests/scheduler_test_utils.py | 268 ++++++++++++++++++++++-
 ax/utils/testing/core_stubs.py           |  52 ++++-
 5 files changed, 367 insertions(+), 6 deletions(-)

diff --git a/ax/exceptions/generation_strategy.py b/ax/exceptions/generation_strategy.py
index 3c58c554db4..19cad9c19a4 100644
--- a/ax/exceptions/generation_strategy.py
+++ b/ax/exceptions/generation_strategy.py
@@ -74,3 +74,10 @@ def __init__(self, error_info: Optional[str]) -> None:
             + "check the documentation, and adjust the configuration accordingly. "
             + f"{error_info}"
         )
+
+
+class OptimizationConfigRequired(ValueError):
+    """Error indicating that candidate generation cannot be completed
+    because an optimization config was not provided."""
+
+    pass
diff --git a/ax/modelbridge/torch.py b/ax/modelbridge/torch.py
index ccc7acb9f9c..f1fc1eabed9 100644
--- a/ax/modelbridge/torch.py
+++ b/ax/modelbridge/torch.py
@@ -38,6 +38,7 @@ from ax.core.search_space import SearchSpace
 from ax.core.types import TCandidateMetadata, TModelPredictArm
 from ax.exceptions.core import DataRequiredError, UnsupportedError
+from ax.exceptions.generation_strategy import OptimizationConfigRequired
 from ax.modelbridge.base import gen_arms, GenResults, ModelBridge
 from ax.modelbridge.modelbridge_utils import (
     array_to_observation_data,
@@ -804,7 +805,7 @@ def _get_transformed_model_gen_args(
             search_space=search_space, param_names=self.parameters
         )
         if optimization_config is None:
-            raise ValueError(
+            raise OptimizationConfigRequired(
                 f"{self.__class__.__name__} requires an OptimizationConfig "
                 "to be specified"
             )
diff --git a/ax/service/scheduler.py b/ax/service/scheduler.py
index 607eee060aa..7bdb775670c 100644
--- a/ax/service/scheduler.py
+++ b/ax/service/scheduler.py
@@ -55,6 +55,7 @@ from ax.exceptions.generation_strategy import (
     AxGenerationException,
     MaxParallelismReachedException,
+    OptimizationConfigRequired,
 )
 from ax.modelbridge.base import ModelBridge
 from ax.modelbridge.generation_strategy import GenerationStrategy
@@ -1708,6 +1709,15 @@ def _get_next_trials(self, num_trials: int = 1, n: int = 1) -> List[BaseTrial]:
                 )
             self.logger.debug(f"Message from generation strategy: {err}")
             return []
+        except OptimizationConfigRequired as err:
+            if self._log_next_no_trials_reason:
+                self.logger.info(
+                    "Generated all trials that can be generated currently. "
+                    "`generation_strategy` requires an optimization config "
+                    "to be set before generating more trials."
+                )
+            self.logger.debug(f"Message from generation strategy: {err}")
+            return []
         if self.options.trial_type == TrialType.TRIAL and any(
             len(generator_run_list[0].arms) > 1 or len(generator_run_list) > 1
@@ -1737,6 +1747,39 @@ def _get_next_trials(self, num_trials: int = 1, n: int = 1) -> List[BaseTrial]:
             trials.append(trial)
         return trials
 
+    def generate_candidates(
+        self,
+        num_trials: int = 1,
+        reduce_state_generator_runs: bool = False,
+    ) -> List[BaseTrial]:
+        """Fetch the latest data and generate new candidate trials.
+
+        Args:
+            num_trials: Number of candidate trials to generate.
+            reduce_state_generator_runs: Flag to determine
+                whether to save model state for every generator run (default)
+                or to only save model state on the final generator run of each
+                batch.
+
+        Returns:
+            List of trials, empty if generation is not possible.
+        """
+        self.poll_and_process_results()
+        new_trials = self._get_next_trials(
+            num_trials=num_trials,
+            n=self.options.batch_size or 1,
+        )
+        if len(new_trials) > 0:
+            new_generator_runs = [gr for t in new_trials for gr in t.generator_runs]
+            self._save_or_update_trials_and_generation_strategy_if_possible(
+                experiment=self.experiment,
+                trials=new_trials,
+                generation_strategy=self.generation_strategy,
+                new_generator_runs=new_generator_runs,
+                reduce_state_generator_runs=reduce_state_generator_runs,
+            )
+        return new_trials
+
     def _gen_new_trials_from_generation_strategy(
         self,
         num_trials: int,
diff --git a/ax/service/tests/scheduler_test_utils.py b/ax/service/tests/scheduler_test_utils.py
index 8f552b255b8..5640ee1acaa 100644
--- a/ax/service/tests/scheduler_test_utils.py
+++ b/ax/service/tests/scheduler_test_utils.py
@@ -36,8 +36,14 @@ from ax.metrics.branin_map import BraninTimestampMapMetric
 from ax.modelbridge.cross_validation import compute_model_fit_metrics_from_modelbridge
 from ax.modelbridge.dispatch_utils import choose_generation_strategy
-from ax.modelbridge.generation_strategy import GenerationStep, GenerationStrategy
+from ax.modelbridge.generation_strategy import (
+    GenerationNode,
+    GenerationStep,
+    GenerationStrategy,
+)
+from ax.modelbridge.model_spec import ModelSpec
 from ax.modelbridge.registry import Models, ST_MTGP_trans
+from ax.modelbridge.transition_criterion import MaxTrials, MinTrials
 from ax.runners.single_running_trial_mixin import SingleRunningTrialMixin
 from ax.runners.synthetic import SyntheticRunner
 from ax.service.scheduler import (
@@ -52,6 +58,7 @@ from ax.service.utils.with_db_settings_base import WithDBSettingsBase
 from ax.storage.json_store.encoders import runner_to_dict
 from ax.storage.json_store.registry import CORE_DECODER_REGISTRY, CORE_ENCODER_REGISTRY
+from ax.storage.metric_registry import CORE_METRIC_REGISTRY
 from ax.storage.runner_registry import CORE_RUNNER_REGISTRY
 from ax.storage.sqa_store.db import init_test_engine_and_session_factory
 from ax.storage.sqa_store.decoder import Decoder
@@ -64,14 +71,19 @@ from ax.utils.common.timeutils import current_timestamp_in_millis
 from ax.utils.common.typeutils import checked_cast, not_none
 from ax.utils.testing.core_stubs import (
+    CustomTestMetric,
+    CustomTestRunner,
     DummyEarlyStoppingStrategy,
     DummyGlobalStoppingStrategy,
     get_branin_experiment,
     get_branin_experiment_with_multi_objective,
     get_branin_experiment_with_timestamp_map_metric,
+    get_branin_metric,
     get_branin_multi_objective_optimization_config,
     get_branin_search_space,
+    get_experiment_with_custom_runner_and_metric,
     get_generator_run,
+    get_outcome_constraint,
     get_sobol,
     SpecialGenerationStrategy,
 )
@@ -352,6 +364,52 @@ def _get_generation_strategy_strategy_for_test(
     ) -> GenerationStrategyInterface:
         return not_none(generation_strategy)
 
+    def _get_batch_hitl_gs(self) -> GenerationStrategy:
+        step_model_kwargs = {"silently_filter_kwargs": True}
+        sobol_criterion = [
+            MaxTrials(
+                threshold=1,
+                transition_to="GPEI_node",
+                block_gen_if_met=True,
+                only_in_statuses=None,
+                not_in_statuses=[TrialStatus.FAILED, TrialStatus.ABANDONED],
+            ),
+            MinTrials(
+                threshold=1,
+                transition_to="GPEI_node",
+                block_gen_if_met=True,
+                only_in_statuses=[
+                    TrialStatus.RUNNING,
+                    TrialStatus.COMPLETED,
+                    TrialStatus.EARLY_STOPPED,
+                ],
+            ),
+        ]
+        sobol_model_spec = ModelSpec(
+            model_enum=Models.SOBOL,
+            model_kwargs=step_model_kwargs,
+            model_gen_kwargs={},
+        )
+        gpei_model_spec = ModelSpec(
+            model_enum=Models.GPEI,
+            model_kwargs=step_model_kwargs,
+            model_gen_kwargs={},
+        )
+        sobol_node = GenerationNode(
+            node_name="sobol_node",
+            transition_criteria=sobol_criterion,
+            model_specs=[sobol_model_spec],
+        )
+        gpei_node = GenerationNode(
+            node_name="GPEI_node",
+            transition_criteria=[],
+            model_specs=[gpei_model_spec],
+        )
+        return GenerationStrategy(
+            name="Sobol+GPEI_Nodes",
+            nodes=[sobol_node, gpei_node],
+        )
+
     @property
     def runner_registry(self) -> Dict[Type[Runner], int]:
         return {
@@ -367,6 +425,7 @@ def runner_registry(self) -> Dict[Type[Runner], int]:
             SyntheticRunnerWithSingleRunningTrial: 2007,
             SyntheticRunnerWithPredictableStatusPolling: 2008,
             RunnerToAllowMultipleMapMetricFetches: 2009,
+            CustomTestRunner: 2010,
             **CORE_RUNNER_REGISTRY,
         }
@@ -385,10 +444,18 @@ def db_config(self) -> SQAConfig:
             json_encoder_registry=encoder_registry,
             json_decoder_registry=decoder_registry,
             runner_registry=self.runner_registry,
+            metric_registry={
+                CustomTestMetric: 3000,
+                **CORE_METRIC_REGISTRY,
+            },
         )
 
     @property
     def db_settings(self) -> DBSettings:
+        """If db_settings is used on the scheduler, it is expected that the
+        test calls `init_test_engine_and_session_factory(force_init=True)`
+        prior to instantiating the scheduler.
+        """
         config = self.db_config
         encoder = Encoder(config=config)
         decoder = Decoder(config=config)
@@ -2238,3 +2305,202 @@ def test_update_options_with_validate_metrics(self) -> None:
             ".*Metrics {'branin'} do not implement fetching logic.",
         ):
             scheduler.options = SchedulerOptions(total_trials=10, validate_metrics=True)
+
+    def test_generate_candidates_works_for_sobol(self) -> None:
+        init_test_engine_and_session_factory(force_init=True)
+        # GIVEN a scheduler using a GS with GPEI
+        gs = self._get_generation_strategy_strategy_for_test(
+            experiment=self.branin_experiment,
+            generation_strategy=self._get_batch_hitl_gs(),
+        )
+        # This is a HITL experiment, so we don't want trials completing on their own.
+        self.branin_experiment.runner = InfinitePollRunner()
+        options = SchedulerOptions(
+            init_seconds_between_polls=0,  # No wait bw polls so test is fast.
+            batch_size=10,
+            trial_type=TrialType.BATCH_TRIAL,
+        )
+        scheduler = Scheduler(
+            experiment=self.branin_experiment,
+            generation_strategy=gs,
+            options=options,
+            db_settings=self.db_settings,
+        )
+
+        # WHEN generating candidates on a new experiment
+        scheduler.generate_candidates(num_trials=1)
+
+        # THEN the experiment should have a Sobol-generated trial in the database
+        scheduler = Scheduler.from_stored_experiment(
+            experiment_name=self.branin_experiment.name,
+            options=options,
+            db_settings=self.db_settings,
+        )
+        self.assertEqual(len(scheduler.experiment.trials), 1)
+        self.assertEqual(
+            len(scheduler.experiment.trial_indices_by_status[TrialStatus.CANDIDATE]), 1
+        )
+        candidate_trial = scheduler.experiment.trials[0]
+        self.assertEqual(len(candidate_trial.generator_runs), 1)
+        self.assertEqual(
+            candidate_trial.generator_runs[0]._model_key,
+            Models.SOBOL.value,
+        )
+        self.assertEqual(
+            len(candidate_trial.arms),
+            options.batch_size,
+        )
+
+    def test_generate_candidates_works_for_iteration(self) -> None:
+        # GIVEN a scheduler using a GS with GPEI
+        gs = self._get_generation_strategy_strategy_for_test(
+            experiment=self.branin_experiment,
+            generation_strategy=self._get_batch_hitl_gs(),
+        )
+        # This is a HITL experiment, so we don't want trials completing on their own.
+        self.branin_experiment.runner = InfinitePollRunner()
+        scheduler = Scheduler(
+            experiment=self.branin_experiment,
+            generation_strategy=gs,
+            options=SchedulerOptions(
+                init_seconds_between_polls=0,  # No wait bw polls so test is fast.
+                batch_size=10,
+                trial_type=TrialType.BATCH_TRIAL,
+            ),
+            db_settings=self.db_settings_if_always_needed,
+        )
+        # AND GIVEN a Sobol trial is running
+        scheduler.run(max_new_trials=1)
+        # If there is already data, the test doesn't prove that
+        # `generate_candidates()` fetches.
+        self.assertTrue(scheduler.experiment.lookup_data().df.empty)
+
+        # WHEN generating candidates
+        scheduler.generate_candidates(num_trials=1)
+
+        # THEN the experiment should have a GPEI-generated trial
+        self.assertFalse(scheduler.experiment.lookup_data().df.empty)
+        self.assertEqual(
+            len(scheduler.experiment.trials), 2, str(scheduler.experiment.trials)
+        )
+        self.assertEqual(
+            len(scheduler.experiment.running_trial_indices),
+            1,
+            str(scheduler.experiment.trials),
+        )
+        self.assertEqual(
+            len(scheduler.experiment.trial_indices_by_status[TrialStatus.CANDIDATE]), 1
+        )
+        self.assertEqual(
+            scheduler.experiment.trials[1].generator_runs[0]._model_key,
+            Models.GPEI.value,
+        )
+
+    def test_generate_candidates_does_not_generate_if_missing_data(self) -> None:
+        # GIVEN a scheduler that can't fetch data
+        experiment = get_experiment_with_custom_runner_and_metric(num_trials=0)
+        gs = self._get_generation_strategy_strategy_for_test(
+            experiment=experiment,
+            generation_strategy=self._get_batch_hitl_gs(),
+        )
+        experiment.runner = InfinitePollRunner()
+        scheduler = Scheduler(
+            experiment=experiment,
+            generation_strategy=gs,
+            options=SchedulerOptions(
+                init_seconds_between_polls=0,  # No wait bw polls so test is fast.
+                batch_size=10,
+                trial_type=TrialType.BATCH_TRIAL,
+                validate_metrics=False,
+            ),
+            db_settings=self.db_settings_if_always_needed,
+        )
+        # AND GIVEN a Sobol trial is running
+        scheduler.run(max_new_trials=1)
+        # Assert `run()` worked without fetching data.
+        self.assertEqual(len(scheduler.experiment.running_trial_indices), 1)
+        self.assertTrue(scheduler.experiment.lookup_data().df.empty)
+
+        # WHEN generating candidates
+        scheduler.generate_candidates(num_trials=1)
+
+        # THEN the experiment should have no new trials
+        self.assertTrue(scheduler.experiment.lookup_data().df.empty)
+        self.assertEqual(len(scheduler.experiment.trials), 1)
+
+    def test_generate_candidates_does_not_generate_if_missing_opt_config(self) -> None:
+        # GIVEN a scheduler using a GS with GPEI
+        experiment = get_branin_experiment(has_optimization_config=False)
+        # This is a HITL experiment, so we don't want trials completing on their own.
+        experiment.runner = InfinitePollRunner()
+        experiment.add_tracking_metric(get_branin_metric())
+        gs = self._get_generation_strategy_strategy_for_test(
+            experiment=experiment,
+            generation_strategy=self._get_batch_hitl_gs(),
+        )
+        scheduler = Scheduler(
+            experiment=experiment,
+            generation_strategy=gs,
+            options=SchedulerOptions(
+                init_seconds_between_polls=0,  # No wait bw polls so test is fast.
+                batch_size=10,
+                trial_type=TrialType.BATCH_TRIAL,
+            ),
+            db_settings=self.db_settings_if_always_needed,
+        )
+        # AND GIVEN a Sobol trial is running
+        scheduler.run(max_new_trials=1)
+        # Assert `run()` worked.
+        self.assertEqual(len(scheduler.experiment.running_trial_indices), 1)
+
+        # WHEN generating candidates
+        scheduler.generate_candidates(num_trials=1)
+
+        # THEN the experiment should not have generated candidates
+        self.assertEqual(len(scheduler.experiment.trials), 1)
+
+    def test_generate_candidates_does_not_generate_if_overconstrained(self) -> None:
+        # GIVEN a scheduler using a GS with GPEI
+        gs = self._get_generation_strategy_strategy_for_test(
+            experiment=self.branin_experiment,
+            generation_strategy=self._get_batch_hitl_gs(),
+        )
+        # This is a HITL experiment, so we don't want trials completing on their own.
+        self.branin_experiment.runner = InfinitePollRunner()
+        scheduler = Scheduler(
+            experiment=self.branin_experiment,
+            generation_strategy=gs,
+            options=SchedulerOptions(
+                init_seconds_between_polls=0,  # No wait bw polls so test is fast.
+                batch_size=10,
+                trial_type=TrialType.BATCH_TRIAL,
+            ),
+            db_settings=self.db_settings_if_always_needed,
+        )
+        # AND GIVEN a Sobol trial is running
+        scheduler.run(max_new_trials=1)
+        # Assert `run()` worked.
+        self.assertEqual(len(scheduler.experiment.running_trial_indices), 1)
+        # AND GIVEN the optimization config is overconstrained
+        self.branin_experiment.optimization_config.outcome_constraints = [
+            get_outcome_constraint(
+                metric=get_branin_metric(name="branin_constraint"),
+                bound=20,
+                relative=True,
+            )
+        ]
+        self.assertTrue(scheduler.experiment.lookup_data().df.empty)
+
+        # WHEN generating candidates
+        scheduler.generate_candidates(num_trials=1)
+
+        # THEN data is fetched, but no new trial is generated
+        self.assertFalse(scheduler.experiment.lookup_data().df.empty)
+        self.assertEqual(
+            len(scheduler.experiment.trials), 1, str(scheduler.experiment.trials)
+        )
+        self.assertEqual(
+            len(scheduler.experiment.running_trial_indices),
+            1,
+            str(scheduler.experiment.trials),
+        )
diff --git a/ax/utils/testing/core_stubs.py b/ax/utils/testing/core_stubs.py
index 8e7d596b0e6..8cc20018993 100644
--- a/ax/utils/testing/core_stubs.py
+++ b/ax/utils/testing/core_stubs.py
@@ -94,8 +94,9 @@ from ax.metrics.dict_lookup import DictLookupMetric
 from ax.metrics.factorial import FactorialMetric
 from ax.metrics.hartmann6 import AugmentedHartmann6Metric, Hartmann6Metric
-from ax.modelbridge.factory import Cont_X_trans, get_factorial, get_sobol
-from ax.modelbridge.generation_strategy import GenerationStrategy
+from ax.modelbridge.factory import Cont_X_trans, get_factorial, get_sobol, Models
+from ax.modelbridge.generation_strategy import GenerationNode, GenerationStrategy
+from ax.modelbridge.model_spec import ModelSpec
 from ax.modelbridge.transition_criterion import (
     MaxGenerationParallelism,
     MaxTrials,
@@ -1551,9 +1552,13 @@ def get_objective_threshold(
     )
 
 
-def get_outcome_constraint(relative: bool = True) -> OutcomeConstraint:
+def get_outcome_constraint(
+    metric: Optional[Metric] = None, relative: bool = True, bound: float = -0.25
+) -> OutcomeConstraint:
+    if metric is None:
+        metric = Metric(name="m2")
     return OutcomeConstraint(
-        metric=Metric(name="m2"), op=ComparisonOp.GEQ, bound=-0.25, relative=relative
+        metric=metric, op=ComparisonOp.GEQ, bound=bound, relative=relative
     )
 
 
@@ -2399,6 +2404,45 @@ def get_dataset(
     )
 
 
+def get_sobol_gpei_generation_strategy(sobol_steps: int = 1) -> GenerationStrategy:
+    # Set up the node-based generation strategy for testing.
+    step_model_kwargs = {"silently_filter_kwargs": True}
+    sobol_criterion = [
+        MaxTrials(
+            threshold=1,
+            transition_to="GPEI_node",
+            block_gen_if_met=True,
+            only_in_statuses=None,
+            not_in_statuses=[TrialStatus.FAILED, TrialStatus.ABANDONED],
+        )
+    ]
+    gpei_criterion = []
+    sobol_model_spec = ModelSpec(
+        model_enum=Models.SOBOL,
+        model_kwargs=step_model_kwargs,
+        model_gen_kwargs={},
+    )
+    gpei_model_spec = ModelSpec(
+        model_enum=Models.GPEI,
+        model_kwargs=step_model_kwargs,
+        model_gen_kwargs={},
+    )
+    sobol_node = GenerationNode(
+        node_name="sobol_node",
+        transition_criteria=sobol_criterion,
+        model_specs=[sobol_model_spec],
+    )
+    gpei_node = GenerationNode(
+        node_name="GPEI_node",
+        transition_criteria=gpei_criterion,
+        model_specs=[gpei_model_spec],
+    )
+    return GenerationStrategy(
+        name="Sobol+GPEI_Nodes",
+        nodes=[sobol_node, gpei_node],
+    )
+
+
 ##############################
 # Custom runner and metric
 ##############################
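Editor's note (not part of the patch): a minimal sketch of how the new method might be called in a human-in-the-loop flow. It assumes already-constructed `experiment`, `generation_strategy`, and optional `db_settings` objects (placeholder names, not defined in this patch), and that `Scheduler` and `SchedulerOptions` are importable from `ax.service.scheduler` as in the test module above.

    from ax.service.scheduler import Scheduler, SchedulerOptions

    # `experiment`, `generation_strategy`, and `db_settings` are placeholders
    # the caller is assumed to have built already.
    scheduler = Scheduler(
        experiment=experiment,
        generation_strategy=generation_strategy,
        options=SchedulerOptions(init_seconds_between_polls=0),
        db_settings=db_settings,  # optional; enables saving the new trials
    )

    # Polls/fetches the latest data, generates up to one new candidate trial,
    # and, when db_settings is configured, saves the new trials and the
    # generation strategy.
    new_trials = scheduler.generate_candidates(num_trials=1)
    if not new_trials:
        print("No candidates generated (e.g. missing data or optimization config).")

The returned list is empty when generation is not possible (for example, when required data or an optimization config is missing), matching the behavior exercised by the new tests.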