New TB metric
Summary:
Created a new, massively simplified TB metric with the intention of reaping the old TensorboardCurveMetric. The new version is much shorter, avoids the bespoke logic in CurveMetric by hewing closer to the vanilla MapMetric, does not need the "get_ids_from_trials" function, cleanly separates log reading from curve processing, and supports bulk_fetch_trial_data.
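For example, fetching several metrics for one trial now shares a single log read (a minimal sketch, assuming an existing Ax trial whose run_metadata contains the "tb_log_dir" key; the metric names and tags here are made up):

    from ax.metrics.tensorboard import TensorboardMetric

    # One event multiplexer is built per trial and reused for every metric.
    loss = TensorboardMetric(name="loss", tag="loss", lower_is_better=True)
    acc = TensorboardMetric(name="acc", tag="eval/accuracy", lower_is_better=False)

    results = loss.bulk_fetch_trial_data(trial=trial, metrics=[loss, acc])
    loss_data = results["loss"].unwrap()  # Ok[MapData] -> MapData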

The new TensorboardMetric also adds a "smoothing" setting that emulates the smoothing slider in the Tensorboard UI, a feature many users have requested.
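Concretely, the smoothing is a bias-corrected exponentially weighted mean over the raw curve values (a sketch of the postprocessing; the pandas ewm call is the one used in the diff below, and the loop is an equivalent hand-rolled form under pandas' default adjust=True):

    import pandas as pd

    raw = pd.Series([8.0, 4.0, 2.0, 1.0])
    smoothing = 0.6  # SMOOTHING_DEFAULT, the Tensorboard UI default

    # What TensorboardMetric applies when smoothing > 0:
    smoothed = raw.ewm(alpha=smoothing).mean()

    # Equivalent bias-corrected recurrence:
    num, den, out = 0.0, 0.0, []
    for x in raw:
        num = (1 - smoothing) * num + x
        den = (1 - smoothing) * den + 1.0
        out.append(num / den)

    assert all(abs(a - b) < 1e-12 for a, b in zip(smoothed, out))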

Also, "curve_name" has been renamed "tag" to better reflect Tensorboard terminology.

Differential Revision: D53862120
mpolson64 authored and facebook-github-bot committed Mar 4, 2024
1 parent b98f3be commit 66da1a7
Showing 2 changed files with 281 additions and 5 deletions.
137 changes: 135 additions & 2 deletions ax/metrics/tensorboard.py
@@ -9,15 +9,24 @@
import logging

from logging import Logger
from typing import Dict, Iterable, List, NamedTuple, Optional, Set, Union
from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Set, Union

import pandas as pd
from ax.core.map_data import MapKeyInfo
from ax.core.base_trial import BaseTrial
from ax.core.map_data import MapData, MapKeyInfo
from ax.core.map_metric import MapMetric
from ax.core.metric import Metric, MetricFetchResult
from ax.core.trial import Trial
from ax.metrics.curve import AbstractCurveMetric
from ax.utils.common.logger import get_logger
from ax.utils.common.result import Ok
from pyre_extensions import assert_is_instance

logger: Logger = get_logger(__name__)

SMOOTHING_DEFAULT = 0.6 # Default in Tensorboard UI
RUN_METADATA_KEY = "tb_log_dir"

try:
from tensorboard.backend.event_processing import (
plugin_event_multiplexer as event_multiplexer,
@@ -26,6 +35,130 @@

logging.getLogger("tensorboard").setLevel(logging.CRITICAL)

class TensorboardMetric(MapMetric):
    """A *new* `MapMetric` for getting Tensorboard metrics."""

    map_key_info: MapKeyInfo[float] = MapKeyInfo(key="step", default_value=0.0)

    def __init__(
        self,
        name: str,
        tag: str,
        lower_is_better: bool = True,
        smoothing: float = SMOOTHING_DEFAULT,
        cumulative_best: bool = False,
    ) -> None:
        """
        Args:
            name: The name of the metric.
            tag: The name of the learning curve in the Tensorboard Scalars tab.
            lower_is_better: If True, lower curve values are considered better.
            smoothing: If > 0, apply exponential weighted mean to the curve. This
                is the same postprocessing as the "smoothing" slider in the
                Tensorboard UI.
            cumulative_best: If True, for each trial, apply cumulative best to
                the curve (i.e., if lower is better, then we return a curve
                representing the cumulative min of the raw curve).
        """
        super().__init__(name=name, lower_is_better=lower_is_better)

        self.smoothing = smoothing
        self.tag = tag
        self.cumulative_best = cumulative_best

    def bulk_fetch_trial_data(
        self, trial: BaseTrial, metrics: List[Metric], **kwargs: Any
    ) -> Dict[str, MetricFetchResult]:
        """Fetch multiple metrics data for one trial, using instance attributes
        of the metrics.

        Returns Dict of metric_name => Result

        Default behavior calls `fetch_trial_data` for each metric. Subclasses should
        override this to perform trial data computation for multiple metrics.
        """
        tb_metrics = [
            assert_is_instance(metric, TensorboardMetric) for metric in metrics
        ]

        trial = assert_is_instance(trial, Trial)
        if trial.arm is None:
            raise ValueError("Trial must have arm set.")

        arm_name = trial.arm.name

        mul = self._get_event_multiplexer_for_trial(trial=trial)

        res = {}
        for metric in tb_metrics:
            records = [
                {
                    "arm_name": arm_name,
                    "metric_name": metric.name,
                    self.map_key_info.key: t.step,
                    "mean": (
                        t.tensor_proto.double_val[0]
                        if t.tensor_proto.double_val
                        else t.tensor_proto.float_val[0]
                    ),
                    "sem": float("nan"),
                }
                for run_name, tags in mul.PluginRunToTagToContent(
                    "scalars"
                ).items()
                for tag in tags
                if tag == metric.tag
                for t in mul.Tensors(run_name, tag)
            ]

            df = (
                pd.DataFrame(records)
                # If a metric has multiple records for the same arm, metric, and
                # step (sometimes caused by restarts, etc.) take the mean
                .groupby(["arm_name", "metric_name", self.map_key_info.key])
                .mean()
                .reset_index()
            )

            # Apply per-metric post-processing.
            # Apply cumulative "best" (min if lower_is_better) only when requested
            if metric.cumulative_best:
                if metric.lower_is_better:
                    df["mean"] = df["mean"].cummin()
                else:
                    df["mean"] = df["mean"].cummax()

            # Apply smoothing
            if metric.smoothing > 0:
                df["mean"] = df["mean"].ewm(alpha=metric.smoothing).mean()

            res[metric.name] = Ok(
                MapData(
                    df=df,
                    map_key_infos=[self.map_key_info],
                )
            )

        return res

    def fetch_trial_data(
        self, trial: BaseTrial, **kwargs: Any
    ) -> MetricFetchResult:
        """Fetch data for one trial."""

        return self.bulk_fetch_trial_data(trial=trial, metrics=[self], **kwargs)[
            self.name
        ]

    def _get_event_multiplexer_for_trial(
        self, trial: BaseTrial
    ) -> event_multiplexer.EventMultiplexer:
        """Get an event multiplexer with the logs for a given trial."""

        mul = event_multiplexer.EventMultiplexer(max_reload_threads=20)
        mul.AddRunsFromDirectory(trial.run_metadata[RUN_METADATA_KEY], None)
        mul.Reload()

        return mul

class TensorboardCurveMetric(AbstractCurveMetric):
    """A `CurveMetric` for getting Tensorboard curves."""

149 changes: 146 additions & 3 deletions ax/metrics/tests/test_tensorboard.py
@@ -4,20 +4,163 @@
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from typing import Dict, Iterable, List, Optional, Union
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Sequence, Union
from unittest import mock
from unittest.mock import patch

import numpy as np
import pandas as pd
from ax.core.arm import Arm
from ax.core.base_trial import BaseTrial
from ax.core.experiment import Experiment
from ax.core.map_data import MapData
from ax.core.objective import Objective
from ax.core.optimization_config import OptimizationConfig
from ax.metrics.tensorboard import TensorboardCurveMetric
from ax.metrics.tensorboard import TensorboardCurveMetric, TensorboardMetric
from ax.runners.synthetic import SyntheticRunner
from ax.utils.common.testutils import TestCase
from ax.utils.testing.core_stubs import get_branin_search_space
from ax.utils.testing.core_stubs import get_branin_search_space, get_trial
from pyre_extensions import assert_is_instance
from tensorboard.backend.event_processing import event_multiplexer


@dataclass
class _TensorProto:
    double_val: List[float]


@dataclass
class _TensorEvent:
    step: int
    tensor_proto: _TensorProto


def _get_fake_multiplexer(
    fake_data: Sequence[float],
) -> event_multiplexer.EventMultiplexer:
    mul = event_multiplexer.EventMultiplexer()

    # pyre-ignore[8] Return fake tags when content is requested
    mul.PluginRunToTagToContent = lambda plugin: {".": {"loss": ""}}

    # pyre-ignore[8] Return fake data when tensors requested
    mul.Tensors = lambda run, tag: [
        _TensorEvent(step=i, tensor_proto=_TensorProto(double_val=[dat]))
        for i, dat in enumerate(fake_data)
    ]

    return mul


class TensorboardMetricTest(TestCase):
    def test_fetch_trial_data(self) -> None:
        fake_data = [8.0, 4.0, 2.0, 1.0]
        fake_multiplexer = _get_fake_multiplexer(fake_data=fake_data)

        with patch.object(
            TensorboardMetric,
            "_get_event_multiplexer_for_trial",
            return_value=fake_multiplexer,
        ):
            metric = TensorboardMetric(
                name="loss", tag="loss", lower_is_better=True, smoothing=0
            )
            trial = get_trial()

            result = metric.fetch_trial_data(trial=trial)

            df = assert_is_instance(result.unwrap(), MapData).map_df

            expected_df = pd.DataFrame(
                [
                    {
                        "arm_name": "0_0",
                        "metric_name": "loss",
                        "mean": fake_data[i],
                        "sem": float("nan"),
                        "step": float(i),
                    }
                    for i in range(len(fake_data))
                ]
            )

            self.assertTrue(df.equals(expected_df))

    def test_smoothing(self) -> None:
        fake_data = [8.0, 4.0, 2.0, 1.0]
        smoothing = 0.5
        smooth_data = pd.Series(fake_data).ewm(alpha=smoothing).mean().tolist()

        fake_multiplexer = _get_fake_multiplexer(fake_data=fake_data)

        with patch.object(
            TensorboardMetric,
            "_get_event_multiplexer_for_trial",
            return_value=fake_multiplexer,
        ):
            metric = TensorboardMetric(
                name="loss", tag="loss", lower_is_better=True, smoothing=smoothing
            )
            trial = get_trial()

            result = metric.fetch_trial_data(trial=trial)

            df = assert_is_instance(result.unwrap(), MapData).map_df

            expected_df = pd.DataFrame(
                [
                    {
                        "arm_name": "0_0",
                        "metric_name": "loss",
                        "mean": smooth_data[i],
                        "sem": float("nan"),
                        "step": float(i),
                    }
                    for i in range(len(fake_data))
                ]
            )

            self.assertTrue(df.equals(expected_df))

    def test_cumulative_best(self) -> None:
        fake_data = [4.0, 8.0, 2.0, 1.0]
        sorted_data = pd.Series(fake_data).cummin().tolist()

        fake_multiplexer = _get_fake_multiplexer(fake_data=fake_data)

        with patch.object(
            TensorboardMetric,
            "_get_event_multiplexer_for_trial",
            return_value=fake_multiplexer,
        ):
            metric = TensorboardMetric(
                name="loss",
                tag="loss",
                lower_is_better=True,
                cumulative_best=True,
                smoothing=0,
            )
            trial = get_trial()

            result = metric.fetch_trial_data(trial=trial)

            df = assert_is_instance(result.unwrap(), MapData).map_df

            expected_df = pd.DataFrame(
                [
                    {
                        "arm_name": "0_0",
                        "metric_name": "loss",
                        "mean": sorted_data[i],
                        "sem": float("nan"),
                        "step": float(i),
                    }
                    for i in range(len(fake_data))
                ]
            )

            self.assertTrue(df.equals(expected_df))


class TensorboardCurveMetricTest(TestCase):
