From 6de815219458c68cf9d9f1c2c2f2f769dfe99c5d Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 2 Feb 2023 02:11:05 +0000 Subject: [PATCH 01/16] checkpoint --- src/autogluon/cloud/__init__.py | 2 +- src/autogluon/cloud/predictor/__init__.py | 1 + .../predictor/timeseries_cloud_predictor.py | 130 ++++++++++++++++++ src/autogluon/cloud/scripts/script_manager.py | 8 +- .../cloud/scripts/timeseries_serve.py | 1 + src/autogluon/cloud/scripts/train.py | 35 ++++- 6 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 src/autogluon/cloud/predictor/timeseries_cloud_predictor.py create mode 100644 src/autogluon/cloud/scripts/timeseries_serve.py diff --git a/src/autogluon/cloud/__init__.py b/src/autogluon/cloud/__init__.py index e7d4a7a..8e8ccea 100644 --- a/src/autogluon/cloud/__init__.py +++ b/src/autogluon/cloud/__init__.py @@ -1,5 +1,5 @@ from autogluon.common.utils.log_utils import _add_stream_handler -from .predictor import MultiModalCloudPredictor, TabularCloudPredictor +from .predictor import MultiModalCloudPredictor, TabularCloudPredictor, TimeSeriesCloudPredictor _add_stream_handler() diff --git a/src/autogluon/cloud/predictor/__init__.py b/src/autogluon/cloud/predictor/__init__.py index 3b7eca0..1b128e3 100644 --- a/src/autogluon/cloud/predictor/__init__.py +++ b/src/autogluon/cloud/predictor/__init__.py @@ -1,2 +1,3 @@ from .multimodal_cloud_predictor import MultiModalCloudPredictor from .tabular_cloud_predictor import TabularCloudPredictor +from .timeseries_cloud_predictor import TimeSeriesCloudPredictor diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py new file mode 100644 index 0000000..e90306c --- /dev/null +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -0,0 +1,130 @@ +from __future__ import annotations +import copy +import logging +from typing import Optional, Dict, Any + +import pandas as pd + +from autogluon.common.loaders import load_pd + +from .cloud_predictor import CloudPredictor + +logger = logging.getLogger(__name__) + + +class TimeSeriesCloudPredictor(CloudPredictor): + predictor_file_name = "TimeSeriesCloudPredictor.pkl" + + @property + def predictor_type(self): + """ + Type of the underneath AutoGluon Predictor + """ + return "timeseries" + + def _get_local_predictor_cls(self): + from autogluon.timeseries import TimeSeriesPredictor + + predictor_cls = TimeSeriesPredictor + return predictor_cls + + def _preprocess_data(self, data, id_column, timestamp_column): + if isinstance(data, str): + data = load_pd.load(data) + else: + data = copy.copy(data) + cols = data.columns.to_list() + # Make sure id and timestamp columns are the first two columns + timestamp_index = cols.index(timestamp_column) + cols.insert(0, cols.pop(timestamp_index)) + id_index = cols.index(id_column) + cols.insert(0, cols.pop(id_index)) + data = data[cols] + + return data + + def fit( + self, + *, + predictor_init_args: Dict[str, Any], + predictor_fit_args: Dict[str, Any], + id_column: str, + timestamp_column: str, + framework_version: str = "latest", + job_name: Optional[str] = None, + instance_type: str = "ml.m5.2xlarge", + instance_count: int = 1, + volume_size: int = 100, + custom_image_uri: Optional[str] = None, + wait: bool = True, + autogluon_sagemaker_estimator_kwargs: Dict = None, + **kwargs, + ) -> TimeSeriesCloudPredictor: + """ + Fit the predictor with SageMaker. + This function will first upload necessary config and train data to s3 bucket. + Then launch a SageMaker training job with the AutoGluon training container. + + Parameters + ---------- + predictor_init_args: dict + Init args for the predictor + predictor_fit_args: dict + Fit args for the predictor + image_column: str, default = None + The column name in the training/tuning data that contains the image paths. + The image paths MUST be absolute paths to you local system. + framework_version: str, default = `latest` + Training container version of autogluon. + If `latest`, will use the latest available container version. + If provided a specific version, will use this version. + If `custom_image_uri` is set, this argument will be ignored. + job_name: str, default = None + Name of the launched training job. + If None, CloudPredictor will create one with prefix ag-cloudpredictor + instance_type: str, default = 'ml.m5.2xlarge' + Instance type the predictor will be trained on with SageMaker. + instance_count: int, default = 1 + Number of instance used to fit the predictor. + volumes_size: int, default = 30 + Size in GB of the EBS volume to use for storing input data during training (default: 30). + Must be large enough to store training data if File Mode is used (which is the default). + wait: bool, default = True + Whether the call should wait until the job completes + To be noticed, the function won't return immediately because there are some preparations needed prior fit. + Use `get_fit_job_status` to get job status. + autogluon_sagemaker_estimator_kwargs: dict, default = dict() + Any extra arguments needed to initialize AutoGluonSagemakerEstimator + Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework for all options + **kwargs: + Any extra arguments needed to pass to fit. + Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework.fit for all options + + Returns + ------- + `TimeSeriesCloudPredictor` object. Returns self. + """ + predictor_fit_args = copy.deepcopy(predictor_fit_args) + train_data = predictor_fit_args.pop("train_data") + tuning_data = predictor_fit_args.pop("tuning_data", None) + train_data = self._preprocess_data(data=train_data, id_column=id_column, timestamp_column=timestamp_column) + if tuning_data is not None: + tuning_data = self._preprocess_data(data=tuning_data, id_column=id_column, timestamp_column=timestamp_column) + predictor_fit_args["train_data"] = train_data + predictor_fit_args["tuning_data"] = tuning_data + print(train_data.dtypes) + print(train_data) + return super().fit( + predictor_init_args=predictor_init_args, + predictor_fit_args=predictor_fit_args, + framework_version=framework_version, + job_name=job_name, + instance_type=instance_type, + instance_count=instance_count, + volume_size=volume_size, + custom_image_uri=custom_image_uri, + wait=wait, + autogluon_sagemaker_estimator_kwargs=autogluon_sagemaker_estimator_kwargs, + **kwargs, + ) + \ No newline at end of file diff --git a/src/autogluon/cloud/scripts/script_manager.py b/src/autogluon/cloud/scripts/script_manager.py index 7ee9123..7fc78bc 100644 --- a/src/autogluon/cloud/scripts/script_manager.py +++ b/src/autogluon/cloud/scripts/script_manager.py @@ -8,18 +8,20 @@ class ScriptManager: TRAIN_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "train.py") TABULAR_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "tabular_serve.py") MULTIMODAL_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "multimodal_serve.py") + TIMESERIES_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "timeseries_serve.py") _SERVE_SCRIPT_MAP = dict( tabular=TABULAR_SERVE_SCRIPT_PATH, multimodal=MULTIMODAL_SERVE_SCRIPT_PATH, + timeseries=TIMESERIES_SERVE_SCRIPT_PATH ) @classmethod def get_train_script(cls, predictor_type, framework_version): - assert predictor_type in ["tabular", "multimodal"] - # tabular, multimodal ßshare the same training script + assert predictor_type in ["tabular", "multimodal", "timeseries"] + # tabular, multimodal, timeseries share the same training script return cls.TRAIN_SCRIPT_PATH @classmethod def get_serve_script(cls, predictor_type, framework_version): - assert predictor_type in ["tabular", "multimodal"] + assert predictor_type in ["tabular", "multimodal", "timeseries"] return cls._SERVE_SCRIPT_MAP[predictor_type] diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py new file mode 100644 index 0000000..d25d49e --- /dev/null +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -0,0 +1 @@ +a = 1 \ No newline at end of file diff --git a/src/autogluon/cloud/scripts/train.py b/src/autogluon/cloud/scripts/train.py index 4915e43..a945546 100644 --- a/src/autogluon/cloud/scripts/train.py +++ b/src/autogluon/cloud/scripts/train.py @@ -4,11 +4,13 @@ # https://github.com/autogluon/autogluon/issues/2042 import argparse import os +import pandas as pd import shutil from pprint import pprint import yaml +from autogluon.common.loaders import load_pd from autogluon.tabular import TabularPredictor, TabularDataset, FeatureMetadata @@ -28,6 +30,28 @@ def get_env_if_present(name): return result +def prepare_timeseries_dataframe(df): + cols = df.columns.to_list() + id_column = cols[0] + timestamp_column = cols[1] + df[timestamp_column] = pd.to_datetime(df[timestamp_column]) + df = TimeSeriesDataFrame.from_data_frame( + df, + id_column=id_column, + timestamp_column=timestamp_column + ) + return df + + +def prepare_data(data_file, predictor_type): + if predictor_type == "timeseries": + data = load_pd.load(data_file) + data = prepare_timeseries_dataframe(data) + else: + data = TabularDataset(data_file) + return data + + if __name__ == "__main__": # Disable Autotune os.environ["MXNET_CUDNN_AUTOTUNE_DEFAULT"] = "0" @@ -74,7 +98,7 @@ def get_env_if_present(name): predictor_init_args = config["predictor_init_args"] predictor_init_args["path"] = save_path predictor_fit_args = config["predictor_fit_args"] - valid_predictor_types = ["tabular", "multimodal"] + valid_predictor_types = ["tabular", "multimodal", "timeseries"] assert ( predictor_type in valid_predictor_types ), f"predictor_type {predictor_type} not supported. Valid options are {valid_predictor_types}" @@ -84,11 +108,14 @@ def get_env_if_present(name): predictor_fit_args["feature_meatadata"] = FeatureMetadata(**predictor_fit_args["feature_meatadata"]) elif predictor_type == "multimodal": from autogluon.multimodal import MultiModalPredictor - predictor_cls = MultiModalPredictor + elif predictor_type == "timeseries": + from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame + predictor_cls = TimeSeriesPredictor train_file = get_input_path(args.train_dir) - training_data = TabularDataset(train_file) + training_data = prepare_data(train_file, predictor_type) + if predictor_type == "tabular" and "image_column" in config: feature_metadata = predictor_fit_args.get("feature_metadata", None) if feature_metadata is None: @@ -99,7 +126,7 @@ def get_env_if_present(name): tuning_data = None if args.tune_dir: tune_file = get_input_path(args.tune_dir) - tuning_data = TabularDataset(tune_file) + tuning_data = prepare_data(tune_file, predictor_type) if args.train_images: train_image_compressed_file = get_input_path(args.train_images) From 8d9409b7705209ee2f95656825228a6605baeb57 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 2 Feb 2023 22:39:12 +0000 Subject: [PATCH 02/16] checkpoint --- .../predictor/timeseries_cloud_predictor.py | 37 ++++++++++++++++--- src/autogluon/cloud/scripts/train.py | 20 ++++++++-- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index e90306c..97e99bf 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -28,19 +28,33 @@ def _get_local_predictor_cls(self): predictor_cls = TimeSeriesPredictor return predictor_cls - def _preprocess_data(self, data, id_column, timestamp_column): + def _preprocess_data( + self, + data, + id_column, + timestamp_column, + target, + static_features=None, + ): if isinstance(data, str): data = load_pd.load(data) else: data = copy.copy(data) cols = data.columns.to_list() - # Make sure id and timestamp columns are the first two columns + # Make sure id and timestamp columns are the first two columns, and target column is in the end + # This is to ensure in the container we know how to find id and timestamp columns, and whether there are static features being merged timestamp_index = cols.index(timestamp_column) cols.insert(0, cols.pop(timestamp_index)) id_index = cols.index(id_column) cols.insert(0, cols.pop(id_index)) + target_index = cols.index(target) + cols.append(cols.pop(target_index)) data = data[cols] + if static_features is not None: + # Merge static features so only one dataframe needs to be sent to remote container + data = pd.merge(data, static_features, how="left", on=id_column) + return data def fit( @@ -50,6 +64,7 @@ def fit( predictor_fit_args: Dict[str, Any], id_column: str, timestamp_column: str, + static_features: Optional[pd.DataFrame] = None, framework_version: str = "latest", job_name: Optional[str] = None, instance_type: str = "ml.m5.2xlarge", @@ -107,12 +122,24 @@ def fit( predictor_fit_args = copy.deepcopy(predictor_fit_args) train_data = predictor_fit_args.pop("train_data") tuning_data = predictor_fit_args.pop("tuning_data", None) - train_data = self._preprocess_data(data=train_data, id_column=id_column, timestamp_column=timestamp_column) + target = predictor_init_args.get("target") + train_data = self._preprocess_data( + data=train_data, + id_column=id_column, + timestamp_column=timestamp_column, + target=target, + static_features=static_features + ) if tuning_data is not None: - tuning_data = self._preprocess_data(data=tuning_data, id_column=id_column, timestamp_column=timestamp_column) + tuning_data = self._preprocess_data( + data=tuning_data, + id_column=id_column, + timestamp_column=timestamp_column, + target=target, + static_features=static_features + ) predictor_fit_args["train_data"] = train_data predictor_fit_args["tuning_data"] = tuning_data - print(train_data.dtypes) print(train_data) return super().fit( predictor_init_args=predictor_init_args, diff --git a/src/autogluon/cloud/scripts/train.py b/src/autogluon/cloud/scripts/train.py index a945546..01356b8 100644 --- a/src/autogluon/cloud/scripts/train.py +++ b/src/autogluon/cloud/scripts/train.py @@ -30,23 +30,35 @@ def get_env_if_present(name): return result -def prepare_timeseries_dataframe(df): +def prepare_timeseries_dataframe(df, predictor_init_args): + target = predictor_init_args['target'] cols = df.columns.to_list() id_column = cols[0] timestamp_column = cols[1] df[timestamp_column] = pd.to_datetime(df[timestamp_column]) + static_features = None + if target != cols[-1]: + # target is not the last column, then there are static features being merged in + target_index = cols.index(target) + static_columns = cols[target_index+1:] + static_features = df[[id_column]+static_columns].groupby([id_column], sort=False).head(1) + static_features.set_index(id_column, inplace=True) + df.drop(columns=static_columns, inplace=True) df = TimeSeriesDataFrame.from_data_frame( df, id_column=id_column, timestamp_column=timestamp_column ) + if static_features is not None: + print(static_features) + df.static_features = static_features return df -def prepare_data(data_file, predictor_type): +def prepare_data(data_file, predictor_type, predictor_init_args): if predictor_type == "timeseries": data = load_pd.load(data_file) - data = prepare_timeseries_dataframe(data) + data = prepare_timeseries_dataframe(data, predictor_init_args) else: data = TabularDataset(data_file) return data @@ -114,7 +126,7 @@ def prepare_data(data_file, predictor_type): predictor_cls = TimeSeriesPredictor train_file = get_input_path(args.train_dir) - training_data = prepare_data(train_file, predictor_type) + training_data = prepare_data(train_file, predictor_type, predictor_init_args) if predictor_type == "tabular" and "image_column" in config: feature_metadata = predictor_fit_args.get("feature_metadata", None) From ec5ababeca286e75cbb381c11325cb512e8a6aec Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Thu, 2 Feb 2023 23:55:57 +0000 Subject: [PATCH 03/16] checkpoint --- .../predictor/timeseries_cloud_predictor.py | 68 ++++++++++++++- .../cloud/scripts/timeseries_serve.py | 83 ++++++++++++++++++- 2 files changed, 148 insertions(+), 3 deletions(-) diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 97e99bf..62f5824 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -1,7 +1,7 @@ from __future__ import annotations import copy import logging -from typing import Optional, Dict, Any +from typing import Optional, Union, Dict, Any import pandas as pd @@ -154,4 +154,68 @@ def fit( autogluon_sagemaker_estimator_kwargs=autogluon_sagemaker_estimator_kwargs, **kwargs, ) - \ No newline at end of file + + def predict_real_time( + self, + test_data: Union[str, pd.DataFrame], + id_column: str, + timestamp_column: str, + static_features: Optional[pd.DataFrame] = None, + accept: str = "application/x-parquet", + ) -> pd.DataFrame: + self._validate_predict_real_time_args(accept) + test_data = self._preprocess_data( + data=test_data, + id_column=id_column, + timestamp_column=timestamp_column, + static_features=static_features + ) + pred, _ = self._predict_real_time(test_data=test_data, accept=accept) + return pred + + def predict_proba_real_time( + self, + **kwargs + ) -> pd.DataFrame: + raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.") + + def predict( + self, + test_data: Union[str, pd.DataFrame], + id_column: str, + timestamp_column: str, + target: str, + static_features: Optional[pd.DataFrame] = None, + **kwargs, + ) -> Optional[pd.DataFrame]: + """ + test_data: str + The test data to be inferenced. + Can be a pandas.DataFrame or a local path to a csv file. + When predicting multimodality with image modality: + You need to specify `test_data_image_column`, and make sure the image column contains relative path to the image. + When predicting with only images: + Can be a local path to a directory containing the images or a local path to a single image. + test_data_image_column: Optional(str) + If test_data involves image modality, you must specify the column name corresponding to image paths. + The path MUST be an abspath + kwargs: + Refer to `CloudPredictor.predict()` + """ + test_data = self._preprocess_data( + data=test_data, + id_column=id_column, + timestamp_column=timestamp_column, + target=target, + static_features=static_features + ) + return super().predict( + test_data, + **kwargs, + ) + + def predict_proba( + self, + **kwargs, + ) -> Optional[pd.DataFrame]: + raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.") diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index d25d49e..76613bf 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -1 +1,82 @@ -a = 1 \ No newline at end of file +# flake8: noqa +import base64 +import hashlib +import os +from io import BytesIO, StringIO + +import pandas as pd +from PIL import Image + +from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor + + + +def model_fn(model_dir): + """loads model from previously saved artifact""" + model = TimeSeriesPredictor.load(model_dir) + return model + + +def prepare_timeseries_dataframe(df, predictor): + target = predictor.target + cols = df.columns.to_list() + id_column = cols[0] + timestamp_column = cols[1] + df[timestamp_column] = pd.to_datetime(df[timestamp_column]) + static_features = None + if target != cols[-1]: + # target is not the last column, then there are static features being merged in + target_index = cols.index(target) + static_columns = cols[target_index+1:] + static_features = df[[id_column]+static_columns].groupby([id_column], sort=False).head(1) + static_features.set_index(id_column, inplace=True) + df.drop(columns=static_columns, inplace=True) + df = TimeSeriesDataFrame.from_data_frame( + df, + id_column=id_column, + timestamp_column=timestamp_column + ) + if static_features is not None: + print(static_features) + df.static_features = static_features + return df + + +def transform_fn(model, request_body, input_content_type, output_content_type="application/json"): + if input_content_type == "application/x-parquet": + buf = BytesIO(request_body) + data = pd.read_parquet(buf) + + elif input_content_type == "text/csv": + buf = StringIO(request_body) + data = pd.read_csv(buf) + + elif input_content_type == "application/json": + buf = StringIO(request_body) + data = pd.read_json(buf) + + elif input_content_type == "application/jsonl": + buf = StringIO(request_body) + data = pd.read_json(buf, orient="records", lines=True) + + else: + raise ValueError(f"{input_content_type} input content type not supported.") + + data = prepare_timeseries_dataframe(data, model) + prediction = model.predict(data) + prediction = pd.DataFrame(prediction) + + if "application/x-parquet" in output_content_type: + prediction.columns = prediction.columns.astype(str) + output = prediction.to_parquet() + output_content_type = "application/x-parquet" + elif "application/json" in output_content_type: + output = prediction.to_json() + output_content_type = "application/json" + elif "text/csv" in output_content_type: + output = prediction.to_csv(index=None) + output_content_type = "text/csv" + else: + raise ValueError(f"{output_content_type} content type not supported") + + return output, output_content_type From 71f84fcc4b14898815539dcbac271221594d0e0c Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Fri, 3 Feb 2023 21:31:22 +0000 Subject: [PATCH 04/16] tests --- .../predictor/multimodal_cloud_predictor.py | 8 ++ .../predictor/timeseries_cloud_predictor.py | 75 +++++++++++++++---- .../cloud/scripts/timeseries_serve.py | 4 - tests/unittests/timeseries/test_timeseries.py | 63 +++++++++++++++- 4 files changed, 131 insertions(+), 19 deletions(-) diff --git a/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py b/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py index 750ab4f..b654e2d 100644 --- a/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py @@ -163,6 +163,14 @@ def predict( **kwargs, ) -> Optional[pd.Series]: """ + Predict using SageMaker batch transform. + When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. + If you want to minimize latency, use `predict_real_time()` instead. + This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, + then create a transformer with it, and call transform in the end. + + Parameters + ---------- test_data: str The test data to be inferenced. Can be a pandas.DataFrame or a local path to a csv file. diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 62f5824..97e4222 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -53,6 +53,8 @@ def _preprocess_data( if static_features is not None: # Merge static features so only one dataframe needs to be sent to remote container + if isinstance(static_features, str): + static_features = load_pd.load(static_features) data = pd.merge(data, static_features, how="left", on=id_column) return data @@ -64,7 +66,7 @@ def fit( predictor_fit_args: Dict[str, Any], id_column: str, timestamp_column: str, - static_features: Optional[pd.DataFrame] = None, + static_features: Optional[Union[str, pd.DataFrame]] = None, framework_version: str = "latest", job_name: Optional[str] = None, instance_type: str = "ml.m5.2xlarge", @@ -86,9 +88,14 @@ def fit( Init args for the predictor predictor_fit_args: dict Fit args for the predictor - image_column: str, default = None - The column name in the training/tuning data that contains the image paths. - The image paths MUST be absolute paths to you local system. + id_column: str + Name of the 'item_id' column + timestamp_column: str + Name of the 'timestamp' column + static_features: Optional[pd.DataFrame] + An optional data frame describing the metadata attributes of individual items in the item index. + For more detail, please refer to `TimeSeriesDataFrame` documentation: + https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe framework_version: str, default = `latest` Training container version of autogluon. If `latest`, will use the latest available container version. @@ -160,14 +167,45 @@ def predict_real_time( test_data: Union[str, pd.DataFrame], id_column: str, timestamp_column: str, - static_features: Optional[pd.DataFrame] = None, + target: str, + static_features: Optional[Union[str, pd.DataFrame]] = None, accept: str = "application/x-parquet", ) -> pd.DataFrame: + """ + Predict with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required. + This is intended to provide a low latency inference. + If you want to inference on a large dataset, use `predict()` instead. + + Parameters + ---------- + test_data: Union(str, pandas.DataFrame) + The test data to be inferenced. + Can be a pandas.DataFrame or a local path to a csv file. + id_column: str + Name of the 'item_id' column + timestamp_column: str + Name of the 'timestamp' column + static_features: Optional[pd.DataFrame] + An optional data frame describing the metadata attributes of individual items in the item index. + For more detail, please refer to `TimeSeriesDataFrame` documentation: + https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe + target: str + Name of column that contains the target values to forecast + accept: str, default = application/x-parquet + Type of accept output content. + Valid options are application/x-parquet, text/csv, application/json + + Returns + ------- + Pandas.DataFrame + Predict results in DataFrame + """ self._validate_predict_real_time_args(accept) test_data = self._preprocess_data( data=test_data, id_column=id_column, timestamp_column=timestamp_column, + target=target, static_features=static_features ) pred, _ = self._predict_real_time(test_data=test_data, accept=accept) @@ -185,20 +223,31 @@ def predict( id_column: str, timestamp_column: str, target: str, - static_features: Optional[pd.DataFrame] = None, + static_features: Optional[Union[str, pd.DataFrame]] = None, **kwargs, ) -> Optional[pd.DataFrame]: """ + Predict using SageMaker batch transform. + When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. + If you want to minimize latency, use `predict_real_time()` instead. + This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, + then create a transformer with it, and call transform in the end. + + Parameters + ---------- test_data: str The test data to be inferenced. Can be a pandas.DataFrame or a local path to a csv file. - When predicting multimodality with image modality: - You need to specify `test_data_image_column`, and make sure the image column contains relative path to the image. - When predicting with only images: - Can be a local path to a directory containing the images or a local path to a single image. - test_data_image_column: Optional(str) - If test_data involves image modality, you must specify the column name corresponding to image paths. - The path MUST be an abspath + id_column: str + Name of the 'item_id' column + timestamp_column: str + Name of the 'timestamp' column + static_features: Optional[Union[str, pd.DataFrame]] + An optional data frame describing the metadata attributes of individual items in the item index. + For more detail, please refer to `TimeSeriesDataFrame` documentation: + https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe + target: str + Name of column that contains the target values to forecast kwargs: Refer to `CloudPredictor.predict()` """ diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index 76613bf..a910e1c 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -1,11 +1,7 @@ # flake8: noqa -import base64 -import hashlib -import os from io import BytesIO, StringIO import pandas as pd -from PIL import Image from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor diff --git a/tests/unittests/timeseries/test_timeseries.py b/tests/unittests/timeseries/test_timeseries.py index 3c1f51b..34f552f 100644 --- a/tests/unittests/timeseries/test_timeseries.py +++ b/tests/unittests/timeseries/test_timeseries.py @@ -1,2 +1,61 @@ -def test_timeseries(): - pass +import os +import tempfile + +from autogluon.cloud import TimeSeriesCloudPredictor + + +def test_timeseries(test_helper, framework_version): + train_data = "timeseries_train.csv" + static_features = "timeseries_static_features.csv" + id_column="item_id" + timestamp_column="timestamp" + target="target" + timestamp = test_helper.get_utc_timestamp_now() + with tempfile.TemporaryDirectory() as temp_dir: + os.chdir(temp_dir) + test_helper.prepare_data(train_data, static_features) + time_limit = 60 + + predictor_init_args = dict( + target=target + ) + + predictor_fit_args = dict( + train_data=train_data, + presets="medium_quality", + time_limit=time_limit, + ) + cloud_predictor = TimeSeriesCloudPredictor( + cloud_output_path=f"s3://autogluon-cloud-ci/test-timeseries/{timestamp}", + local_output_path="test_timeseries_cloud_predictor", + ) + training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=False) + inference_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="inference", gpu=False) + test_helper.test_basic_functionality( + cloud_predictor, + predictor_init_args, + predictor_fit_args, + train_data, + fit_kwargs=dict( + id_column=id_column, + timestamp_column=timestamp_column, + static_features=static_features, + framework_version=framework_version, + custom_image_uri=training_custom_image_uri, + ), + deploy_kwargs=dict(framework_version=framework_version, custom_image_uri=inference_custom_image_uri), + predict_kwargs=dict( + id_column=id_column, + timestamp_column=timestamp_column, + target=target, + static_features=static_features, + framework_version=framework_version, + custom_image_uri=inference_custom_image_uri + ), + predict_real_time_kwargs=dict( + id_column=id_column, + timestamp_column=timestamp_column, + target=target, + static_features=static_features, + ), + ) From 355ebfe3ce156f0215b6a4ee9916dbd434ce7693 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Fri, 3 Feb 2023 21:32:50 +0000 Subject: [PATCH 05/16] black --- .../predictor/timeseries_cloud_predictor.py | 32 +++++++++---------- src/autogluon/cloud/scripts/script_manager.py | 2 +- .../cloud/scripts/timeseries_serve.py | 11 ++----- src/autogluon/cloud/scripts/train.py | 14 ++++---- tests/unittests/timeseries/test_timeseries.py | 12 +++---- 5 files changed, 30 insertions(+), 41 deletions(-) diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 97e4222..4447801 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -1,7 +1,8 @@ from __future__ import annotations + import copy import logging -from typing import Optional, Union, Dict, Any +from typing import Any, Dict, Optional, Union import pandas as pd @@ -27,7 +28,7 @@ def _get_local_predictor_cls(self): predictor_cls = TimeSeriesPredictor return predictor_cls - + def _preprocess_data( self, data, @@ -50,15 +51,15 @@ def _preprocess_data( target_index = cols.index(target) cols.append(cols.pop(target_index)) data = data[cols] - + if static_features is not None: # Merge static features so only one dataframe needs to be sent to remote container if isinstance(static_features, str): static_features = load_pd.load(static_features) data = pd.merge(data, static_features, how="left", on=id_column) - + return data - + def fit( self, *, @@ -135,7 +136,7 @@ def fit( id_column=id_column, timestamp_column=timestamp_column, target=target, - static_features=static_features + static_features=static_features, ) if tuning_data is not None: tuning_data = self._preprocess_data( @@ -143,7 +144,7 @@ def fit( id_column=id_column, timestamp_column=timestamp_column, target=target, - static_features=static_features + static_features=static_features, ) predictor_fit_args["train_data"] = train_data predictor_fit_args["tuning_data"] = tuning_data @@ -161,7 +162,7 @@ def fit( autogluon_sagemaker_estimator_kwargs=autogluon_sagemaker_estimator_kwargs, **kwargs, ) - + def predict_real_time( self, test_data: Union[str, pd.DataFrame], @@ -206,17 +207,14 @@ def predict_real_time( id_column=id_column, timestamp_column=timestamp_column, target=target, - static_features=static_features + static_features=static_features, ) pred, _ = self._predict_real_time(test_data=test_data, accept=accept) return pred - - def predict_proba_real_time( - self, - **kwargs - ) -> pd.DataFrame: + + def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.") - + def predict( self, test_data: Union[str, pd.DataFrame], @@ -256,13 +254,13 @@ def predict( id_column=id_column, timestamp_column=timestamp_column, target=target, - static_features=static_features + static_features=static_features, ) return super().predict( test_data, **kwargs, ) - + def predict_proba( self, **kwargs, diff --git a/src/autogluon/cloud/scripts/script_manager.py b/src/autogluon/cloud/scripts/script_manager.py index 7fc78bc..8162db5 100644 --- a/src/autogluon/cloud/scripts/script_manager.py +++ b/src/autogluon/cloud/scripts/script_manager.py @@ -12,7 +12,7 @@ class ScriptManager: _SERVE_SCRIPT_MAP = dict( tabular=TABULAR_SERVE_SCRIPT_PATH, multimodal=MULTIMODAL_SERVE_SCRIPT_PATH, - timeseries=TIMESERIES_SERVE_SCRIPT_PATH + timeseries=TIMESERIES_SERVE_SCRIPT_PATH, ) @classmethod diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index a910e1c..23f4d0e 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -6,7 +6,6 @@ from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor - def model_fn(model_dir): """loads model from previously saved artifact""" model = TimeSeriesPredictor.load(model_dir) @@ -23,15 +22,11 @@ def prepare_timeseries_dataframe(df, predictor): if target != cols[-1]: # target is not the last column, then there are static features being merged in target_index = cols.index(target) - static_columns = cols[target_index+1:] - static_features = df[[id_column]+static_columns].groupby([id_column], sort=False).head(1) + static_columns = cols[target_index + 1 :] + static_features = df[[id_column] + static_columns].groupby([id_column], sort=False).head(1) static_features.set_index(id_column, inplace=True) df.drop(columns=static_columns, inplace=True) - df = TimeSeriesDataFrame.from_data_frame( - df, - id_column=id_column, - timestamp_column=timestamp_column - ) + df = TimeSeriesDataFrame.from_data_frame(df, id_column=id_column, timestamp_column=timestamp_column) if static_features is not None: print(static_features) df.static_features = static_features diff --git a/src/autogluon/cloud/scripts/train.py b/src/autogluon/cloud/scripts/train.py index 01356b8..43a0cc9 100644 --- a/src/autogluon/cloud/scripts/train.py +++ b/src/autogluon/cloud/scripts/train.py @@ -31,7 +31,7 @@ def get_env_if_present(name): def prepare_timeseries_dataframe(df, predictor_init_args): - target = predictor_init_args['target'] + target = predictor_init_args["target"] cols = df.columns.to_list() id_column = cols[0] timestamp_column = cols[1] @@ -40,15 +40,11 @@ def prepare_timeseries_dataframe(df, predictor_init_args): if target != cols[-1]: # target is not the last column, then there are static features being merged in target_index = cols.index(target) - static_columns = cols[target_index+1:] - static_features = df[[id_column]+static_columns].groupby([id_column], sort=False).head(1) + static_columns = cols[target_index + 1 :] + static_features = df[[id_column] + static_columns].groupby([id_column], sort=False).head(1) static_features.set_index(id_column, inplace=True) df.drop(columns=static_columns, inplace=True) - df = TimeSeriesDataFrame.from_data_frame( - df, - id_column=id_column, - timestamp_column=timestamp_column - ) + df = TimeSeriesDataFrame.from_data_frame(df, id_column=id_column, timestamp_column=timestamp_column) if static_features is not None: print(static_features) df.static_features = static_features @@ -120,9 +116,11 @@ def prepare_data(data_file, predictor_type, predictor_init_args): predictor_fit_args["feature_meatadata"] = FeatureMetadata(**predictor_fit_args["feature_meatadata"]) elif predictor_type == "multimodal": from autogluon.multimodal import MultiModalPredictor + predictor_cls = MultiModalPredictor elif predictor_type == "timeseries": from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame + predictor_cls = TimeSeriesPredictor train_file = get_input_path(args.train_dir) diff --git a/tests/unittests/timeseries/test_timeseries.py b/tests/unittests/timeseries/test_timeseries.py index 34f552f..db6ee46 100644 --- a/tests/unittests/timeseries/test_timeseries.py +++ b/tests/unittests/timeseries/test_timeseries.py @@ -7,18 +7,16 @@ def test_timeseries(test_helper, framework_version): train_data = "timeseries_train.csv" static_features = "timeseries_static_features.csv" - id_column="item_id" - timestamp_column="timestamp" - target="target" + id_column = "item_id" + timestamp_column = "timestamp" + target = "target" timestamp = test_helper.get_utc_timestamp_now() with tempfile.TemporaryDirectory() as temp_dir: os.chdir(temp_dir) test_helper.prepare_data(train_data, static_features) time_limit = 60 - predictor_init_args = dict( - target=target - ) + predictor_init_args = dict(target=target) predictor_fit_args = dict( train_data=train_data, @@ -50,7 +48,7 @@ def test_timeseries(test_helper, framework_version): target=target, static_features=static_features, framework_version=framework_version, - custom_image_uri=inference_custom_image_uri + custom_image_uri=inference_custom_image_uri, ), predict_real_time_kwargs=dict( id_column=id_column, From 7a5d09f819c4cbb60562089de431268ea759c91a Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 18:30:35 +0000 Subject: [PATCH 06/16] fix --- src/autogluon/cloud/scripts/train.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/autogluon/cloud/scripts/train.py b/src/autogluon/cloud/scripts/train.py index 43a0cc9..4c0e9cd 100644 --- a/src/autogluon/cloud/scripts/train.py +++ b/src/autogluon/cloud/scripts/train.py @@ -51,8 +51,9 @@ def prepare_timeseries_dataframe(df, predictor_init_args): return df -def prepare_data(data_file, predictor_type, predictor_init_args): +def prepare_data(data_file, predictor_type, predictor_init_args=None): if predictor_type == "timeseries": + assert predictor_init_args is not None data = load_pd.load(data_file) data = prepare_timeseries_dataframe(data, predictor_init_args) else: From b8cd395af373e90d77c4b3f6f1d85c58986d1f48 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 18:57:04 +0000 Subject: [PATCH 07/16] fix --- tests/conftest.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 27a2720..1454d32 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,8 @@ import zipfile from datetime import datetime, timezone +from autogluon.cloud import TimeSeriesCloudPredictor + import boto3 import pandas as pd import pytest @@ -63,9 +65,12 @@ def replace_image_abspath(data, image_column): def test_endpoint(cloud_predictor, test_data, **predict_real_time_kwargs): try: pred = cloud_predictor.predict_real_time(test_data, **predict_real_time_kwargs) - assert isinstance(pred, pd.Series) - pred_proba = cloud_predictor.predict_proba_real_time(test_data, **predict_real_time_kwargs) - assert isinstance(pred_proba, pd.DataFrame) + if isinstance(cloud_predictor, TimeSeriesCloudPredictor): + assert isinstance(pred, pd.DataFrame) + else: + assert isinstance(pred, pd.Series) + pred_proba = cloud_predictor.predict_proba_real_time(test_data, **predict_real_time_kwargs) + assert isinstance(pred_proba, pd.DataFrame) except Exception as e: cloud_predictor.cleanup_deployment() # cleanup endpoint if test failed raise e @@ -110,8 +115,12 @@ def test_basic_functionality( if predict_kwargs is None: predict_kwargs = dict() - pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) - assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) + if isinstance(cloud_predictor, TimeSeriesCloudPredictor): + pred = cloud_predictor.predict(test_data, **predict_kwargs) + assert isinstance(pred, pd.DataFrame) + else: + pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) + assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) info = cloud_predictor.info() assert info["recent_transform_job"]["status"] == "Completed" From 33d811613c5e65f20e276b5daa8d4eebeb0556a5 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 19:00:50 +0000 Subject: [PATCH 08/16] isort --- tests/conftest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 1454d32..9e21ff5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,12 +2,12 @@ import zipfile from datetime import datetime, timezone -from autogluon.cloud import TimeSeriesCloudPredictor - import boto3 import pandas as pd import pytest +from autogluon.cloud import TimeSeriesCloudPredictor + class CloudTestHelper: cpu_training_image = "369469875935.dkr.ecr.us-east-1.amazonaws.com/autogluon-nightly-training:cpu-latest" From 59184ee514eb49d75181b75513507fe144af815a Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 19:49:41 +0000 Subject: [PATCH 09/16] fix --- tests/conftest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9e21ff5..9026764 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,10 +117,10 @@ def test_basic_functionality( predict_kwargs = dict() if isinstance(cloud_predictor, TimeSeriesCloudPredictor): pred = cloud_predictor.predict(test_data, **predict_kwargs) - assert isinstance(pred, pd.DataFrame) + assert pred is not None else: pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) - assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) + assert pred is not None and pred_proba is not None info = cloud_predictor.info() assert info["recent_transform_job"]["status"] == "Completed" @@ -172,7 +172,7 @@ def test_functionality( if predict_kwargs is None: predict_kwargs = dict() pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) - assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) + assert pred is not None and pred_proba is not None info = cloud_predictor.info() assert info["recent_transform_job"]["status"] == "Completed" @@ -185,7 +185,7 @@ def test_functionality( pred, pred_proba = cloud_predictor_no_train.predict_proba( test_data, predictor_path=trained_predictor_path, **predict_kwargs ) - assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) + assert pred is not None and pred_proba is not None info = cloud_predictor_no_train.info() assert info["recent_transform_job"]["status"] == "Completed" From c0ac54197ec48fae7f1e1625d3f38e73134630c0 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 20:21:33 +0000 Subject: [PATCH 10/16] fix --- tests/conftest.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9026764..e6645c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -65,10 +65,8 @@ def replace_image_abspath(data, image_column): def test_endpoint(cloud_predictor, test_data, **predict_real_time_kwargs): try: pred = cloud_predictor.predict_real_time(test_data, **predict_real_time_kwargs) - if isinstance(cloud_predictor, TimeSeriesCloudPredictor): - assert isinstance(pred, pd.DataFrame) - else: - assert isinstance(pred, pd.Series) + assert pred is not None + if not isinstance(cloud_predictor, TimeSeriesCloudPredictor): pred_proba = cloud_predictor.predict_proba_real_time(test_data, **predict_real_time_kwargs) assert isinstance(pred_proba, pd.DataFrame) except Exception as e: @@ -120,7 +118,7 @@ def test_basic_functionality( assert pred is not None else: pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) - assert pred is not None and pred_proba is not None + assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) info = cloud_predictor.info() assert info["recent_transform_job"]["status"] == "Completed" @@ -172,7 +170,7 @@ def test_functionality( if predict_kwargs is None: predict_kwargs = dict() pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) - assert pred is not None and pred_proba is not None + assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) info = cloud_predictor.info() assert info["recent_transform_job"]["status"] == "Completed" @@ -185,7 +183,7 @@ def test_functionality( pred, pred_proba = cloud_predictor_no_train.predict_proba( test_data, predictor_path=trained_predictor_path, **predict_kwargs ) - assert pred is not None and pred_proba is not None + assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame) info = cloud_predictor_no_train.info() assert info["recent_transform_job"]["status"] == "Completed" From ffd883849117494383ea2fd492952a724fec1ec2 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 22:51:15 +0000 Subject: [PATCH 11/16] copy model --- src/autogluon/cloud/scripts/timeseries_serve.py | 13 ++++++++++++- tests/unittests/general/test_full_functionality.py | 4 ++-- tests/unittests/image/test_image.py | 2 +- tests/unittests/multimodal/test_multimodal.py | 2 +- tests/unittests/tabular/test_tabular.py | 2 +- tests/unittests/text/test_text.py | 2 +- tests/unittests/timeseries/test_timeseries.py | 2 +- 7 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index 23f4d0e..e5e0bed 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -1,14 +1,25 @@ # flake8: noqa from io import BytesIO, StringIO +import os import pandas as pd +import shutil from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor def model_fn(model_dir): """loads model from previously saved artifact""" - model = TimeSeriesPredictor.load(model_dir) + # TSPredictor will write to the model file during inference while the default model_dir is read only + # Copy the model file to a writable location as a temporary workaround + tmp_model_dir = os.path.join("/tmp", "model") + try: + shutil.copytree(model_dir, tmp_model_dir, dirs_exist_ok=True) + except: + # model already copied + pass + model = TimeSeriesPredictor.load(tmp_model_dir) + print("MODEL LOADED") return model diff --git a/tests/unittests/general/test_full_functionality.py b/tests/unittests/general/test_full_functionality.py index 7362664..bebc13c 100644 --- a/tests/unittests/general/test_full_functionality.py +++ b/tests/unittests/general/test_full_functionality.py @@ -25,11 +25,11 @@ def test_full_functionality(test_helper, framework_version): time_limit=time_limit, ) cloud_predictor = TabularCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular/{framework_version}/{timestamp}", local_output_path="test_tabular_cloud_predictor", ) cloud_predictor_no_train = TabularCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular-no-train/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular-no-train/{framework_version}/{timestamp}", local_output_path="test_tabular_cloud_predictor_no_train", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=False) diff --git a/tests/unittests/image/test_image.py b/tests/unittests/image/test_image.py index d1cba0f..aa2b16b 100644 --- a/tests/unittests/image/test_image.py +++ b/tests/unittests/image/test_image.py @@ -21,7 +21,7 @@ def test_multimodal_image_only(test_helper, framework_version="source"): predictor_init_args = dict(label="label", eval_metric="acc") predictor_fit_args = dict(train_data=train_data, time_limit=time_limit) cloud_predictor = MultiModalCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-image/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-image/{framework_version}/{timestamp}", local_output_path="test_multimodal_image_cloud_predictor", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=True) diff --git a/tests/unittests/multimodal/test_multimodal.py b/tests/unittests/multimodal/test_multimodal.py index ce78025..86bf82e 100644 --- a/tests/unittests/multimodal/test_multimodal.py +++ b/tests/unittests/multimodal/test_multimodal.py @@ -23,7 +23,7 @@ def test_multimodal_tabular_text_image(test_helper, framework_version): ) predictor_fit_args = dict(train_data=train_data, time_limit=time_limit) cloud_predictor = MultiModalCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-tabular-text-image/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-tabular-text-image/{framework_version}/{timestamp}", local_output_path="test_multimodal_tabular_text_image_cloud_predictor", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=True) diff --git a/tests/unittests/tabular/test_tabular.py b/tests/unittests/tabular/test_tabular.py index 749df45..19ff5bd 100644 --- a/tests/unittests/tabular/test_tabular.py +++ b/tests/unittests/tabular/test_tabular.py @@ -33,7 +33,7 @@ def test_tabular_tabular_text_image(test_helper, framework_version): }, ) cloud_predictor = TabularCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular-tabular-text-image/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-tabular-tabular-text-image/{framework_version}/{timestamp}", local_output_path="test_tabular_tabular_text_image_cloud_predictor", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=True) diff --git a/tests/unittests/text/test_text.py b/tests/unittests/text/test_text.py index 4522888..8afb2be 100644 --- a/tests/unittests/text/test_text.py +++ b/tests/unittests/text/test_text.py @@ -17,7 +17,7 @@ def test_multimodal_text_only(test_helper, framework_version): predictor_init_args = dict(label="label", eval_metric="acc") predictor_fit_args = dict(train_data=train_data, tuning_data=tune_data, time_limit=time_limit) cloud_predictor = MultiModalCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-text/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-multimodal-text/{framework_version}/{timestamp}", local_output_path="test_multimodal_text_cloud_predictor", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=True) diff --git a/tests/unittests/timeseries/test_timeseries.py b/tests/unittests/timeseries/test_timeseries.py index db6ee46..d9ecc58 100644 --- a/tests/unittests/timeseries/test_timeseries.py +++ b/tests/unittests/timeseries/test_timeseries.py @@ -24,7 +24,7 @@ def test_timeseries(test_helper, framework_version): time_limit=time_limit, ) cloud_predictor = TimeSeriesCloudPredictor( - cloud_output_path=f"s3://autogluon-cloud-ci/test-timeseries/{timestamp}", + cloud_output_path=f"s3://autogluon-cloud-ci/test-timeseries/{framework_version}/{timestamp}", local_output_path="test_timeseries_cloud_predictor", ) training_custom_image_uri = test_helper.get_custom_image_uri(framework_version, type="training", gpu=False) From dcdd59dbe8ac4e73486dfb16e320f171e3d5cbef Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 6 Feb 2023 22:58:10 +0000 Subject: [PATCH 12/16] isort --- src/autogluon/cloud/scripts/timeseries_serve.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index e5e0bed..6c80c44 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -1,9 +1,9 @@ # flake8: noqa +import os +import shutil from io import BytesIO, StringIO -import os import pandas as pd -import shutil from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor From cca9d6936e1ff57b95e8da76a93dd6984fad38f1 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 13 Feb 2023 19:50:57 +0000 Subject: [PATCH 13/16] address tony commets --- src/autogluon/cloud/predictor/cloud_predictor.py | 2 ++ .../cloud/predictor/multimodal_cloud_predictor.py | 8 ++++++++ .../cloud/predictor/timeseries_cloud_predictor.py | 13 +++++++------ src/autogluon/cloud/scripts/timeseries_serve.py | 4 ++-- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index 2580aeb..4b94bd5 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -1052,6 +1052,7 @@ def predict( Predict using SageMaker batch transform. When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. If you want to minimize latency, use `predict_real_time()` instead. + To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, then create a transformer with it, and call transform in the end. @@ -1152,6 +1153,7 @@ def predict_proba( Predict using SageMaker batch transform. When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. If you want to minimize latency, use `predict_real_time()` instead. + To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, then create a transformer with it, and call transform in the end. diff --git a/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py b/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py index b654e2d..9aa010a 100644 --- a/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/multimodal_cloud_predictor.py @@ -166,6 +166,7 @@ def predict( Predict using SageMaker batch transform. When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. If you want to minimize latency, use `predict_real_time()` instead. + To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, then create a transformer with it, and call transform in the end. @@ -210,6 +211,13 @@ def predict_proba( **kwargs, ) -> Optional[Union[Tuple[pd.Series, Union[pd.DataFrame, pd.Series]], Union[pd.DataFrame, pd.Series]]]: """ + Predict using SageMaker batch transform. + When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. + If you want to minimize latency, use `predict_real_time()` instead. + To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html + This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, + then create a transformer with it, and call transform in the end. + test_data: str The test data to be inferenced. Can be a pandas.DataFrame or a local path to a csv file. diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 4447801..2fbc0a9 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -31,12 +31,12 @@ def _get_local_predictor_cls(self): def _preprocess_data( self, - data, - id_column, - timestamp_column, - target, - static_features=None, - ): + data: Union[pd.DataFrame, str], + id_column: str, + timestamp_column: str, + target: str, + static_features: Optional[Union[pd.DataFrame, str]] = None, + ) -> pd.DataFrame: if isinstance(data, str): data = load_pd.load(data) else: @@ -228,6 +228,7 @@ def predict( Predict using SageMaker batch transform. When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate. If you want to minimize latency, use `predict_real_time()` instead. + To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor, then create a transformer with it, and call transform in the end. diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index 6c80c44..0f33de6 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -18,7 +18,8 @@ def model_fn(model_dir): except: # model already copied pass - model = TimeSeriesPredictor.load(tmp_model_dir) + # model = TimeSeriesPredictor.load(tmp_model_dir) + model = TimeSeriesPredictor.load(model_dir) print("MODEL LOADED") return model @@ -39,7 +40,6 @@ def prepare_timeseries_dataframe(df, predictor): df.drop(columns=static_columns, inplace=True) df = TimeSeriesDataFrame.from_data_frame(df, id_column=id_column, timestamp_column=timestamp_column) if static_features is not None: - print(static_features) df.static_features = static_features return df From b098b3638b690ab41714221162595fa56b1bff5c Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 13 Feb 2023 21:26:10 +0000 Subject: [PATCH 14/16] fix pred --- src/autogluon/cloud/predictor/cloud_predictor.py | 5 ++++- src/autogluon/cloud/predictor/timeseries_cloud_predictor.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index 4b94bd5..d569633 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -904,6 +904,7 @@ def _predict( save_path=None, model_kwargs=None, transformer_kwargs=None, + split_pred_proba=True, **kwargs, ): if not predictor_path: @@ -1024,7 +1025,9 @@ def _predict( results_path = self.download_predict_results(save_path=save_path) # Batch inference will only return json format results = pd.read_json(results_path) - pred, pred_proba = split_pred_and_pred_proba(results) + pred = results + if split_pred_proba: + pred, pred_proba = split_pred_and_pred_proba(results) if not persist: os.remove(results_path) diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index 2fbc0a9..d9d9352 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -257,10 +257,12 @@ def predict( target=target, static_features=static_features, ) - return super().predict( - test_data, + pred, _ = super()._predict( + test_data=test_data, + split_pred_proba=False, **kwargs, ) + return pred def predict_proba( self, From 14a700d0f5a9ab2e4ceb23f40d5cc15a7c7b4bf7 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 13 Feb 2023 21:27:45 +0000 Subject: [PATCH 15/16] remove check style --- tests/test_check_style.py | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 tests/test_check_style.py diff --git a/tests/test_check_style.py b/tests/test_check_style.py deleted file mode 100644 index 0ad5614..0000000 --- a/tests/test_check_style.py +++ /dev/null @@ -1,15 +0,0 @@ -import logging -import warnings -from subprocess import PIPE, Popen - - -def test_check_style(): - logging.getLogger().setLevel(logging.INFO) - logging.info("PEP8 Style check") - flake8_proc = Popen(["flake8", "--count", "--max-line-length", "300"], stdout=PIPE) - flake8_out = flake8_proc.communicate()[0] - lines = flake8_out.splitlines() - count = int(lines[-1].decode()) - if count > 0: - warnings.warn(f"{count} PEP8 warnings remaining") - assert count < 1000, "Too many PEP8 warnings found, improve code quality to pass test." From f1a93f92b099518e7307ece0b117c20838262808 Mon Sep 17 00:00:00 2001 From: Weisu Yin Date: Mon, 13 Feb 2023 22:22:46 +0000 Subject: [PATCH 16/16] fix --- src/autogluon/cloud/predictor/cloud_predictor.py | 7 +++++-- .../cloud/predictor/timeseries_cloud_predictor.py | 2 +- src/autogluon/cloud/scripts/timeseries_serve.py | 5 ++--- tests/conftest.py | 11 +++++++---- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/autogluon/cloud/predictor/cloud_predictor.py b/src/autogluon/cloud/predictor/cloud_predictor.py index d569633..717ee87 100644 --- a/src/autogluon/cloud/predictor/cloud_predictor.py +++ b/src/autogluon/cloud/predictor/cloud_predictor.py @@ -787,10 +787,13 @@ def _load_predict_real_time_test_data(self, test_data): return test_data - def _predict_real_time(self, test_data, accept, **initial_args): + def _predict_real_time(self, test_data, accept, split_pred_proba=True, **initial_args): try: prediction = self.endpoint.predict(test_data, initial_args={"Accept": accept, **initial_args}) - pred, pred_proba = split_pred_and_pred_proba(prediction) + pred, pred_proba = None, None + pred = prediction + if split_pred_proba: + pred, pred_proba = split_pred_and_pred_proba(prediction) return pred, pred_proba except ClientError as e: if e.response["Error"]["Code"] == "413": # Error code for pay load too large diff --git a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py index d9d9352..e391ac8 100644 --- a/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py +++ b/src/autogluon/cloud/predictor/timeseries_cloud_predictor.py @@ -209,7 +209,7 @@ def predict_real_time( target=target, static_features=static_features, ) - pred, _ = self._predict_real_time(test_data=test_data, accept=accept) + pred, _ = self._predict_real_time(test_data=test_data, accept=accept, split_pred_proba=False) return pred def predict_proba_real_time(self, **kwargs) -> pd.DataFrame: diff --git a/src/autogluon/cloud/scripts/timeseries_serve.py b/src/autogluon/cloud/scripts/timeseries_serve.py index 0f33de6..cf43635 100644 --- a/src/autogluon/cloud/scripts/timeseries_serve.py +++ b/src/autogluon/cloud/scripts/timeseries_serve.py @@ -14,12 +14,11 @@ def model_fn(model_dir): # Copy the model file to a writable location as a temporary workaround tmp_model_dir = os.path.join("/tmp", "model") try: - shutil.copytree(model_dir, tmp_model_dir, dirs_exist_ok=True) + shutil.copytree(model_dir, tmp_model_dir, dirs_exist_ok=False) except: # model already copied pass - # model = TimeSeriesPredictor.load(tmp_model_dir) - model = TimeSeriesPredictor.load(model_dir) + model = TimeSeriesPredictor.load(tmp_model_dir) print("MODEL LOADED") return model diff --git a/tests/conftest.py b/tests/conftest.py index e6645c2..cbc5071 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,9 +64,12 @@ def replace_image_abspath(data, image_column): @staticmethod def test_endpoint(cloud_predictor, test_data, **predict_real_time_kwargs): try: - pred = cloud_predictor.predict_real_time(test_data, **predict_real_time_kwargs) - assert pred is not None - if not isinstance(cloud_predictor, TimeSeriesCloudPredictor): + if isinstance(cloud_predictor, TimeSeriesCloudPredictor): + pred = cloud_predictor.predict_real_time(test_data, **predict_real_time_kwargs) + assert isinstance(pred, pd.DataFrame) + else: + pred = cloud_predictor.predict_real_time(test_data, **predict_real_time_kwargs) + assert isinstance(pred, pd.Series) pred_proba = cloud_predictor.predict_proba_real_time(test_data, **predict_real_time_kwargs) assert isinstance(pred_proba, pd.DataFrame) except Exception as e: @@ -115,7 +118,7 @@ def test_basic_functionality( predict_kwargs = dict() if isinstance(cloud_predictor, TimeSeriesCloudPredictor): pred = cloud_predictor.predict(test_data, **predict_kwargs) - assert pred is not None + assert isinstance(pred, pd.DataFrame) else: pred, pred_proba = cloud_predictor.predict_proba(test_data, **predict_kwargs) assert isinstance(pred, pd.Series) and isinstance(pred_proba, pd.DataFrame)