From f2b1ddac965db087aa388992b0f57eee96aa7eca Mon Sep 17 00:00:00 2001 From: Bernie Beckerman Date: Mon, 6 May 2024 09:04:38 -0700 Subject: [PATCH] Allow more flexible definition of which trial statuses to fit. (#2432) Summary: Changes `observations_from_data` and `observations_from_map_data` signatures, replacing arg `fit_abandoned: bool` -> `statuses_to_fit` and `statuses_to_fit_map_metric` which are both `Optional[List[TrialStatus]]`. In `observations_from_data`, these default to `None` in which case `{TrialStatus.COMPLETED}` is used for any MapMetrics, else `NON_ABANDONED_STATUSES`, i.e., all trial statuses except abandoned. In `observations_from_map_data`, `NON_ABANDONED_STATUSES` are the default for all metrics. NOTE: As of this diff, any rows of `Data.df` containing `metric_names` that don't exist on `experiment` will be filtered out ([pointer](https://www.internalfb.com/diff/D56634321?permalink=330145676765780)). Reviewed By: saitcakmak Differential Revision: D56634321 --- ax/core/base_trial.py | 7 +- ax/core/observation.py | 113 ++++++++++++++++++++++++------ ax/core/tests/test_observation.py | 73 +++++++++++++++---- ax/modelbridge/base.py | 18 ++++- ax/modelbridge/map_torch.py | 14 +++- 5 files changed, 185 insertions(+), 40 deletions(-) diff --git a/ax/core/base_trial.py b/ax/core/base_trial.py index cb83c534ae7..e0d3472113c 100644 --- a/ax/core/base_trial.py +++ b/ax/core/base_trial.py @@ -12,7 +12,7 @@ from copy import deepcopy from datetime import datetime, timedelta from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple, TYPE_CHECKING, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING, Union from ax.core.arm import Arm from ax.core.data import Data @@ -76,7 +76,8 @@ class TrialStatus(int, Enum): NOTE: Data for abandoned trials (or abandoned arms in batch trials) is not passed to the model as part of training data, unless ``fit_abandoned`` - option is specified to model bridge. + option is specified to model bridge. Additionally, data from MapMetrics is + typically excluded unless the corresponding trial is completed. """ CANDIDATE = 0 @@ -165,6 +166,8 @@ def __repr__(self) -> str: TrialStatus.EARLY_STOPPED, ] +NON_ABANDONED_STATUSES: Set[TrialStatus] = set(TrialStatus) - {TrialStatus.ABANDONED} + # pyre-fixme[24]: Generic type `Callable` expects 2 type parameters. def immutable_once_run(func: Callable) -> Callable: diff --git a/ax/core/observation.py b/ax/core/observation.py index 690d9543c52..255827bb986 100644 --- a/ax/core/observation.py +++ b/ax/core/observation.py @@ -18,9 +18,11 @@ import numpy as np import pandas as pd from ax.core.arm import Arm +from ax.core.base_trial import NON_ABANDONED_STATUSES, TrialStatus from ax.core.batch_trial import BatchTrial from ax.core.data import Data from ax.core.map_data import MapData +from ax.core.map_metric import MapMetric from ax.core.types import TCandidateMetadata, TParameterization from ax.utils.common.base import Base from ax.utils.common.constants import Keys @@ -253,7 +255,8 @@ def _observations_from_dataframe( cols: List[str], arm_name_only: bool, map_keys: Iterable[str], - include_abandoned: bool, + statuses_to_include: Set[TrialStatus], + statuses_to_include_map_metric: Set[TrialStatus], map_keys_as_parameters: bool = False, ) -> List[Observation]: """Helper method for extracting observations grouped by `cols` from `df`. @@ -262,10 +265,13 @@ def _observations_from_dataframe( experiment: Experiment with arm parameters. df: DataFrame derived from experiment Data. cols: columns used to group data into different observations. + arm_name_only: whether arm_name is the only column in `cols`. map_keys: columns that map dict-like Data e.g. `timestamp` in timeseries data, `epoch` in ML training traces. - include_abandoned: Whether data for abandoned trials and arms should - be included in the observations, returned from this function. + statuses_to_include: data from non-MapMetrics will only be included for trials + with statuses in this set. + statuses_to_include_map_metric: data from MapMetrics will only be included for + trials with statuses in this set. map_keys_as_parameters: Whether map_keys should be returned as part of the parameters of the Observation objects. @@ -287,8 +293,11 @@ def _observations_from_dataframe( arm_name = features["arm_name"] trial_index = features.get("trial_index", None) + is_arm_abandoned = False + trial_status = None if trial_index is not None: trial = experiment.trials[trial_index] + trial_status = trial.status metadata = trial._get_candidate_metadata(arm_name) or {} if Keys.TRIAL_COMPLETION_TIMESTAMP not in metadata: if trial._time_completed is not None: @@ -297,18 +306,15 @@ def _observations_from_dataframe( ).timestamp() obs_kwargs[Keys.METADATA] = metadata - if not include_abandoned and trial.status.is_abandoned: - # Exclude abandoned trials. - continue - - if not include_abandoned and isinstance(trial, BatchTrial): - # Exclude abandoned arms from batch trial's observations. + # Determine if this arm is abandoned. + is_arm_abandoned = trial.is_abandoned + if isinstance(trial, BatchTrial): if trial.index not in abandoned_arms_dict: # Same abandoned arm names to dict to avoid recomputing them # on creation of every observation. abandoned_arms_dict[trial.index] = trial.abandoned_arm_names if arm_name in abandoned_arms_dict[trial.index]: - continue + is_arm_abandoned = True obs_parameters = experiment.arms_by_name[arm_name].parameters.copy() if obs_parameters: @@ -332,6 +338,16 @@ def _observations_from_dataframe( obs_parameters[map_key] = features[map_key] else: obs_kwargs[Keys.METADATA][map_key] = features[map_key] + d = _filter_data_on_status( + df=d, + experiment=experiment, + trial_status=trial_status, + is_arm_abandoned=is_arm_abandoned, + statuses_to_include=statuses_to_include, + statuses_to_include_map_metric=statuses_to_include_map_metric, + ) + if len(d) == 0: + continue observations.append( Observation( features=ObservationFeatures(**obs_kwargs), @@ -346,6 +362,42 @@ def _observations_from_dataframe( return observations +def _filter_data_on_status( + df: pd.DataFrame, + experiment: experiment.Experiment, + trial_status: Optional[TrialStatus], + # Arms on a BatchTrial can be abandoned even if the BatchTrial is not. + # Data will be filtered out if is_arm_abandoned is True and the corresponding + # statuses_to_include does not contain TrialStatus.ABANDONED. + is_arm_abandoned: bool, + statuses_to_include: Set[TrialStatus], + statuses_to_include_map_metric: Set[TrialStatus], +) -> pd.DataFrame: + if "metric_name" not in df.columns: + raise ValueError(f"`metric_name` column is missing from {df!r}.") + dfs = [] + for g, d in df.groupby(by="metric_name"): + metric_name = g + # Filter out any metrics that are not on the experiment. + if metric_name not in experiment.metrics: + continue + metric = experiment.metrics[metric_name] + statuses_to_include_metric = ( + statuses_to_include_map_metric + if isinstance(metric, MapMetric) + else statuses_to_include + ) + if trial_status is not None and trial_status not in statuses_to_include_metric: + continue + if is_arm_abandoned and TrialStatus.ABANDONED not in statuses_to_include_metric: + continue + dfs.append(d) + if len(dfs) == 0: + return pd.DataFrame() + df = pd.concat(dfs) + return df + + def get_feature_cols(data: Data, is_map_data: bool = False) -> List[str]: feature_cols = OBS_COLS.intersection(data.df.columns) # note we use this check, rather than isinstance, since @@ -369,7 +421,10 @@ def get_feature_cols(data: Data, is_map_data: bool = False) -> List[str]: def observations_from_data( - experiment: experiment.Experiment, data: Data, include_abandoned: bool = False + experiment: experiment.Experiment, + data: Data, + statuses_to_include: Optional[Set[TrialStatus]] = None, + statuses_to_include_map_metric: Optional[Set[TrialStatus]] = None, ) -> List[Observation]: """Convert Data to observations. @@ -382,13 +437,18 @@ def observations_from_data( Args: experiment: Experiment with arm parameters. data: Data of observations. - include_abandoned: Whether data for abandoned trials and arms should - be included in the observations, returned from this function. + statuses_to_include: data from non-MapMetrics will only be included for trials + with statuses in this set. Defaults to all statuses except abandoned. + statuses_to_include_map_metric: data from MapMetrics will only be included for + trials with statuses in this set. Defaults to completed status only. Returns: List of Observation objects. """ - + if statuses_to_include is None: + statuses_to_include = NON_ABANDONED_STATUSES + if statuses_to_include_map_metric is None: + statuses_to_include_map_metric = {TrialStatus.COMPLETED} feature_cols = get_feature_cols(data) observations = [] arm_name_only = len(feature_cols) == 1 # there will always be an arm name @@ -418,8 +478,9 @@ def observations_from_data( df=complete_df, cols=feature_cols, arm_name_only=arm_name_only, + statuses_to_include=statuses_to_include, + statuses_to_include_map_metric=statuses_to_include_map_metric, map_keys=[], - include_abandoned=include_abandoned, ) ) if incomplete_df is not None: @@ -430,8 +491,9 @@ def observations_from_data( df=incomplete_df, cols=complete_feature_cols, arm_name_only=arm_name_only, + statuses_to_include=statuses_to_include, + statuses_to_include_map_metric=statuses_to_include_map_metric, map_keys=[], - include_abandoned=include_abandoned, ) ) return observations @@ -440,7 +502,8 @@ def observations_from_data( def observations_from_map_data( experiment: experiment.Experiment, map_data: MapData, - include_abandoned: bool = False, + statuses_to_include: Optional[Set[TrialStatus]] = None, + statuses_to_include_map_metric: Optional[Set[TrialStatus]] = None, map_keys_as_parameters: bool = False, limit_rows_per_metric: Optional[int] = None, limit_rows_per_group: Optional[int] = None, @@ -456,8 +519,10 @@ def observations_from_map_data( Args: experiment: Experiment with arm parameters. map_data: MapData of observations. - include_abandoned: Whether data for abandoned trials and arms should - be included in the observations, returned from this function. + statuses_to_include: data from non-MapMetrics will only be included for trials + with statuses in this set. Defaults to all statuses except abandoned. + statuses_to_include_map_metric: data from MapMetrics will only be included for + trials with statuses in this set. Defaults to all statuses except abandoned. map_keys_as_parameters: Whether map_keys should be returned as part of the parameters of the Observation objects. limit_rows_per_metric: If specified, uses MapData.subsample() with @@ -473,6 +538,10 @@ def observations_from_map_data( Returns: List of Observation objects. """ + if statuses_to_include is None: + statuses_to_include = NON_ABANDONED_STATUSES + if statuses_to_include_map_metric is None: + statuses_to_include_map_metric = NON_ABANDONED_STATUSES if limit_rows_per_metric is not None or limit_rows_per_group is not None: map_data = map_data.subsample( map_key=map_data.map_keys[0], @@ -517,7 +586,8 @@ def observations_from_map_data( cols=feature_cols, arm_name_only=arm_name_only, map_keys=map_data.map_keys, - include_abandoned=include_abandoned, + statuses_to_include=statuses_to_include, + statuses_to_include_map_metric=statuses_to_include_map_metric, map_keys_as_parameters=map_keys_as_parameters, ) ) @@ -530,7 +600,8 @@ def observations_from_map_data( cols=complete_feature_cols, arm_name_only=arm_name_only, map_keys=map_data.map_keys, - include_abandoned=include_abandoned, + statuses_to_include=statuses_to_include, + statuses_to_include_map_metric=statuses_to_include_map_metric, map_keys_as_parameters=map_keys_as_parameters, ) ) diff --git a/ax/core/tests/test_observation.py b/ax/core/tests/test_observation.py index e229c9d2d57..b76143f6896 100644 --- a/ax/core/tests/test_observation.py +++ b/ax/core/tests/test_observation.py @@ -17,7 +17,9 @@ from ax.core.data import Data from ax.core.generator_run import GeneratorRun from ax.core.map_data import MapData, MapKeyInfo +from ax.core.map_metric import MapMetric from ax.core.observation import ( + _filter_data_on_status, Observation, ObservationData, ObservationFeatures, @@ -274,15 +276,34 @@ def test_ObservationsFromData(self) -> None: } type(experiment).arms_by_name = PropertyMock(return_value=arms) type(experiment).trials = PropertyMock(return_value=trials) - type(experiment).metrics = PropertyMock(return_value={"a": "a", "b": "b"}) df = pd.DataFrame(truth)[ ["arm_name", "trial_index", "mean", "sem", "metric_name"] ] data = Data(df=df) + + with self.assertRaisesRegex(ValueError, "`metric_name` column is missing"): + observations = _filter_data_on_status( + df=df.drop(columns="metric_name"), + experiment=experiment, + trial_status=None, + is_arm_abandoned=False, + statuses_to_include=set(), + statuses_to_include_map_metric=set(), + ) + type(experiment).metrics = PropertyMock(return_value={"a": "a"}) observations = observations_from_data(experiment, data) + self.assertEqual(len(observations), 2) + # Data is filtered only to metrics attached to the experiment. + for obs in observations: + self.assertListEqual(obs.data.metric_names, ["a"]) + type(experiment).metrics = PropertyMock(return_value={"a": "a", "b": "b"}) + observations = observations_from_data(experiment, data) self.assertEqual(len(observations), 2) + self.assertListEqual(observations[0].data.metric_names, ["a", "b"]) + self.assertListEqual(observations[1].data.metric_names, ["a"]) + # Get them in the order we want for tests below if observations[0].features.parameters["x"] == 1: observations.reverse() @@ -474,8 +495,8 @@ def test_ObservationsFromMapData(self) -> None: self.assertEqual(obs.features.metadata, {"timestamp": t["timestamp"]}) def test_ObservationsFromDataAbandoned(self) -> None: - truth = { - 0.5: { + truth = [ + { "arm_name": "0_0", "parameters": {"x": 0, "y": "a", "z": 1}, "mean": 2.0, @@ -488,7 +509,20 @@ def test_ObservationsFromDataAbandoned(self) -> None: "z": 0.5, "timestamp": 50, }, - 1: { + { + "arm_name": "1_0", + "parameters": {"x": 0, "y": "a", "z": 1}, + "mean": 4.0, + "sem": 4.0, + "trial_index": 1, + "metric_name": "a", + "updated_parameters": {"x": 0, "y": "a", "z": 1}, + "mean_t": np.array([4.0]), + "covariance_t": np.array([[16.0]]), + "z": 1, + "timestamp": 100, + }, + { "arm_name": "1_0", "parameters": {"x": 0, "y": "a", "z": 1}, "mean": 4.0, @@ -501,7 +535,7 @@ def test_ObservationsFromDataAbandoned(self) -> None: "z": 1, "timestamp": 100, }, - 0.25: { + { "arm_name": "2_0", "parameters": {"x": 1, "y": "a", "z": 0.5}, "mean": 3.0, @@ -514,7 +548,7 @@ def test_ObservationsFromDataAbandoned(self) -> None: "z": 0.25, "timestamp": 25, }, - 0.75: { + { "arm_name": "2_1", "parameters": {"x": 1, "y": "b", "z": 0.75}, "mean": 3.0, @@ -527,7 +561,7 @@ def test_ObservationsFromDataAbandoned(self) -> None: "z": 0.75, "timestamp": 25, }, - } + ] arms = { # pyre-fixme[6]: For 1st param expected `Optional[str]` but got # `Union[Dict[str, Union[float, str]], Dict[str, Union[int, str]], float, @@ -536,7 +570,7 @@ def test_ObservationsFromDataAbandoned(self) -> None: # float, int, str]]` but got `Union[Dict[str, Union[float, str]], # Dict[str, Union[int, str]], float, ndarray, str]`. obs["arm_name"]: Arm(name=obs["arm_name"], parameters=obs["parameters"]) - for _, obs in truth.items() + for obs in truth } experiment = Mock() experiment._trial_indices_by_status = {status: set() for status in TrialStatus} @@ -544,7 +578,7 @@ def test_ObservationsFromDataAbandoned(self) -> None: obs["trial_index"]: ( Trial(experiment, GeneratorRun(arms=[arms[obs["arm_name"]]])) ) - for _, obs in list(truth.items())[:-1] + for obs in truth[:-1] # pyre-fixme[16]: Item `Dict` of `Union[Dict[str, typing.Union[float, # str]], Dict[str, typing.Union[int, str]], float, ndarray, str]` has no # attribute `startswith`. @@ -562,9 +596,11 @@ def test_ObservationsFromDataAbandoned(self) -> None: trials.get(2).mark_arm_abandoned(arm_name="2_1") type(experiment).arms_by_name = PropertyMock(return_value=arms) type(experiment).trials = PropertyMock(return_value=trials) - type(experiment).metrics = PropertyMock(return_value={"a": "a", "b": "b"}) + type(experiment).metrics = PropertyMock( + return_value={"a": "a", "b": MapMetric(name="b")} + ) - df = pd.DataFrame(list(truth.values()))[ + df = pd.DataFrame(truth)[ ["arm_name", "trial_index", "mean", "sem", "metric_name"] ] data = Data(df=df) @@ -574,10 +610,19 @@ def test_ObservationsFromDataAbandoned(self) -> None: obs_no_abandoned = observations_from_data(experiment, data) self.assertEqual(len(obs_no_abandoned), 2) - # 1 arm is abandoned and 1 trial is abandoned, so only 2 observations should be - # included. + # Including all statuses for non-map metrics should yield all rows except those + # corresponding to a non-map metric (4 rows). + obs_with_abandoned = observations_from_data( + experiment, data, statuses_to_include=set(TrialStatus) + ) + self.assertEqual(len(obs_with_abandoned), 4) + + # Including all statuses for all metrics should yield all rows (5 rows). obs_with_abandoned = observations_from_data( - experiment, data, include_abandoned=True + experiment, + data, + statuses_to_include=set(TrialStatus), + statuses_to_include_map_metric=set(TrialStatus), ) self.assertEqual(len(obs_with_abandoned), 4) diff --git a/ax/modelbridge/base.py b/ax/modelbridge/base.py index 05a7e4ef40e..f152a223302 100644 --- a/ax/modelbridge/base.py +++ b/ax/modelbridge/base.py @@ -18,6 +18,7 @@ from typing import Any, Dict, List, MutableMapping, Optional, Set, Tuple, Type from ax.core.arm import Arm +from ax.core.base_trial import NON_ABANDONED_STATUSES, TrialStatus from ax.core.data import Data from ax.core.experiment import Experiment from ax.core.generator_run import extract_arm_predictions, GeneratorRun @@ -269,7 +270,10 @@ def _prepare_observations( if experiment is None or data is None: return [] return observations_from_data( - experiment=experiment, data=data, include_abandoned=self._fit_abandoned + experiment=experiment, + data=data, + statuses_to_include=self.statuses_to_fit, + statuses_to_include_map_metric=self.statuses_to_fit_map_metric, ) def _transform_data( @@ -495,6 +499,18 @@ def training_in_design(self) -> List[bool]: """ return self._training_in_design + @property + def statuses_to_fit(self) -> Set[TrialStatus]: + """Statuses to fit the model on.""" + if self._fit_abandoned: + return set(TrialStatus) + return NON_ABANDONED_STATUSES + + @property + def statuses_to_fit_map_metric(self) -> Set[TrialStatus]: + """Statuses to fit the model on.""" + return {TrialStatus.COMPLETED} + @training_in_design.setter def training_in_design(self, training_in_design: List[bool]) -> None: if len(training_in_design) != len(self._training_data): diff --git a/ax/modelbridge/map_torch.py b/ax/modelbridge/map_torch.py index 0f0f1f50418..61cd56b9834 100644 --- a/ax/modelbridge/map_torch.py +++ b/ax/modelbridge/map_torch.py @@ -5,11 +5,12 @@ # pyre-strict -from typing import Any, Dict, List, Optional, Tuple, Type +from typing import Any, Dict, List, Optional, Set, Tuple, Type import numpy as np import torch +from ax.core.base_trial import TrialStatus from ax.core.data import Data from ax.core.experiment import Experiment from ax.core.map_data import MapData @@ -66,6 +67,7 @@ def __init__( optimization_config: Optional[OptimizationConfig] = None, fit_out_of_design: bool = False, fit_on_init: bool = True, + fit_abandoned: bool = False, default_model_gen_options: Optional[TConfig] = None, map_data_limit_rows_per_metric: Optional[int] = None, map_data_limit_rows_per_group: Optional[int] = None, @@ -101,6 +103,9 @@ def __init__( To fit the model afterwards, use `_process_and_transform_data` to get the transformed inputs and call `_fit_if_implemented` with the transformed inputs. + fit_abandoned: Whether data for abandoned arms or trials should be + included in model training data. If ``False``, only + non-abandoned points are returned. default_model_gen_options: Options passed down to `model.gen(...)`. map_data_limit_rows_per_metric: Subsample the map data so that the total number of rows per metric is limited by this value. @@ -135,6 +140,10 @@ def __init__( default_model_gen_options=default_model_gen_options, ) + @property + def statuses_to_fit_map_metric(self) -> Set[TrialStatus]: + return self.statuses_to_fit + @property def parameters_with_map_keys(self) -> List[str]: return self.parameters + self._map_key_features @@ -239,9 +248,10 @@ def _prepare_observations( experiment=experiment, map_data=data, # pyre-ignore[6]: Checked in __init__. map_keys_as_parameters=True, - include_abandoned=self._fit_abandoned, limit_rows_per_metric=self._map_data_limit_rows_per_metric, limit_rows_per_group=self._map_data_limit_rows_per_group, + statuses_to_include=self.statuses_to_fit, + statuses_to_include_map_metric=self.statuses_to_fit_map_metric, ) def _compute_in_design(