Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema Inferencing should happen at apply time #1646

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 10 additions & 54 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@


import enum
import re
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow.parquet import ParquetFile
Expand Down Expand Up @@ -371,7 +370,7 @@ class DataSource:

def __init__(
self,
event_timestamp_column: str,
event_timestamp_column: Optional[str] = "",
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = "",
Expand Down Expand Up @@ -520,45 +519,11 @@ def to_proto(self) -> DataSourceProto:
"""
raise NotImplementedError

def _infer_event_timestamp_column(self, ts_column_type_regex_pattern):
    """Return the name of the single column whose type matches the given regex.

    Args:
        ts_column_type_regex_pattern: regex applied to each column's type name
            (e.g. r"^timestamp" for files, "TIMESTAMP|DATETIME" for BigQuery).

    Returns:
        The matching column name, when exactly one column's type matches.

    Raises:
        TypeError: if this source kind does not support inference, or if zero
            or more than one column matches the pattern.
    """
    ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"
    USER_GUIDANCE = "Please specify event_timestamp_column explicitly."

    # Inference only works for source kinds whose schema we can read.
    if not isinstance(self, (FileSource, BigQuerySource)):
        raise TypeError(
            f"""
            {ERROR_MSG_PREFIX} because this DataSource currently does not support this inference.
            {USER_GUIDANCE}
            """
        )

    candidate = None
    for col_name, col_datatype in self.get_table_column_names_and_types():
        if not re.match(ts_column_type_regex_pattern, col_datatype):
            continue
        if candidate is not None:
            # A second match makes the choice ambiguous; stop immediately.
            raise TypeError(
                f"""
                {ERROR_MSG_PREFIX} due to multiple possible columns satisfying
                the criteria. {USER_GUIDANCE}
                """
            )
        candidate = col_name

    if candidate is None:
        raise TypeError(
            f"""
            {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
            {USER_GUIDANCE}
            """
        )
    return candidate


class FileSource(DataSource):
def __init__(
self,
event_timestamp_column: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
file_url: Optional[str] = None,
path: Optional[str] = None,
file_format: FileFormat = None,
Expand Down Expand Up @@ -598,7 +563,7 @@ def __init__(
self._file_options = FileOptions(file_format=file_format, file_url=file_url)

super().__init__(
event_timestamp_column or self._infer_event_timestamp_column(r"^timestamp"),
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
Expand Down Expand Up @@ -662,7 +627,7 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
class BigQuerySource(DataSource):
def __init__(
self,
event_timestamp_column: Optional[str] = None,
event_timestamp_column: Optional[str] = "",
table_ref: Optional[str] = None,
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
Expand All @@ -672,8 +637,7 @@ def __init__(
self._bigquery_options = BigQueryOptions(table_ref=table_ref, query=query)

super().__init__(
event_timestamp_column
or self._infer_event_timestamp_column("TIMESTAMP|DATETIME"),
event_timestamp_column,
created_timestamp_column,
field_mapping,
date_partition_column,
Expand Down Expand Up @@ -743,20 +707,12 @@ def get_table_column_names_and_types(self) -> Iterable[Tuple[str, str]]:
from google.cloud import bigquery

client = bigquery.Client()
name_type_pairs = []
if self.table_ref is not None:
project_id, dataset_id, table_id = self.table_ref.split(".")
bq_columns_query = f"""
SELECT COLUMN_NAME, DATA_TYPE FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_id}'
"""
table_schema = (
client.query(bq_columns_query).result().to_dataframe_iterable()
)
for df in table_schema:
name_type_pairs.extend(
list(zip(df["COLUMN_NAME"].to_list(), df["DATA_TYPE"].to_list()))
)
table_schema = client.get_table(self.table_ref).schema
if not isinstance(table_schema[0], bigquery.schema.SchemaField):
raise TypeError("Could not parse BigQuery table schema.")

name_type_pairs = [(field.name, field.field_type) for field in table_schema]
else:
bq_columns_query = f"SELECT * FROM ({self.query}) LIMIT 1"
queryRes = client.query(bq_columns_query).result()
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,11 @@ def __init__(
f"The DataFrame from {source} being materialized must have at least {join_key_columns} columns present, "
f"but these were missing: {join_key_columns - source_columns} "
)


class RegistryInferenceFailure(Exception):
    """Raised when Feast cannot infer missing information for a registry object."""

    def __init__(self, repo_obj_type: str, specific_issue: str):
        # Compose the user-facing message once, then delegate to Exception.
        message = (
            f"Inference to fill in missing information for {repo_obj_type} failed. "
            f"{specific_issue}. "
            "Try filling the information explicitly."
        )
        super().__init__(message)
10 changes: 9 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
from feast.entity import Entity
from feast.errors import FeastProviderLoginError, FeatureViewNotFoundException
from feast.feature_view import FeatureView
from feast.inference import infer_entity_value_type_from_feature_views
from feast.inference import (
infer_entity_value_type_from_feature_views,
update_data_sources_with_inferred_event_timestamp_col,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.serving.ServingService_pb2 import (
Expand Down Expand Up @@ -224,6 +227,11 @@ def apply(
entities_to_update = infer_entity_value_type_from_feature_views(
[ob for ob in objects if isinstance(ob, Entity)], views_to_update
)
update_data_sources_with_inferred_event_timestamp_col(
[view.input for view in views_to_update]
)
for view in views_to_update:
view.infer_features_from_input_source()

if len(views_to_update) + len(entities_to_update) != len(objects):
raise ValueError("Unknown object type provided as part of apply() call")
Expand Down
56 changes: 33 additions & 23 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from feast import utils
from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import RegistryInferenceFailure
from feast.feature import Feature
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
Expand Down Expand Up @@ -64,29 +65,6 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
online: bool = True,
):
if not features:
features = [] # to handle python's mutable default arguments
columns_to_exclude = {
input.event_timestamp_column,
input.created_timestamp_column,
} | set(entities)

for col_name, col_datatype in input.get_table_column_names_and_types():
if col_name not in columns_to_exclude and not re.match(
"^__|__$", col_name
):
features.append(
Feature(
col_name,
input.source_datatype_to_feast_value_type()(col_datatype),
)
)

if not features:
raise ValueError(
f"Could not infer Features for the FeatureView named {name}. Please specify Features explicitly for this FeatureView."
)

cols = [entity for entity in entities] + [feat.name for feat in features]
for col in cols:
if input.field_mapping is not None and col in input.field_mapping.keys():
Expand Down Expand Up @@ -241,3 +219,35 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

def infer_features_from_input_source(self):
    """Infer this FeatureView's features from its input source's schema.

    Only runs when no features were specified explicitly. Every column of the
    input source becomes a Feature, except timestamp columns, entity columns,
    and internal-use columns (names starting or ending with "__"). Column
    names are translated through the source's field_mapping when one exists.

    Raises:
        RegistryInferenceFailure: if no feature columns could be inferred.
    """
    if not self.features:
        # Timestamp and entity columns can never be features.
        columns_to_exclude = {
            self.input.event_timestamp_column,
            self.input.created_timestamp_column,
        } | set(self.entities)

        # field_mapping may be None (its DataSource.__init__ default), so
        # normalize to an empty dict before membership lookups.
        field_mapping = self.input.field_mapping or {}

        for col_name, col_datatype in self.input.get_table_column_names_and_types():
            if col_name not in columns_to_exclude and not re.match(
                "^__|__$",
                col_name,  # double underscores often signal an internal-use column
            ):
                feature_name = field_mapping.get(col_name, col_name)
                self.features.append(
                    Feature(
                        feature_name,
                        self.input.source_datatype_to_feast_value_type()(
                            col_datatype
                        ),
                    )
                )

        if not self.features:
            raise RegistryInferenceFailure(
                "FeatureView",
                f"Could not infer Features for the FeatureView named {self.name}.",
            )
69 changes: 65 additions & 4 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from typing import List
import re
from typing import List, Union

from feast import Entity
from feast.data_source import BigQuerySource, FileSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.value_type import ValueType

Expand Down Expand Up @@ -45,12 +48,70 @@ def infer_entity_value_type_from_feature_views(
entity.value_type != ValueType.UNKNOWN
and entity.value_type != inferred_value_type
) or (len(extracted_entity_name_type_pairs) > 1):
raise ValueError(
raise RegistryInferenceFailure(
"Entity",
f"""Entity value_type inference failed for {entity_name} entity.
Multiple viable matches. Please explicitly specify the entity value_type
for this entity."""
Multiple viable matches.
""",
)

entity.value_type = inferred_value_type

return entities


def update_data_sources_with_inferred_event_timestamp_col(
    data_sources: List[Union[BigQuerySource, FileSource]],
) -> None:
    """Fill in event_timestamp_column for data sources that did not set one.

    For each source whose event_timestamp_column is empty, scans the source's
    schema for exactly one timestamp-typed column and records its name on the
    source in place.

    Args:
        data_sources: sources to update; only FileSource and BigQuerySource
            support inference.

    Raises:
        RegistryInferenceFailure: if a source kind is unsupported, or if zero
            or more than one column matches the timestamp-type pattern.
    """
    ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"

    for data_source in data_sources:
        # Skip sources where the user already specified the column explicitly.
        if data_source.event_timestamp_column:
            continue

        # Pick the type-name pattern that identifies timestamp columns for
        # this kind of source.
        if isinstance(data_source, FileSource):
            ts_column_type_regex_pattern = r"^timestamp"
        elif isinstance(data_source, BigQuerySource):
            ts_column_type_regex_pattern = "TIMESTAMP|DATETIME"
        else:
            raise RegistryInferenceFailure(
                "DataSource",
                """
                DataSource inferencing of event_timestamp_column is currently only supported
                for FileSource and BigQuerySource.
                """,
            )

        # Collect every column whose type looks like a timestamp; inference
        # succeeds only when the match is unambiguous.
        matched_columns = [
            col_name
            for col_name, col_datatype in data_source.get_table_column_names_and_types()
            if re.match(ts_column_type_regex_pattern, col_datatype)
        ]

        if len(matched_columns) > 1:
            # Name the conflicting columns so the user knows what to choose
            # between. (RegistryInferenceFailure already appends guidance to
            # fill in the information explicitly.)
            raise RegistryInferenceFailure(
                "DataSource",
                f"""
                {ERROR_MSG_PREFIX} due to multiple possible columns satisfying
                the criteria: {', '.join(matched_columns)}
                """,
            )
        if not matched_columns:
            raise RegistryInferenceFailure(
                "DataSource",
                f"""
                {ERROR_MSG_PREFIX} due to an absence of columns that satisfy the criteria.
                """,
            )

        data_source.event_timestamp_column = matched_columns[0]
10 changes: 9 additions & 1 deletion sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

from feast import Entity, FeatureTable
from feast.feature_view import FeatureView
from feast.inference import infer_entity_value_type_from_feature_views
from feast.inference import (
infer_entity_value_type_from_feature_views,
update_data_sources_with_inferred_event_timestamp_col,
)
from feast.infra.offline_stores.helpers import assert_offline_store_supports_data_source
from feast.infra.provider import get_provider
from feast.names import adjectives, animals
Expand Down Expand Up @@ -136,6 +139,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
),
feature_views=repo.feature_views,
)

sys.dont_write_bytecode = False
for entity in repo.entities:
registry.apply_entity(entity, project=project)
Expand All @@ -156,6 +160,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path):
repo_config.offline_store, data_source
)

update_data_sources_with_inferred_event_timestamp_col(data_sources)
for view in repo.feature_views:
view.infer_features_from_input_source()

tables_to_delete = []
for registry_table in registry.list_feature_tables(project=project):
if registry_table.name not in repo_table_names:
Expand Down
Loading