Schema Inferencing should happen at apply time (feast-dev#1646)
* wip1

Signed-off-by: David Y Liu <[email protected]>

* just need to do clean up

Signed-off-by: David Y Liu <[email protected]>

* linted

Signed-off-by: David Y Liu <[email protected]>

* improve test coverage

Signed-off-by: David Y Liu <[email protected]>

* changed placement of inference methods in repo_operation apply_total

Signed-off-by: David Y Liu <[email protected]>

* updated inference method name + changed to void return since it updates in place

Signed-off-by: David Y Liu <[email protected]>

* fixed integration test and added comments

Signed-off-by: David Y Liu <[email protected]>

* Made DataSource event_timestamp_column optional

Signed-off-by: David Y Liu <[email protected]>
Signed-off-by: Mwad22 <[email protected]>
mavysavydav authored and Mwad22 committed Jul 7, 2021
1 parent 462da43 commit df95ee8
Showing 8 changed files with 167 additions and 103 deletions.
64 changes: 10 additions & 54 deletions sdk/python/feast/data_source.py
@@ -14,7 +14,6 @@


import enum
import re
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile
@@ -371,7 +370,7 @@ class DataSource:

def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = "",
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
@@ -520,45 +519,11 @@ def to_proto(self) -> DataSourceProto:
"""
raise NotImplementedError

def _infer_event_timestamp_column(self, ts_column_type_regex_pattern):
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
USER_GUIDANCE = "Please specify event_timestamp_column explicitly."

if isinstance(self, FileSource) or isinstance(self, BigQuerySource):
event_timestamp_column, matched_flag = None, False
for col_name, col_datatype in self.get_table_column_names_and_types():
if re.match(ts_column_type_regex_pattern, col_datatype):
if matched_flag:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
the criteria. {USER_GUIDANCE}
"""
)
matched_flag = True
event_timestamp_column = col_name
if matched_flag:
return event_timestamp_column
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
{USER_GUIDANCE}
"""
)
else:
raise TypeError(
f"""
{ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
{USER_GUIDANCE}
"""
)


class FileSource(DataSource):
def __init__(
self,
event_timestamp_column: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
file_url: Optional[str] = None,
path: Optional[str] = None,
file_format: FileFormat = None,
@@ -598,7 +563,7 @@ def __init__(
self._file_options = FileOptions(file_format=file_format, file_url=file_url)

super().__init__(
event_timestamp_column or self._infer_event_timestamp_column(r"^timestamp"),
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
@@ -662,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
class BigQuerySource(DataSource):
def __init__(
self,
event_timestamp_column: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
@@ -672,8 +637,7 @@ def __init__(
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

super().__init__(
event_timestamp_column
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
@@ -743,20 +707,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
from google.cloud import bigquery

client = bigquery.Client()
name_type_pairs = []
if self.table_ref is not None:
project_id, dataset_id, table_id = self.table_ref.split(".")
bq_columns_query = f"""
SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_id}'
"""
table_schema = (
client.query(bq_columns_query).result().to_dataframe_iterable()
)
for df in table_schema:
name_type_pairs.extend(
list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
)
table_schema = client.get_table(self.table_ref).schema
if not isinstance(table_schema[0], bigquery.schema.SchemaField):
raise TypeError("Could not parse BigQuery table schema.")

name_type_pairs = [(field.name, field.field_type) for field in table_schema]
else:
bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
queryRes = client.query(bq_columns_query).result()
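
With this change a source no longer has to spell out its event timestamp column; the field defaults to an empty string and is filled in at apply time. A minimal sketch of what a declaration might look like after this commit — the file path and table reference below are placeholders, not taken from this diff:

    from feast import BigQuerySource, FileSource

    # event_timestamp_column is omitted on purpose: it now defaults to "" and is
    # inferred later from the schema (a timestamp-typed column for files,
    # TIMESTAMP/DATETIME for BigQuery).
    driver_stats_file = FileSource(path="data/driver_stats.parquet")
    driver_stats_bq = BigQuerySource(table_ref="my_project.my_dataset.driver_stats")
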
8 changes: 8 additions & 0 deletions sdk/python/feast/errors.py
@@ -101,3 +101,11 @@ def __init__(
f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, "
f"but these were missing: {join_key_columns - source_columns} "
)


class RegistryInferenceFailure(Exception):
def __init__(self, repo_obj_type: str, specific_issue: str):
super().__init__(
f"Inference to fill in missing information for {repo_obj_type} failed. {specific_issue}. "
"Try filling the information explicitly."
)
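
For reference, a small sketch of the message the new exception produces; the arguments below are made up for illustration:

    from feast.errors import RegistryInferenceFailure

    try:
        raise RegistryInferenceFailure("DataSource", "Multiple timestamp columns matched")
    except RegistryInferenceFailure as e:
        print(e)
        # Inference to fill in missing information for DataSource failed.
        # Multiple timestamp columns matched. Try filling the information explicitly.
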
10 changes: 9 additions & 1 deletion sdk/python/feast/feature_store.py
@@ -30,7 +30,10 @@
FeatureViewNotFoundException,
)
from feast.feature_view import FeatureView
from feast.inference import infer_entity_value_type_from_feature_views
from feast.inference import (
infer_entity_value_type_from_feature_views,
update_data_sources_with_inferred_event_timestamp_col,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.serving.ServingService_pb2 import (
@@ -228,6 +231,11 @@ def apply(
entities_to_update = infer_entity_value_type_from_feature_views(
[ob for ob in objects if isinstance(ob, Entity)], views_to_update
)
update_data_sources_with_inferred_event_timestamp_col(
[view.input for view in views_to_update]
)
for view in views_to_update:
view.infer_features_from_input_source()

if len(views_to_update) + len(entities_to_update) != len(objects):
raise ValueError("Unknown object type provided as part of apply() call")
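
The effect of the two new calls in apply(): both the input sources' event timestamp columns and each view's feature list are now resolved before anything is registered. A rough sketch of the user-facing flow, assuming driver_stats_view is a FeatureView defined elsewhere with no explicit features list and a source that omits event_timestamp_column:

    from feast import Entity, FeatureStore, ValueType

    store = FeatureStore(repo_path=".")

    # driver_stats_view is assumed to be declared without features; apply() runs
    # the inference steps above before writing anything to the registry.
    store.apply([Entity(name="driver_id", value_type=ValueType.INT64), driver_stats_view])

    # After apply() returns, the view's features and its source's
    # event_timestamp_column have been filled in.
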
56 changes: 33 additions & 23 deletions sdk/python/feast/feature_view.py
@@ -21,6 +21,7 @@

from feast import utils
from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
@@ -64,29 +65,6 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
if not features:
features = [] # to handle python's mutable default arguments
columns_to_exclude = {
input.event_timestamp_column,
input.created_timestamp_column,
} | set(entities)

for col_name, col_datatype in input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$", col_name
):
features.append(
Feature(
col_name,
input.source_datatype_to_feast_value_type()(col_datatype),
)
)

if not features:
raise ValueError(
f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView."
)

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
@@ -241,3 +219,35 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_input_source(self):
if not self.features:
columns_to_exclude = {
self.input.event_timestamp_column,
self.input.created_timestamp_column,
} | set(self.entities)

for col_name, col_datatype in self.input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$",
col_name, # double underscores often signal an internal-use column
):
feature_name = (
self.input.field_mapping[col_name]
if col_name in self.input.field_mapping.keys()
else col_name
)
self.features.append(
Feature(
feature_name,
self.input.source_datatype_to_feast_value_type()(
col_datatype
),
)
)

if not self.features:
raise RegistryInferenceFailure(
"FeatureView",
f"Could not infer Features for the FeatureView named {self.name}.",
)
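
The method is a no-op when features were given explicitly; otherwise every source column that is neither an entity column, a timestamp column, nor a dunder-named internal column becomes a Feature (renamed through input.field_mapping when one exists). A sketch of the expected behavior, with hypothetical column names:

    # Suppose driver_stats_view was declared with no features and its FileSource
    # exposes the columns: driver_id, conv_rate, avg_daily_trips, event_timestamp.
    driver_stats_view.infer_features_from_input_source()

    # driver_id (entity) and event_timestamp (timestamp column) are excluded.
    print([f.name for f in driver_stats_view.features])
    # ['conv_rate', 'avg_daily_trips']
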
69 changes: 65 additions & 4 deletions sdk/python/feast/inference.py
@@ -1,6 +1,9 @@
from typing import List
import re
from typing import List, Union

from feast import Entity
from feast.data_source import BigQuerySource, FileSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.value_type import ValueType

@@ -45,12 +48,70 @@ def infer_entity_value_type_from_feature_views(
entity.value_type != ValueType.UNKNOWN
and entity.value_type != inferred_value_type
) or (len(extracted_entity_name_type_pairs) > 1):
raise ValueError(
raise RegistryInferenceFailure(
"Entity",
f"""Entity value_type inference failed for {entity_name} entity.
Multiple viable matches. Please explicitly specify the entity value_type
for this entity."""
Multiple viable matches.
""",
)

entity.value_type = inferred_value_type

return entities


def update_data_sources_with_inferred_event_timestamp_col(
data_sources: List[Union[BigQuerySource, FileSource]],
) -> None:
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"

for data_source in data_sources:
if (
data_source.event_timestamp_column is None
or data_source.event_timestamp_column == ""
):
# prepare right match pattern for data source
ts_column_type_regex_pattern = ""
if isinstance(data_source, FileSource):
ts_column_type_regex_pattern = r"^timestamp"
elif isinstance(data_source, BigQuerySource):
ts_column_type_regex_pattern = "TIMESTAMP|DATETIME"
else:
raise RegistryInferenceFailure(
"DataSource",
"""
DataSource inferencing of event_timestamp_column is currently only supported
for FileSource and BigQuerySource.
""",
)
# for informing the type checker
assert isinstance(data_source, FileSource) or isinstance(
data_source, BigQuerySource
)

# loop through table columns to find singular match
event_timestamp_column, matched_flag = None, False
for (
col_name,
col_datatype,
) in data_source.get_table_column_names_and_types():
if re.match(ts_column_type_regex_pattern, col_datatype):
if matched_flag:
raise RegistryInferenceFailure(
"DataSource",
f"""
{ERROR_MSG_PREFIX} due to multiple possible columns satisfying
the criteria. {ts_column_type_regex_pattern} {col_name}
""",
)
matched_flag = True
event_timestamp_column = col_name
if matched_flag:
data_source.event_timestamp_column = event_timestamp_column
else:
raise RegistryInferenceFailure(
"DataSource",
f"""
{ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
""",
)
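
A short sketch of calling the new helper directly; the parquet path is a placeholder and the file is assumed to contain exactly one timestamp-typed column:

    from feast.data_source import FileSource
    from feast.inference import update_data_sources_with_inferred_event_timestamp_col

    source = FileSource(path="data/driver_stats.parquet")
    assert source.event_timestamp_column == ""

    # Mutates the sources in place; raises RegistryInferenceFailure if no column,
    # or more than one column, matches the timestamp-type pattern.
    update_data_sources_with_inferred_event_timestamp_col([source])
    print(source.event_timestamp_column)  # e.g. "event_timestamp"
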
10 changes: 9 additions & 1 deletion sdk/python/feast/repo_operations.py
@@ -13,7 +13,10 @@

from feast import Entity, FeatureTable
from feast.feature_view import FeatureView
from feast.inference import infer_entity_value_type_from_feature_views
from feast.inference import (
infer_entity_value_type_from_feature_views,
update_data_sources_with_inferred_event_timestamp_col,
)
from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source
from feast.infra.provider import get_provider
from feast.names import adjectives, animals
@@ -136,6 +139,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
),
feature_views=repo.feature_views,
)

sys.dont_write_bytecode = False
for entity in repo.entities:
registry.apply_entity(entity, project=project)
@@ -156,6 +160,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
repo_config.offline_store, data_source
)

update_data_sources_with_inferred_event_timestamp_col(data_sources)
for view in repo.feature_views:
view.infer_features_from_input_source()

tables_to_delete = []
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names: