fix: Parse inline data sources (#3036)
* Remove deprecated CLI warnings

Signed-off-by: Felix Wang <[email protected]>

* Fix repo parsing logic

Signed-off-by: Felix Wang <[email protected]>

* Add tests

Signed-off-by: Felix Wang <[email protected]>

Signed-off-by: Felix Wang <[email protected]>
felixwang9817 authored Aug 15, 2022
1 parent 66d2c76 commit c7ba370
Showing 9 changed files with 437 additions and 194 deletions.
12 changes: 0 additions & 12 deletions sdk/python/feast/cli.py
@@ -13,7 +13,6 @@
# limitations under the License.
import json
import logging
import warnings
from datetime import datetime
from pathlib import Path
from typing import List, Optional
@@ -45,7 +44,6 @@
from feast.utils import maybe_local_tz

_logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=DeprecationWarning, module="(?!feast)")


class NoOptionDefaultFormat(click.Command):
@@ -197,11 +195,6 @@ def data_source_describe(ctx: click.Context, name: str):
print(e)
exit(1)

warnings.warn(
"Describing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.24, data source unique names will be required to encourage data source discovery.",
RuntimeWarning,
)
print(
yaml.dump(
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
@@ -224,11 +217,6 @@ def data_source_list(ctx: click.Context):

from tabulate import tabulate

warnings.warn(
"Listing data sources will only work properly if all data sources have names or table names specified. "
"Starting Feast 0.24, data source unique names will be required to encourage data source discovery",
RuntimeWarning,
)
print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))


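Aside on the deletions above: warnings.filterwarnings treats its module argument as a regular expression, so the removed "(?!feast)" pattern is a negative lookahead that matched every module name not beginning with "feast". A minimal standalone sketch of that mechanism (illustration only, not Feast code):

import warnings

# The module regex is matched against the warning's originating module.
# "(?!feast)" succeeds at the start of any name that does not begin with
# "feast", so DeprecationWarnings from third-party code were ignored while
# Feast's own deprecation warnings still surfaced.
warnings.filterwarnings("ignore", category=DeprecationWarning, module="(?!feast)")

Dropping the filter, together with the two warnings.warn calls, restores Python's default warning behavior for the CLI.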
20 changes: 19 additions & 1 deletion sdk/python/feast/feature_store.py
@@ -43,7 +43,13 @@
from feast import feature_server, flags_helper, ui_server, utils
from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource, PushMode
from feast.data_source import (
DataSource,
KafkaSource,
KinesisSource,
PushMode,
PushSource,
)
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.dqm.errors import ValidationFailed
@@ -827,6 +833,18 @@ def apply(
ob for ob in objects if isinstance(ob, ValidationReference)
]

batch_sources_to_add: List[DataSource] = []
for data_source in data_sources_set_to_update:
if (
isinstance(data_source, PushSource)
or isinstance(data_source, KafkaSource)
or isinstance(data_source, KinesisSource)
):
assert data_source.batch_source
batch_sources_to_add.append(data_source.batch_source)
for batch_source in batch_sources_to_add:
data_sources_set_to_update.add(batch_source)

for fv in itertools.chain(views_to_update, sfvs_to_update):
data_sources_set_to_update.add(fv.batch_source)
if fv.stream_source:
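The apply() hunk above hoists batch sources that are defined inline on a PushSource, KafkaSource, or KinesisSource into the set of data sources to register. A minimal usage sketch, with illustrative source names and path (none of these identifiers come from this commit):

from feast import FeatureStore, FileSource, PushSource

# Hypothetical sources, for illustration only.
batch = FileSource(
    name="driver_stats_batch",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
)
push = PushSource(name="driver_stats_push", batch_source=batch)

store = FeatureStore(repo_path=".")
# With this fix, applying the push source also registers its inline batch
# source; previously "batch" had to be passed to apply() explicitly.
store.apply([push])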
60 changes: 40 additions & 20 deletions sdk/python/feast/repo_operations.py
@@ -14,7 +14,7 @@

from feast import PushSource
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource, KafkaSource
from feast.data_source import DataSource, KafkaSource, KinesisSource
from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add
from feast.entity import Entity
from feast.feature_service import FeatureService
@@ -114,44 +114,64 @@ def parse_repo(repo_root: Path) -> RepoContents:
request_feature_views=[],
)

data_sources_set = set()
for repo_file in get_repo_files(repo_root):
module_path = py_path_to_module(repo_file)
module = importlib.import_module(module_path)

for attr_name in dir(module):
obj = getattr(module, attr_name)

if isinstance(obj, DataSource) and not any(
(obj is ds) for ds in res.data_sources
):
res.data_sources.append(obj)
data_sources_set.add(obj)

# Handle batch sources defined within stream sources.
if (
isinstance(obj, PushSource)
or isinstance(obj, KafkaSource)
or isinstance(obj, KinesisSource)
):
batch_source = obj.batch_source

if batch_source and not any(
(batch_source is ds) for ds in res.data_sources
):
res.data_sources.append(batch_source)
if (
isinstance(obj, FeatureView)
and not any((obj is fv) for fv in res.feature_views)
and not isinstance(obj, StreamFeatureView)
and not isinstance(obj, BatchFeatureView)
):
res.feature_views.append(obj)
if isinstance(obj.stream_source, PushSource) and not any(
(obj is ds) for ds in res.data_sources
):
push_source_dep = obj.stream_source.batch_source
# Don't add if the push source's batch source is a duplicate of an existing batch source
if push_source_dep not in data_sources_set:
res.data_sources.append(push_source_dep)

# Handle batch sources defined with feature views.
batch_source = obj.batch_source
assert batch_source
if not any((batch_source is ds) for ds in res.data_sources):
res.data_sources.append(batch_source)

# Handle stream sources defined with feature views.
if obj.stream_source:
stream_source = obj.stream_source
if not any((stream_source is ds) for ds in res.data_sources):
res.data_sources.append(stream_source)
elif isinstance(obj, StreamFeatureView) and not any(
(obj is sfv) for sfv in res.stream_feature_views
):
res.stream_feature_views.append(obj)
if (
isinstance(obj.stream_source, PushSource)
or isinstance(obj.stream_source, KafkaSource)
and not any((obj is ds) for ds in res.data_sources)
):
batch_source_dep = obj.stream_source.batch_source
# Don't add if the push source's batch source is a duplicate of an existing batch source
if batch_source_dep and batch_source_dep not in data_sources_set:
res.data_sources.append(batch_source_dep)

# Handle batch sources defined with feature views.
batch_source = obj.batch_source
if not any((batch_source is ds) for ds in res.data_sources):
res.data_sources.append(batch_source)

# Handle stream sources defined with feature views.
stream_source = obj.stream_source
assert stream_source
if not any((stream_source is ds) for ds in res.data_sources):
res.data_sources.append(stream_source)
elif isinstance(obj, Entity) and not any(
(obj is entity) for entity in res.entities
):
@@ -168,6 +188,7 @@ def parse_repo(repo_root: Path) -> RepoContents:
(obj is rfv) for rfv in res.request_feature_views
):
res.request_feature_views.append(obj)

res.entities.append(DUMMY_ENTITY)
return res

@@ -300,7 +321,6 @@ def log_infra_changes(

@log_exceptions_and_usage
def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):

os.chdir(repo_path)
project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path)
apply_total_with_repo_instance(
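Note that the rewritten parse_repo deduplicates by object identity ((obj is ds)) rather than by the equality-and-hash semantics of the removed data_sources_set. A standalone sketch of the distinction, assuming Feast's data sources implement value-based equality (which the old set-based check relied on):

from feast import FileSource

# Two separately constructed sources with identical parameters.
a = FileSource(path="data/x.parquet", timestamp_field="event_timestamp")
b = FileSource(path="data/x.parquet", timestamp_field="event_timestamp")

sources = [a]
assert any(a is ds for ds in sources)      # the same object is recognized and skipped
assert not any(b is ds for ds in sources)  # a distinct object is still appended,
                                           # even if it compares equal to a

Identity checks make duplicate handling independent of how each DataSource subclass defines equality, at the cost of keeping equal-but-distinct sources defined in different repo files.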
@@ -0,0 +1,28 @@
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32, Int64

driver = Entity(
name="driver_id",
description="driver id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
tags={},
)
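This first new fixture (its file path is not rendered in this view) covers the baseline case: a FeatureView whose source is a plain FileSource. A hedged sketch of what a test over it might assert, with the fixture path assumed:

from pathlib import Path

from feast.repo_operations import parse_repo

# Hypothetical location; the commit's actual test layout may differ.
contents = parse_repo(Path("example_repos/file_source_repo"))
assert len(contents.feature_views) == 1
assert len(contents.data_sources) == 1  # just the FileSource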
@@ -0,0 +1,37 @@
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource, KafkaSource
from feast.data_format import AvroFormat
from feast.types import Float32, Int32, Int64

driver = Entity(
name="driver_id",
description="driver id",
)

driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=KafkaSource(
name="kafka",
timestamp_field="event_timestamp",
kafka_bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
watermark_delay_threshold=timedelta(days=1),
),
tags={},
)
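This second fixture is the heart of the fix: the KafkaSource, and the FileSource nested inside it, are defined inline on the FeatureView rather than at module scope, so neither appears in dir(module) on its own. Under the new feature-view branch of parse_repo, both should still be collected; a sketch with an assumed path:

from pathlib import Path

from feast.repo_operations import parse_repo

# Hypothetical location, for illustration.
contents = parse_repo(Path("example_repos/inline_kafka_source_repo"))
assert len(contents.feature_views) == 1
# The stream source and its inline batch source are both hoisted.
assert len(contents.data_sources) == 2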
@@ -0,0 +1,18 @@
from datetime import timedelta

from feast import FileSource, KafkaSource
from feast.data_format import AvroFormat

stream_source = KafkaSource(
name="kafka",
timestamp_field="event_timestamp",
kafka_bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(
path="data/driver_stats.parquet", # Fake path
timestamp_field="event_timestamp",
created_timestamp_column="created",
),
watermark_delay_threshold=timedelta(days=1),
)
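The last fixture defines a KafkaSource at module scope with no feature view at all, exercising the DataSource branch of parse_repo: the stream source is found via dir(module), and the "Handle batch sources defined within stream sources" block then hoists its inline FileSource. A sketch of the expected result (path assumed):

from pathlib import Path

from feast.repo_operations import parse_repo

contents = parse_repo(Path("example_repos/standalone_kafka_source_repo"))  # hypothetical path
assert len(contents.feature_views) == 0
assert len(contents.data_sources) == 2  # the KafkaSource plus its batch FileSource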