Skip to content

Commit

Permalink
Enable pickle (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
Weisu Yin authored May 10, 2023
1 parent 311e9e0 commit 054878f
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 82 deletions.
4 changes: 2 additions & 2 deletions .github/workflow_scripts/test_cloud.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ set -ex

source $(dirname "$0")/env_setup.sh

install_cloud_test

if [ $MODULE = "tabular" ]
then
install_tabular $AG_VERSION
Expand All @@ -17,4 +15,6 @@ then
install_multimodal $AG_VERSION
fi

install_cloud_test

python3 -m pytest -n 2 --junitxml=results.xml tests/unittests/$MODULE/ --framework_version $AG_VERSION
11 changes: 10 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ def default_setup_args(*, version):

extras_require = dict()

test_requirements = ["tox", "pytest", "pytest-cov", "moto[all]"]
all_requires = ["autogluon>=0.7,<1.0"] # To allow user to pass ag objects
extras_require["all"] = all_requires

test_requirements = [
"tox",
"pytest",
"pytest-cov",
"moto[all]",
"autogluon.common>=0.7.0b,<1.0",
] # Install pre-release of common for testing

test_requirements = list(set(test_requirements))
extras_require["tests"] = test_requirements
Expand Down
19 changes: 19 additions & 0 deletions src/autogluon/cloud/backend/backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import pickle
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -68,6 +69,24 @@ def get_fit_job_info(self) -> Dict[str, Any]:
"""
raise NotImplementedError

def prepare_args(self, path: str, **kwargs):
    """
    Serialize the arguments required by the remote AutoGluon job.

    The backend-specific ``_construct_ag_args`` hook builds the config dict
    (i.e. predictor init args and fit args), which is then written to
    ``path`` as a pickle file for the remote side to load.

    Parameters
    ----------
    path: str
        Path to save the pickle file
    """
    assert self.predictor_type is not None
    ag_config = self._construct_ag_args(**kwargs)
    with open(path, "wb") as pkl_file:
        pickle.dump(ag_config, pkl_file)

def _construct_ag_args(**kwargs):
raise NotImplementedError

@abstractmethod
def fit(self, **kwargs) -> None:
"""Fit AG on the backend"""
Expand Down
17 changes: 6 additions & 11 deletions src/autogluon/cloud/backend/ray_backend.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
from __future__ import annotations

import copy
import json
import logging
import os
import shutil
import time
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
import yaml
from sagemaker import image_uris

from autogluon.common.utils.s3_utils import s3_bucket_prefix_to_path, s3_path_to_bucket_prefix
Expand Down Expand Up @@ -215,7 +213,10 @@ def fit(
image_uri = self._get_image_uri(
framework_version=framework_version, custom_image_uri=custom_image_uri, instance_type=instance_type
)
self._construct_ag_args(predictor_init_args=predictor_init_args, predictor_fit_args=predictor_fit_args)
ag_args_path = os.path.join(self.local_output_path, "job", "ag_args.pkl")
self.prepare_args(
path=ag_args_path, predictor_init_args=predictor_init_args, predictor_fit_args=predictor_fit_args
)
train_script = ScriptManager.get_train_script(backend_type=self.name, framework_version=framework_version)
job_path = os.path.join(self.local_output_path, "job")
shutil.copy(train_script, job_path)
Expand Down Expand Up @@ -262,9 +263,7 @@ def fit(
job_name = CLOUD_RESOURCE_PREFIX + "-" + get_utc_timestamp_now()
job = RayFitJob(output_path=self.cloud_output_path + "/model")

predictor_init_args = json.dumps(predictor_init_args)
predictor_fit_args = json.dumps(predictor_fit_args)
entry_point_command = f"python3 {os.path.basename(train_script)} --ag_args_path ag_args.yaml --train_data {train_data} --model_output_path {self.get_fit_job_output_path()} --ray_job_id {job_name}" # noqa: E501
entry_point_command = f"python3 {os.path.basename(train_script)} --ag_args_path {os.path.basename(ag_args_path)} --train_data {train_data} --model_output_path {self.get_fit_job_output_path()} --ray_job_id {job_name}" # noqa: E501
if tune_data is not None:
entry_point_command += f" --tune_data {tune_data}"
if leaderboard:
Expand Down Expand Up @@ -396,16 +395,12 @@ def _get_image_uri(self, framework_version: str, instance_type: str, custom_imag
return image_uri

def _construct_ag_args(self, predictor_init_args, predictor_fit_args, **kwargs):
assert self.predictor_type is not None
config = dict(
predictor_init_args=predictor_init_args,
predictor_fit_args=predictor_fit_args,
**kwargs,
)
path = os.path.join(self.local_output_path, "job", "ag_args.yaml")
with open(path, "w") as f:
yaml.dump(config, f)
return path
return config

def _upload_data(
self, train_data: Union[str, pd.DataFrame], tune_data: Optional[Union[str, pd.DataFrame]] = None
Expand Down
26 changes: 12 additions & 14 deletions src/autogluon/cloud/backend/sagemaker_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import pandas as pd
import sagemaker
import yaml
from botocore.exceptions import ClientError
from sagemaker import Predictor

Expand Down Expand Up @@ -314,18 +313,19 @@ def fit(
# Avoid user passing in source_dir without specifying entry point
autogluon_sagemaker_estimator_kwargs.pop("source_dir", None)

config_args = dict(
ag_args = dict(
predictor_init_args=predictor_init_args,
predictor_fit_args=predictor_fit_args,
leaderboard=leaderboard,
)
if image_column is not None:
config_args["image_column"] = image_column
config = self._construct_config(**config_args)
ag_args["image_column"] = image_column
ag_args_path = os.path.join(self.local_output_path, "utils", "ag_args.pkl")
self.prepare_args(path=ag_args_path, **ag_args)
inputs = self._upload_fit_artifact(
train_data=train_data,
tune_data=tune_data,
config=config,
ag_args=ag_args_path,
image_column=image_column,
serving_script=ScriptManager.get_serve_script(
backend_type=self.name, framework_version=framework_version
Expand Down Expand Up @@ -928,19 +928,15 @@ def download_predict_results(self, job_name: Optional[str] = None, save_path: Op

return results_save_path

def _construct_config(self, predictor_init_args, predictor_fit_args, leaderboard, **kwargs):
assert self.predictor_type is not None
def _construct_ag_args(self, predictor_init_args, predictor_fit_args, leaderboard, **kwargs):
config = dict(
predictor_type=self.predictor_type,
predictor_init_args=predictor_init_args,
predictor_fit_args=predictor_fit_args,
leaderboard=leaderboard,
**kwargs,
)
path = os.path.join(self.local_output_path, "utils", "config.yaml")
with open(path, "w") as f:
yaml.dump(config, f)
return path
return config

def _prepare_data(self, data, filename, output_type="csv"):
path = os.path.join(self.local_output_path, "utils")
Expand All @@ -958,7 +954,7 @@ def _upload_fit_artifact(
self,
train_data,
tune_data,
config,
ag_args,
serving_script,
image_column=None,
):
Expand Down Expand Up @@ -1003,7 +999,9 @@ def _upload_fit_artifact(
)
logger.log(20, "Tune data uploaded successfully")

config_input = self.sagemaker_session.upload_data(path=config, bucket=cloud_bucket, key_prefix=util_key_prefix)
ag_args_input = self.sagemaker_session.upload_data(
path=ag_args, bucket=cloud_bucket, key_prefix=util_key_prefix
)

serving_input = self.sagemaker_session.upload_data(
path=serving_script, bucket=cloud_bucket, key_prefix=util_key_prefix
Expand All @@ -1015,7 +1013,7 @@ def _upload_fit_artifact(
tune_images_input = self._upload_fit_image_artifact(
image_dir_path=common_tune_data_path, bucket=cloud_bucket, key_prefix=util_key_prefix
)
inputs = dict(train=train_input, config=config_input, serving=serving_input)
inputs = dict(train=train_input, ag_args=ag_args_input, serving=serving_input)
if tune_input is not None:
inputs["tune"] = tune_input
if train_images_input is not None:
Expand Down
30 changes: 0 additions & 30 deletions src/autogluon/cloud/backend/tabular_sagemaker_backend.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,6 @@
import copy
import os

import yaml

from .constant import TABULAR_SAGEMAKER
from .sagemaker_backend import SagemakerBackend


class TabularSagemakerBackend(SagemakerBackend):
name = TABULAR_SAGEMAKER

def _construct_config(self, predictor_init_args, predictor_fit_args, leaderboard, **kwargs):
assert self.predictor_type is not None
if "feature_metadata" in predictor_fit_args:
predictor_fit_args = copy.deepcopy(predictor_fit_args)
feature_metadata = predictor_fit_args.pop("feature_metadata")
feature_metadata = dict(
type_map_raw=feature_metadata.type_map_raw,
type_map_special=feature_metadata.get_type_map_special(),
)
assert (
"feature_metadata" not in kwargs
), "feature_metadata in both `predictor_fit_args` and kwargs. This should not happen."
kwargs["feature_metadata"] = feature_metadata
config = dict(
predictor_type=self.predictor_type,
predictor_init_args=predictor_init_args,
predictor_fit_args=predictor_fit_args,
leaderboard=leaderboard,
**kwargs,
)
path = os.path.join(self.local_output_path, "utils", "config.yaml")
with open(path, "w") as f:
yaml.dump(config, f)
return path
6 changes: 3 additions & 3 deletions src/autogluon/cloud/scripts/ray_scripts/train.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import argparse
import os
import pickle
import shutil
import time
from datetime import datetime, timezone
from typing import Optional

import boto3
import ray
import yaml

from autogluon.common.utils.s3_utils import s3_path_to_bucket_prefix
from autogluon.tabular import TabularDataset, TabularPredictor
Expand Down Expand Up @@ -90,8 +90,8 @@ def upload_file(file_name: str, bucket: str, prefix: Optional[str] = None):
tune_data = None
if args.tune_data is not None:
tune_data = TabularDataset(args.tune_data)
with open(args.ag_args_path) as f:
ag_args = yaml.safe_load(f)
with open(args.ag_args_path, "rb") as f:
ag_args = pickle.load(f)
predictor_init_args = ag_args["predictor_init_args"]
predictor_fit_args = ag_args["predictor_fit_args"]
save_path = f"ag_distributed_training_{get_utc_timestamp_now()}"
Expand Down
39 changes: 19 additions & 20 deletions src/autogluon/cloud/scripts/sagemaker_scripts/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import shutil
from pprint import pprint

import yaml
import pickle

from autogluon.common.loaders import load_pd
from autogluon.tabular import TabularPredictor, TabularDataset, FeatureMetadata
from autogluon.tabular import TabularPredictor, TabularDataset


def get_input_path(path):
Expand Down Expand Up @@ -81,7 +81,7 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
parser.add_argument(
"--tune_images", type=str, required=False, default=get_env_if_present("SM_CHANNEL_TUNE_IMAGES")
)
parser.add_argument("--ag_config", type=str, default=get_env_if_present("SM_CHANNEL_CONFIG"))
parser.add_argument("--ag_args", type=str, default=get_env_if_present("SM_CHANNEL_AG_ARGS"))
parser.add_argument("--serving_script", type=str, default=get_env_if_present("SM_CHANNEL_SERVING"))

args, _ = parser.parse_known_args()
Expand All @@ -91,30 +91,28 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
# See SageMaker-specific environment variables: https://sagemaker.readthedocs.io/en/stable/overview.html#prepare-a-training-script
os.makedirs(args.output_data_dir, mode=0o777, exist_ok=True)

config_file = get_input_path(args.ag_config)
with open(config_file) as f:
config = yaml.safe_load(f) # AutoGluon-specific config
ag_args_file = get_input_path(args.ag_args)
with open(ag_args_file, "rb") as f:
ag_args = pickle.load(f) # AutoGluon-specific args

if args.n_gpus:
config["num_gpus"] = int(args.n_gpus)
ag_args["num_gpus"] = int(args.n_gpus)

print("Running training job with the config:")
pprint(config)
pprint(ag_args)

# ---------------------------------------------------------------- Training
save_path = os.path.normpath(args.model_dir)
predictor_type = config["predictor_type"]
predictor_init_args = config["predictor_init_args"]
predictor_type = ag_args["predictor_type"]
predictor_init_args = ag_args["predictor_init_args"]
predictor_init_args["path"] = save_path
predictor_fit_args = config["predictor_fit_args"]
predictor_fit_args = ag_args["predictor_fit_args"]
valid_predictor_types = ["tabular", "multimodal", "timeseries"]
assert (
predictor_type in valid_predictor_types
), f"predictor_type {predictor_type} not supported. Valid options are {valid_predictor_types}"
if predictor_type == "tabular":
predictor_cls = TabularPredictor
if "feature_meatadata" in predictor_fit_args:
predictor_fit_args["feature_meatadata"] = FeatureMetadata(**predictor_fit_args["feature_meatadata"])
elif predictor_type == "multimodal":
from autogluon.multimodal import MultiModalPredictor

Expand All @@ -127,11 +125,12 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
train_file = get_input_path(args.train_dir)
training_data = prepare_data(train_file, predictor_type, predictor_init_args)

if predictor_type == "tabular" and "image_column" in config:
if predictor_type == "tabular" and "image_column" in ag_args:
feature_metadata = predictor_fit_args.get("feature_metadata", None)
if feature_metadata is None:
feature_metadata = FeatureMetadata.from_df(training_data)
feature_metadata = feature_metadata.add_special_types({config["image_column"]: ["image_path"]})
assert (
feature_metadata is not None
), f"Detected image_column: {ag_args['image_column']} while feature metadata is not included"
feature_metadata = feature_metadata.add_special_types({ag_args["image_column"]: ["image_path"]})
predictor_fit_args["feature_metadata"] = feature_metadata

tuning_data = None
Expand All @@ -143,7 +142,7 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
train_image_compressed_file = get_input_path(args.train_images)
train_images_dir = "train_images"
shutil.unpack_archive(train_image_compressed_file, train_images_dir)
image_column = config["image_column"]
image_column = ag_args["image_column"]
training_data[image_column] = training_data[image_column].apply(
lambda path: os.path.join(train_images_dir, path)
)
Expand All @@ -152,7 +151,7 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
tune_image_compressed_file = get_input_path(args.tune_images)
tune_images_dir = "tune_images"
shutil.unpack_archive(tune_image_compressed_file, tune_images_dir)
image_column = config["image_column"]
image_column = ag_args["image_column"]
tuning_data[image_column] = tuning_data[image_column].apply(lambda path: os.path.join(tune_images_dir, path))

predictor = predictor_cls(**predictor_init_args).fit(training_data, tuning_data=tuning_data, **predictor_fit_args)
Expand All @@ -163,7 +162,7 @@ def prepare_data(data_file, predictor_type, predictor_init_args=None):
predictor.save(path=save_path, standalone=True)

if predictor_cls == TabularPredictor:
if config.get("leaderboard", False):
if ag_args.get("leaderboard", False):
lb = predictor.leaderboard(silent=False)
lb.to_csv(f"{args.output_data_dir}/leaderboard.csv")

Expand Down
8 changes: 7 additions & 1 deletion tests/unittests/cluster/test_distributed_training.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pandas as pd

from autogluon.cloud import TabularCloudPredictor
from autogluon.common import space
from autogluon.common.utils.s3_utils import s3_path_to_bucket_prefix


Expand All @@ -26,10 +27,15 @@ def test_distributed_training(test_helper, framework_version):
predictor_fit_args = {
"train_data": train_data,
"hyperparameters": {
"GBM": {},
"GBM": {"num_leaves": space.Int(lower=26, upper=66, default=36)},
},
"num_bag_folds": 2,
"num_bag_sets": 1,
"hyperparameter_tune_kwargs": { # HPO is not performed unless hyperparameter_tune_kwargs is specified
"num_trials": 2,
"scheduler": "local",
"searcher": "auto",
},
}

image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=False)
Expand Down
Loading

0 comments on commit 054878f

Please sign in to comment.