Skip to content

Commit

Permalink
[air] Deprecate MlflowTrainableMixin, move to setup_mlflow() function (
Browse files Browse the repository at this point in the history
…ray-project#31295)

Following ray-project#29828, this PR introduces a `setup_mlflow()` method to replace the previous `mlflow_mixin` decorator.

The current mlflow mixin can't be used with Ray AIR trainers. By adding a `setup_mlflow()` function that can just be called in (data parallel) function trainers, we follow the recent changes to wandb and provide a similar interface.

`setup_mlflow` also follows the wandb integration by acting on the configuration dict. 

Lastly, this PR moves AIR integration tests to the correct subdirectory and combines the old `test_mlflow.py` into the new `test_integration_mlflow.py`.

Signed-off-by: Kai Fricke <[email protected]>
Signed-off-by: Kai Fricke <[email protected]>
Co-authored-by: Amog Kamsetty <[email protected]>
Signed-off-by: tmynn <[email protected]>
  • Loading branch information
2 people authored and tamohannes committed Jan 25, 2023
1 parent 6389359 commit 8597904
Show file tree
Hide file tree
Showing 18 changed files with 736 additions and 702 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@
"source": [
"With our `trainer_init_per_worker` complete, we can now instantiate the `HuggingFaceTrainer`. Aside from the function, we set the `scaling_config`, controlling the amount of workers and resources used, and the `datasets` we will use for training and evaluation.\n",
"\n",
"We specify the `MlflowLoggerCallback` inside the `run_config`, and pass the preprocessor we have defined earlier as an argument. The preprocessor will be included with the returned `Checkpoint`, meaning it will also be applied during inference."
"We specify the `MLflowLoggerCallback` inside the `run_config`, and pass the preprocessor we have defined earlier as an argument. The preprocessor will be included with the returned `Checkpoint`, meaning it will also be applied during inference."
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion doc/source/tune/api_docs/integration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ MLflow (tune.integration.mlflow)
.. autoclass:: ray.air.integrations.mlflow.MLflowLoggerCallback
:noindex:

.. autofunction:: ray.tune.integration.mlflow.mlflow_mixin
.. autofunction:: ray.air.integrations.mlflow.setup_mlflow


.. _tune-integration-mxnet:
Expand Down
687 changes: 221 additions & 466 deletions doc/source/tune/examples/tune-mlflow.ipynb

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions python/ray/air/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,35 @@ py_test(
deps = [":ml_lib"]
)


py_test(
name = "test_keras_callback",
name = "test_integration_comet",
size = "small",
srcs = ["tests/test_keras_callback.py"],
srcs = ["tests/test_integration_comet.py"],
deps = [":ml_lib"],
tags = ["team:ml", "exclusive"],
deps = [":ml_lib"]
)

py_test(
name = "test_mlflow",
name = "test_integration_wandb",
size = "small",
srcs = ["tests/test_mlflow.py"],
srcs = ["tests/test_integration_wandb.py"],
deps = [":ml_lib"],
tags = ["team:ml", "exclusive"],
)

py_test(
name = "test_integration_mlflow",
size = "small",
srcs = ["tests/test_integration_mlflow.py"],
deps = [":ml_lib"],
tags = ["team:ml", "exclusive"]
)

py_test(
name = "test_keras_callback",
size = "small",
srcs = ["tests/test_keras_callback.py"],
tags = ["team:ml", "exclusive"],
deps = [":ml_lib"]
)
Expand Down
10 changes: 5 additions & 5 deletions python/ray/air/_internal/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def setup_mlflow(
registry_uri: Optional[str] = None,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
tracking_token=None,
tracking_token: Optional[str] = None,
create_experiment_if_not_exists: bool = True,
):
"""
Expand All @@ -62,13 +62,13 @@ def setup_mlflow(
``experiment_name`` will be used instead. This argument takes
precedence over ``experiment_name`` if both are passed in.
experiment_name: The experiment name to use for logging.
If None is passed in here, the
the MLFLOW_EXPERIMENT_NAME environment variables is used to
determine the experiment name.
If None is passed in here, the MLFLOW_EXPERIMENT_NAME environment
variable is used to determine the experiment name.
If the experiment with the name already exists with MLflow,
it will be reused. If not, a new experiment will be created
with the provided name if
``create_experiment_if_not_exists`` is set to True.
tracking_token: Tracking token used to authenticate with MLflow.
create_experiment_if_not_exists: Whether to create an
experiment with the provided name if it does not already
exist. Defaults to True.
Expand Down Expand Up @@ -182,7 +182,7 @@ def start_run(
"""Starts a new run and possibly sets it as the active run.
Args:
tags (Optional[Dict]): Tags to set for the new run.
tags: Tags to set for the new run.
set_active: Whether to set the new run as the active run.
If an active run already exists, then that run is returned.
Expand Down
185 changes: 184 additions & 1 deletion python/ray/air/integrations/mlflow.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,193 @@
import logging
from typing import Dict, Optional
import warnings
from types import ModuleType
from typing import Dict, Optional, Union

import ray
from ray.air import session

from ray.air._internal.mlflow import _MLflowLoggerUtil
from ray.tune.logger import LoggerCallback
from ray.tune.result import TIMESTEPS_TOTAL, TRAINING_ITERATION
from ray.tune.experiment import Trial
from ray.util.annotations import PublicAPI

try:
import mlflow
except ImportError:
mlflow = None


logger = logging.getLogger(__name__)


class _NoopModule:
def __getattr__(self, item):
return _NoopModule()

def __call__(self, *args, **kwargs):
return None


@PublicAPI(stability="alpha")
def setup_mlflow(
    config: Optional[Dict] = None,
    tracking_uri: Optional[str] = None,
    registry_uri: Optional[str] = None,
    experiment_id: Optional[str] = None,
    experiment_name: Optional[str] = None,
    tracking_token: Optional[str] = None,
    create_experiment_if_not_exists: bool = False,
    tags: Optional[Dict] = None,
    rank_zero_only: bool = True,
) -> Union[ModuleType, _NoopModule]:
    """Set up an MLflow session.

    This function can be used to initialize an MLflow session in a
    (distributed) training or tuning run.

    By default, the MLflow experiment ID is the Ray trial ID and the
    MLflow experiment name is the Ray trial name. These settings can be
    overwritten by passing the respective keyword arguments.

    The ``config`` dict is automatically logged as the run parameters
    (excluding the mlflow settings).

    In distributed training with Ray Train, only the zero-rank worker will
    initialize mlflow. All other workers will return a noop client, so that
    logging is not duplicated in a distributed run. This can be disabled by
    passing ``rank_zero_only=False``, which will then initialize mlflow in
    every training worker.

    This function will return the ``mlflow`` module or a noop module for
    non-rank zero workers if ``rank_zero_only=True``. By using
    ``mlflow = setup_mlflow(config)`` you can ensure that only the rank zero
    worker calls the mlflow API.

    Args:
        config: Configuration dict to be logged to mlflow as parameters.
        tracking_uri: The tracking URI for MLflow tracking. If using
            Tune in a multi-node setting, make sure to use a remote server for
            tracking.
        registry_uri: The registry URI for the MLflow model registry.
        experiment_id: The id of an already created MLflow experiment.
            All logs from all trials in ``tune.Tuner()`` will be reported to
            this experiment. If this is not provided or the experiment with
            this id does not exist, you must provide an ``experiment_name``.
            This parameter takes precedence over ``experiment_name``.
        experiment_name: The name of an already existing MLflow
            experiment. All logs from all trials in ``tune.Tuner()`` will be
            reported to this experiment. If this is not provided, you must
            provide a valid ``experiment_id``.
        tracking_token: A token to use for HTTP authentication when
            logging to a remote tracking server. This is useful when you
            want to log to a Databricks server, for example. This value will
            be used to set the MLFLOW_TRACKING_TOKEN environment variable on
            all the remote training processes.
        create_experiment_if_not_exists: Whether to create an
            experiment with the provided name if it does not already
            exist. Defaults to False.
        tags: Tags to set for the new run.
        rank_zero_only: If True, will return an initialized session only for
            the rank 0 worker in distributed training. If False, will
            initialize a session for all workers. Defaults to True.

    Returns:
        The ``mlflow`` module (with an active run already started), or a
        ``_NoopModule`` on non-rank-zero workers when
        ``rank_zero_only=True``.

    Raises:
        RuntimeError: If the ``mlflow`` package is not installed.

    Example:

        Per default, you can just call ``setup_mlflow`` and continue to use
        MLflow like you would normally do:

        .. code-block:: python

            from ray.air.integrations.mlflow import setup_mlflow

            def training_loop(config):
                setup_mlflow(config)
                # ...
                mlflow.log_metric(key="loss", val=0.123, step=0)

        In distributed data parallel training, you can utilize the return
        value of ``setup_mlflow``. This will make sure it is only invoked on
        the first worker in distributed training runs.

        .. code-block:: python

            from ray.air.integrations.mlflow import setup_mlflow

            def training_loop(config):
                mlflow = setup_mlflow(config)
                # ...
                mlflow.log_metric(key="loss", val=0.123, step=0)

        You can also use MLflow's autologging feature if using a training
        framework like Pytorch Lightning, XGBoost, etc. More information can
        be found here
        (https://mlflow.org/docs/latest/tracking.html#automatic-logging).

        .. code-block:: python

            from ray.air.integrations.mlflow import setup_mlflow

            def train_fn(config):
                mlflow = setup_mlflow(config)
                mlflow.autolog()
                xgboost_results = xgb.train(config, ...)
    """
    if not mlflow:
        raise RuntimeError(
            "mlflow was not found - please install with `pip install mlflow`"
        )

    try:
        # `_get_session` raises RuntimeError outside of a Train/Tune session;
        # in that case we fall back to no default trial id/name below.
        _session = session._get_session(warn=False)
        # Only rank 0 logs in distributed training (unless disabled), so
        # metrics are not duplicated once per worker.
        if _session and rank_zero_only and session.get_world_rank() != 0:
            return _NoopModule()

        default_trial_id = session.get_trial_id()
        default_trial_name = session.get_trial_name()

    except RuntimeError:
        default_trial_id = None
        default_trial_name = None

    # Copy so we never mutate the caller's config; the popped "mlflow" key
    # is the legacy mixin-style configuration block.
    _config = config.copy() if config else {}
    mlflow_config = _config.pop("mlflow", {}).copy()

    # Deprecate: 2.4 — the "mlflow" config-dict key is the old mixin API;
    # explicit keyword arguments to `setup_mlflow()` replace it.
    if mlflow_config:
        warnings.warn(
            "Passing a `mlflow` key in the config dict is deprecated and will raise an "
            "error in the future. Please pass the actual arguments to `setup_mlflow()` "
            "instead.",
            DeprecationWarning,
        )

    # Explicit kwargs win; otherwise default to the Ray trial id/name.
    experiment_id = experiment_id or default_trial_id
    experiment_name = experiment_name or default_trial_name

    # Setup mlflow: kwargs take precedence, then the (deprecated) config
    # dict values, then None.
    mlflow_util = _MLflowLoggerUtil()
    mlflow_util.setup_mlflow(
        tracking_uri=tracking_uri or mlflow_config.get("tracking_uri", None),
        registry_uri=registry_uri or mlflow_config.get("registry_uri", None),
        experiment_id=experiment_id or mlflow_config.get("experiment_id", None),
        experiment_name=experiment_name or mlflow_config.get("experiment_name", None),
        tracking_token=tracking_token or mlflow_config.get("tracking_token", None),
        create_experiment_if_not_exists=create_experiment_if_not_exists,
    )

    # Start an active run and log the (mlflow-stripped) config as params.
    mlflow_util.start_run(
        run_name=experiment_name,
        tags=tags or mlflow_config.get("tags", None),
        set_active=True,
    )
    mlflow_util.log_params(_config)
    return mlflow_util._mlflow


class MLflowLoggerCallback(LoggerCallback):
"""MLflow Logger to automatically log Tune results and config to MLflow.
Expand All @@ -32,6 +210,7 @@ class MLflowLoggerCallback(LoggerCallback):
that name.
tags: An optional dictionary of string keys and values to set
as tags on the run
tracking_token: Tracking token used to authenticate with MLflow.
save_artifact: If set to True, automatically save the entire
contents of the Tune local_dir as an artifact to the
corresponding run in MlFlow.
Expand Down Expand Up @@ -62,16 +241,19 @@ class MLflowLoggerCallback(LoggerCallback):
def __init__(
self,
tracking_uri: Optional[str] = None,
*,
registry_uri: Optional[str] = None,
experiment_name: Optional[str] = None,
tags: Optional[Dict] = None,
tracking_token: Optional[str] = None,
save_artifact: bool = False,
):

self.tracking_uri = tracking_uri
self.registry_uri = registry_uri
self.experiment_name = experiment_name
self.tags = tags
self.tracking_token = tracking_token
self.should_save_artifact = save_artifact

self.mlflow_util = _MLflowLoggerUtil()
Expand All @@ -92,6 +274,7 @@ def setup(self, *args, **kwargs):
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri,
experiment_name=self.experiment_name,
tracking_token=self.tracking_token,
)

if self.tags is None:
Expand Down
Loading

0 comments on commit 8597904

Please sign in to comment.