Skip to content

Commit

Permalink
TimeSeriesCloudPredictor (#37)
Browse files Browse the repository at this point in the history
  • Loading branch information
Weisu Yin authored Feb 20, 2023
1 parent 12f94cd commit 5b57879
Show file tree
Hide file tree
Showing 16 changed files with 510 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/autogluon/cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from autogluon.common.utils.log_utils import _add_stream_handler

from .predictor import MultiModalCloudPredictor, TabularCloudPredictor
from .predictor import MultiModalCloudPredictor, TabularCloudPredictor, TimeSeriesCloudPredictor

_add_stream_handler()
1 change: 1 addition & 0 deletions src/autogluon/cloud/predictor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .multimodal_cloud_predictor import MultiModalCloudPredictor
from .tabular_cloud_predictor import TabularCloudPredictor
from .timeseries_cloud_predictor import TimeSeriesCloudPredictor
14 changes: 11 additions & 3 deletions src/autogluon/cloud/predictor/cloud_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,10 +787,13 @@ def _load_predict_real_time_test_data(self, test_data):

return test_data

def _predict_real_time(self, test_data, accept, **initial_args):
def _predict_real_time(self, test_data, accept, split_pred_proba=True, **initial_args):
try:
prediction = self.endpoint.predict(test_data, initial_args={"Accept": accept, **initial_args})
pred, pred_proba = split_pred_and_pred_proba(prediction)
pred, pred_proba = None, None
pred = prediction
if split_pred_proba:
pred, pred_proba = split_pred_and_pred_proba(prediction)
return pred, pred_proba
except ClientError as e:
if e.response["Error"]["Code"] == "413":  # Error code for payload too large
Expand Down Expand Up @@ -904,6 +907,7 @@ def _predict(
save_path=None,
model_kwargs=None,
transformer_kwargs=None,
split_pred_proba=True,
**kwargs,
):
if not predictor_path:
Expand Down Expand Up @@ -1024,7 +1028,9 @@ def _predict(
results_path = self.download_predict_results(save_path=save_path)
# Batch inference will only return json format
results = pd.read_json(results_path)
pred, pred_proba = split_pred_and_pred_proba(results)
pred = results
if split_pred_proba:
pred, pred_proba = split_pred_and_pred_proba(results)
if not persist:
os.remove(results_path)

Expand Down Expand Up @@ -1052,6 +1058,7 @@ def predict(
Predict using SageMaker batch transform.
When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
If you want to minimize latency, use `predict_real_time()` instead.
To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
then create a transformer with it, and call transform in the end.
Expand Down Expand Up @@ -1152,6 +1159,7 @@ def predict_proba(
Predict using SageMaker batch transform.
When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
If you want to minimize latency, use `predict_real_time()` instead.
To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
then create a transformer with it, and call transform in the end.
Expand Down
16 changes: 16 additions & 0 deletions src/autogluon/cloud/predictor/multimodal_cloud_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ def predict(
**kwargs,
) -> Optional[pd.Series]:
"""
Predict using SageMaker batch transform.
When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
If you want to minimize latency, use `predict_real_time()` instead.
To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
then create a transformer with it, and call transform in the end.
Parameters
----------
test_data: str
The test data to be inferenced.
Can be a pandas.DataFrame or a local path to a csv file.
Expand Down Expand Up @@ -202,6 +211,13 @@ def predict_proba(
**kwargs,
) -> Optional[Union[Tuple[pd.Series, Union[pd.DataFrame, pd.Series]], Union[pd.DataFrame, pd.Series]]]:
"""
Predict using SageMaker batch transform.
When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
If you want to minimize latency, use `predict_real_time()` instead.
To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
then create a transformer with it, and call transform in the end.
test_data: str
The test data to be inferenced.
Can be a pandas.DataFrame or a local path to a csv file.
Expand Down
271 changes: 271 additions & 0 deletions src/autogluon/cloud/predictor/timeseries_cloud_predictor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
from __future__ import annotations

import copy
import logging
from typing import Any, Dict, Optional, Union

import pandas as pd

from autogluon.common.loaders import load_pd

from .cloud_predictor import CloudPredictor

logger = logging.getLogger(__name__)


class TimeSeriesCloudPredictor(CloudPredictor):
    """CloudPredictor specialized for AutoGluon time series forecasting on SageMaker."""

    predictor_file_name = "TimeSeriesCloudPredictor.pkl"

    @property
    def predictor_type(self):
        """
        Type of the underneath AutoGluon Predictor
        """
        return "timeseries"

    def _get_local_predictor_cls(self):
        # Imported lazily so that autogluon.timeseries is only required when actually used
        from autogluon.timeseries import TimeSeriesPredictor

        predictor_cls = TimeSeriesPredictor
        return predictor_cls

    def _preprocess_data(
        self,
        data: Union[pd.DataFrame, str],
        id_column: str,
        timestamp_column: str,
        target: str,
        static_features: Optional[Union[pd.DataFrame, str]] = None,
    ) -> pd.DataFrame:
        """
        Normalize column order of the time series data and merge optional static features.

        Parameters
        ----------
        data: Union[pd.DataFrame, str]
            Time series data, or a local path to load it from.
        id_column: str
            Name of the 'item_id' column.
        timestamp_column: str
            Name of the 'timestamp' column.
        target: str
            Name of the column containing the values to forecast.
        static_features: Optional[Union[pd.DataFrame, str]], default = None
            Optional per-item metadata, or a local path to load it from.

        Returns
        -------
        pd.DataFrame
            Data with id/timestamp as the first two columns and target as the last column,
            with static features (if any) left-joined on the id column.
        """
        if isinstance(data, str):
            data = load_pd.load(data)
        else:
            # Shallow copy so column reordering below does not mutate the caller's frame
            data = copy.copy(data)
        cols = data.columns.to_list()
        # Make sure id and timestamp columns are the first two columns, and target column is in the end
        # This is to ensure in the container we know how to find id and timestamp columns, and whether there are static features being merged
        timestamp_index = cols.index(timestamp_column)
        cols.insert(0, cols.pop(timestamp_index))
        id_index = cols.index(id_column)
        cols.insert(0, cols.pop(id_index))
        target_index = cols.index(target)
        cols.append(cols.pop(target_index))
        data = data[cols]

        if static_features is not None:
            # Merge static features so only one dataframe needs to be sent to remote container
            if isinstance(static_features, str):
                static_features = load_pd.load(static_features)
            data = pd.merge(data, static_features, how="left", on=id_column)

        return data

    def fit(
        self,
        *,
        predictor_init_args: Dict[str, Any],
        predictor_fit_args: Dict[str, Any],
        id_column: str,
        timestamp_column: str,
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        framework_version: str = "latest",
        job_name: Optional[str] = None,
        instance_type: str = "ml.m5.2xlarge",
        instance_count: int = 1,
        volume_size: int = 100,
        custom_image_uri: Optional[str] = None,
        wait: bool = True,
        autogluon_sagemaker_estimator_kwargs: Dict = None,
        **kwargs,
    ) -> TimeSeriesCloudPredictor:
        """
        Fit the predictor with SageMaker.
        This function will first upload necessary config and train data to s3 bucket.
        Then launch a SageMaker training job with the AutoGluon training container.

        Parameters
        ----------
        predictor_init_args: dict
            Init args for the predictor
        predictor_fit_args: dict
            Fit args for the predictor
        id_column: str
            Name of the 'item_id' column
        timestamp_column: str
            Name of the 'timestamp' column
        static_features: Optional[pd.DataFrame]
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe
        framework_version: str, default = `latest`
            Training container version of autogluon.
            If `latest`, will use the latest available container version.
            If provided a specific version, will use this version.
            If `custom_image_uri` is set, this argument will be ignored.
        job_name: str, default = None
            Name of the launched training job.
            If None, CloudPredictor will create one with prefix ag-cloudpredictor
        instance_type: str, default = 'ml.m5.2xlarge'
            Instance type the predictor will be trained on with SageMaker.
        instance_count: int, default = 1
            Number of instance used to fit the predictor.
        volume_size: int, default = 100
            Size in GB of the EBS volume to use for storing input data during training.
            Must be large enough to store training data if File Mode is used (which is the default).
        custom_image_uri: Optional[str], default = None
            Custom training container image URI to use instead of the official AutoGluon one.
            If set, `framework_version` is ignored.
        wait: bool, default = True
            Whether the call should wait until the job completes
            To be noticed, the function won't return immediately because there are some preparations needed prior fit.
            Use `get_fit_job_status` to get job status.
        autogluon_sagemaker_estimator_kwargs: dict, default = dict()
            Any extra arguments needed to initialize AutoGluonSagemakerEstimator
            Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework for all options
        **kwargs:
            Any extra arguments needed to pass to fit.
            Please refer to https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework.fit for all options

        Returns
        -------
        `TimeSeriesCloudPredictor` object. Returns self.
        """
        # Deep copy so popping train/tuning data does not mutate the caller's dict
        predictor_fit_args = copy.deepcopy(predictor_fit_args)
        train_data = predictor_fit_args.pop("train_data")
        tuning_data = predictor_fit_args.pop("tuning_data", None)
        target = predictor_init_args.get("target")
        train_data = self._preprocess_data(
            data=train_data,
            id_column=id_column,
            timestamp_column=timestamp_column,
            target=target,
            static_features=static_features,
        )
        if tuning_data is not None:
            tuning_data = self._preprocess_data(
                data=tuning_data,
                id_column=id_column,
                timestamp_column=timestamp_column,
                target=target,
                static_features=static_features,
            )
        predictor_fit_args["train_data"] = train_data
        predictor_fit_args["tuning_data"] = tuning_data
        return super().fit(
            predictor_init_args=predictor_init_args,
            predictor_fit_args=predictor_fit_args,
            framework_version=framework_version,
            job_name=job_name,
            instance_type=instance_type,
            instance_count=instance_count,
            volume_size=volume_size,
            custom_image_uri=custom_image_uri,
            wait=wait,
            autogluon_sagemaker_estimator_kwargs=autogluon_sagemaker_estimator_kwargs,
            **kwargs,
        )

    def predict_real_time(
        self,
        test_data: Union[str, pd.DataFrame],
        id_column: str,
        timestamp_column: str,
        target: str,
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        accept: str = "application/x-parquet",
    ) -> pd.DataFrame:
        """
        Predict with the deployed SageMaker endpoint. A deployed SageMaker endpoint is required.
        This is intended to provide a low latency inference.
        If you want to inference on a large dataset, use `predict()` instead.

        Parameters
        ----------
        test_data: Union(str, pandas.DataFrame)
            The test data to be inferenced.
            Can be a pandas.DataFrame or a local path to a csv file.
        id_column: str
            Name of the 'item_id' column
        timestamp_column: str
            Name of the 'timestamp' column
        target: str
            Name of column that contains the target values to forecast
        static_features: Optional[pd.DataFrame]
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe
        accept: str, default = application/x-parquet
            Type of accept output content.
            Valid options are application/x-parquet, text/csv, application/json

        Returns
        -------
        Pandas.DataFrame
            Predict results in DataFrame
        """
        self._validate_predict_real_time_args(accept)
        test_data = self._preprocess_data(
            data=test_data,
            id_column=id_column,
            timestamp_column=timestamp_column,
            target=target,
            static_features=static_features,
        )
        # Forecasting has no class probabilities; skip the pred/pred_proba split
        pred, _ = self._predict_real_time(test_data=test_data, accept=accept, split_pred_proba=False)
        return pred

    def predict_proba_real_time(self, **kwargs) -> pd.DataFrame:
        """Not supported for time series; forecasting produces no class probabilities."""
        raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")

    def predict(
        self,
        test_data: Union[str, pd.DataFrame],
        id_column: str,
        timestamp_column: str,
        target: str,
        static_features: Optional[Union[str, pd.DataFrame]] = None,
        **kwargs,
    ) -> Optional[pd.DataFrame]:
        """
        Predict using SageMaker batch transform.
        When minimizing latency isn't a concern, then the batch transform functionality may be easier, more scalable, and more appropriate.
        If you want to minimize latency, use `predict_real_time()` instead.
        To learn more: https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform.html
        This method would first create a AutoGluonSagemakerInferenceModel with the trained predictor,
        then create a transformer with it, and call transform in the end.

        Parameters
        ----------
        test_data: str
            The test data to be inferenced.
            Can be a pandas.DataFrame or a local path to a csv file.
        id_column: str
            Name of the 'item_id' column
        timestamp_column: str
            Name of the 'timestamp' column
        target: str
            Name of column that contains the target values to forecast
        static_features: Optional[Union[str, pd.DataFrame]]
            An optional data frame describing the metadata attributes of individual items in the item index.
            For more detail, please refer to `TimeSeriesDataFrame` documentation:
            https://auto.gluon.ai/stable/api/autogluon.predictor.html#timeseriesdataframe
        kwargs:
            Refer to `CloudPredictor.predict()`

        Returns
        -------
        Optional[pd.DataFrame]
            Predict results in DataFrame, or None if results are not retrieved.
        """
        test_data = self._preprocess_data(
            data=test_data,
            id_column=id_column,
            timestamp_column=timestamp_column,
            target=target,
            static_features=static_features,
        )
        # Forecasting has no class probabilities; skip the pred/pred_proba split
        pred, _ = super()._predict(
            test_data=test_data,
            split_pred_proba=False,
            **kwargs,
        )
        return pred

    def predict_proba(
        self,
        **kwargs,
    ) -> Optional[pd.DataFrame]:
        """Not supported for time series; forecasting produces no class probabilities."""
        raise ValueError(f"{self.__class__.__name__} does not support predict_proba operation.")
8 changes: 5 additions & 3 deletions src/autogluon/cloud/scripts/script_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ class ScriptManager:
TRAIN_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "train.py")
TABULAR_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "tabular_serve.py")
MULTIMODAL_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "multimodal_serve.py")
TIMESERIES_SERVE_SCRIPT_PATH = os.path.join(SCRIPTS_PATH, "timeseries_serve.py")
_SERVE_SCRIPT_MAP = dict(
tabular=TABULAR_SERVE_SCRIPT_PATH,
multimodal=MULTIMODAL_SERVE_SCRIPT_PATH,
timeseries=TIMESERIES_SERVE_SCRIPT_PATH,
)

@classmethod
def get_train_script(cls, predictor_type, framework_version):
assert predictor_type in ["tabular", "multimodal"]
# tabular, multimodal share the same training script
assert predictor_type in ["tabular", "multimodal", "timeseries"]
# tabular, multimodal, timeseries share the same training script
return cls.TRAIN_SCRIPT_PATH

@classmethod
def get_serve_script(cls, predictor_type, framework_version):
assert predictor_type in ["tabular", "multimodal"]
assert predictor_type in ["tabular", "multimodal", "timeseries"]
return cls._SERVE_SCRIPT_MAP[predictor_type]
Loading

0 comments on commit 5b57879

Please sign in to comment.