Skip to content

Commit

Permalink
Added support to accept local avro files, GCS avro files and GCS wild…
Browse files Browse the repository at this point in the history
…card paths
  • Loading branch information
voonhous committed Dec 22, 2019
1 parent 437b350 commit 7a29d94
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 90 deletions.
201 changes: 157 additions & 44 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import json
import logging
import os
import time
from collections import OrderedDict
from typing import Dict, Union
from typing import List
from urllib.parse import urlparse

import fastavro
import grpc
import pandas as pd
import pyarrow as pa
Expand All @@ -38,7 +40,7 @@
from feast.feature_set import FeatureSet, Entity
from feast.job import Job
from feast.loaders.abstract_producer import get_producer
from feast.loaders.file import export_dataframe_to_staging_location
from feast.loaders.file import export_source_to_staging_location
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
from feast.loaders.ingest import get_feature_row_chunks
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
Expand Down Expand Up @@ -322,22 +324,28 @@ def list_entities(self) -> Dict[str, Entity]:
return entities_dict

def get_batch_features(
self, feature_ids: List[str], entity_rows: pd.DataFrame
self, feature_ids: List[str], entity_rows: Union[pd.DataFrame, str]
) -> Job:
"""
Retrieves historical features from a Feast Serving deployment.
Args:
feature_ids: List of feature ids that will be returned for each
entity. Each feature id should have the following format
feature_ids (List[str]):
List of feature ids that will be returned for each entity.
Each feature id should have the following format
"feature_set_name:version:feature_name".
entity_rows: Pandas dataframe containing entities and a 'datetime'
column. Each entity in a feature set must be present as a column
in this dataframe. The datetime column must
entity_rows (Union[pd.DataFrame, str]):
Pandas dataframe containing entities and a 'datetime' column.
Each entity in a feature set must be present as a column in this
dataframe. The datetime column must contain timestamps in
datetime64 format.
Returns:
Returns a job object that can be used to monitor retrieval progress
asynchronously, and can be used to materialize the results
feast.job.Job:
Returns a job object that can be used to monitor retrieval
progress asynchronously, and can be used to materialize the
results.
Examples:
>>> from feast import Client
Expand All @@ -360,21 +368,11 @@ def get_batch_features(

fs_request = _build_feature_set_request(feature_ids)

# Validate entity rows based on entities in Feast Core
self._validate_entity_rows_for_batch_retrieval(entity_rows, fs_request)

# Remove timezone from datetime column
if isinstance(
entity_rows["datetime"].dtype, pd.core.dtypes.dtypes.DatetimeTZDtype
):
entity_rows["datetime"] = pd.DatetimeIndex(
entity_rows["datetime"]
).tz_localize(None)

# Retrieve serving information to determine store type and
# staging location
serving_info = self._serving_service_stub.GetFeastServingInfo(
GetFeastServingInfoRequest(), timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT
GetFeastServingInfoRequest(),
timeout=GRPC_CONNECTION_TIMEOUT_DEFAULT
) # type: GetFeastServingInfoResponse

if serving_info.type != FeastServingType.FEAST_SERVING_TYPE_BATCH:
Expand All @@ -383,17 +381,50 @@ def get_batch_features(
f"does not support batch retrieval "
)

# Export and upload entity row dataframe to staging location
if isinstance(entity_rows, pd.DataFrame):
# Pandas DataFrame detected
# Validate entity rows to based on entities in Feast Core
self._validate_dataframe_for_batch_retrieval(
entity_rows=entity_rows,
feature_sets_request=fs_request
)

# Remove timezone from datetime column
if isinstance(
entity_rows["datetime"].dtype,
pd.core.dtypes.dtypes.DatetimeTZDtype
):
entity_rows["datetime"] = pd.DatetimeIndex(
entity_rows["datetime"]
).tz_localize(None)
elif isinstance(entity_rows, str):
# String based source
if entity_rows.endswith((".avro", "*")):
# Validate Avro entity rows to based on entities in Feast Core
self._validate_avro_for_batch_retrieval(
source=entity_rows,
feature_sets_request=fs_request
)
else:
raise Exception(
f"Only .avro and wildcard paths are accepted as entity_rows"
)
else:
raise Exception(f"Only pandas.DataFrame and str types are allowed"
f" as entity_rows, but got {type(entity_rows)}.")

# Export and upload entity row DataFrame to staging location
# provided by Feast
staged_file = export_dataframe_to_staging_location(
staged_files = export_source_to_staging_location(
entity_rows, serving_info.job_staging_location
) # type: str
) # type: List[str]

request = GetBatchFeaturesRequest(
feature_sets=fs_request,
dataset_source=DatasetSource(
file_source=DatasetSource.FileSource(
file_uris=[staged_file], data_format=DataFormat.DATA_FORMAT_AVRO
file_uris=staged_files,
data_format=DataFormat.DATA_FORMAT_AVRO
)
),
)
Expand All @@ -402,28 +433,107 @@ def get_batch_features(
response = self._serving_service_stub.GetBatchFeatures(request)
return Job(response.job, self._serving_service_stub)

def _validate_entity_rows_for_batch_retrieval(
self, entity_rows, feature_sets_request
def _validate_dataframe_for_batch_retrieval(
self, entity_rows: pd.DataFrame, feature_sets_request
):
"""
Validate whether an entity_row dataframe contains the correct
information for batch retrieval
Validate whether an the entity rows in a DataFrame contains the correct
information for batch retrieval.
Datetime column must be present in the DataFrame.
Args:
entity_rows: Pandas dataframe containing entities and datetime
column. Each entity in a feature set must be present as a
column in this dataframe.
feature_sets_request: Feature sets that will be requested
entity_rows (pd.DataFrame):
Pandas DataFrame containing entities and datetime column. Each
entity in a feature set must be present as a column in this
DataFrame.
feature_sets_request:
Feature sets that will be requested.
"""

self._validate_columns(
columns=entity_rows.columns,
feature_sets_request=feature_sets_request,
datetime_field="datetime"
)

def _validate_avro_for_batch_retrieval(
self, source: str, feature_sets_request
):
"""
Validate whether the entity rows in an Avro source file contains the
correct information for batch retrieval.
Only gs:// and local files (file://) uri schemes are allowed.
Avro file must have a column named "event_timestamp".
No checks will be done if a GCS path is provided.
Args:
source (str):
File path to Avro.
feature_sets_request:
Feature sets that will be requested.
"""
p = urlparse(source)

if p.scheme == "gs":
# GCS path provided (Risk is delegated to user)
# No validation if GCS path is provided
return
elif p.scheme == "file" or not p.scheme:
# Local file (file://) provided
file_path = os.path.abspath(os.path.join(p.netloc, p.path))
else:
raise Exception(f"Unsupported uri scheme provided {p.scheme}, only "
f"local files (file://), and gs:// schemes are "
f"allowed")

with open(file_path, "rb") as f:
reader = fastavro.reader(f)
schema = json.loads(reader.metadata["avro.schema"])
columns = [x["name"] for x in schema["fields"]]
self._validate_columns(
columns=columns,
feature_sets_request=feature_sets_request,
datetime_field="event_timestamp"
)

def _validate_columns(
self, columns: List[str],
feature_sets_request,
datetime_field: str
) -> None:
"""
Check if the required column contains the correct values for batch
retrieval.
Args:
columns (List[str]):
List of columns to validate against feature_sets_request.
feature_sets_request ():
Feature sets that will be requested.
datetime_field (str):
Name of the datetime field that must be enforced and present as
a column in the data source.
Returns:
None:
None
"""
# Ensure datetime column exists
if "datetime" not in entity_rows.columns:
if datetime_field not in columns:
raise ValueError(
f'Entity rows does not contain "datetime" column in columns '
f"{entity_rows.columns}"
f'Entity rows does not contain "{datetime_field}" column in '
f'columns {columns}'
)

# Validate dataframe columns based on feature set entities
# Validate Avro columns based on feature set entities
for feature_set in feature_sets_request:
fs = self.get_feature_set(
name=feature_set.name, version=feature_set.version
Expand All @@ -434,10 +544,10 @@ def _validate_entity_rows_for_batch_retrieval(
f"could not be found"
)
for entity_type in fs.entities:
if entity_type.name not in entity_rows.columns:
if entity_type.name not in columns:
raise ValueError(
f'Dataframe does not contain entity "{entity_type.name}"'
f' column in columns "{entity_rows.columns}"'
f'Input does not contain entity'
f' "{entity_type.name}" column in columns "{columns}"'
)

def get_online_features(
Expand Down Expand Up @@ -610,7 +720,9 @@ def ingest(
return None


def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]:
def _build_feature_set_request(
feature_ids: List[str]
) -> List[FeatureSetRequest]:
"""
Builds a list of FeatureSet objects from feature set ids in order to
retrieve feature data from Feast Serving
Expand Down Expand Up @@ -643,7 +755,7 @@ def _read_table_from_source(
max_workers: int
) -> str:
"""
Infers a data source type (path or Pandas Dataframe) and reads it in as
Infers a data source type (path or Pandas DataFrame) and reads it in as
a PyArrow Table.
The PyArrow Table that is read will be written to a parquet file with row
Expand Down Expand Up @@ -688,7 +800,8 @@ def _read_table_from_source(
else:
table = pq.read_table(file_path)
else:
raise ValueError(f"Unknown data source provided for ingestion: {source}")
raise ValueError(
f"Unknown data source provided for ingestion: {source}")

# Ensure that PyArrow table is initialised
assert isinstance(table, pa.lib.Table)
Expand Down
Loading

0 comments on commit 7a29d94

Please sign in to comment.