Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Feast init command #1414

Merged
merged 3 commits into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@
from feast.feature_table import FeatureTable
from feast.loaders.yaml import yaml_loader
from feast.repo_config import load_repo_config
from feast.repo_operations import apply_total, registry_dump, teardown
from feast.repo_operations import (
apply_total,
cli_check_repo,
init_repo,
registry_dump,
teardown,
)

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -360,22 +366,28 @@ def project_list():


@cli.command("apply")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
)
def apply_total_command(repo_path: str):
"""
Applies a feature repo
"""
cli_check_repo(Path(repo_path))
repo_config = load_repo_config(Path(repo_path))

apply_total(repo_config, Path(repo_path).resolve())


@cli.command("teardown")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True), default=Path.cwd
)
def teardown_command(repo_path: str):
"""
Tear down infra for a feature repo
"""
cli_check_repo(Path(repo_path))
repo_config = load_repo_config(Path(repo_path))

teardown(repo_config, Path(repo_path).resolve())
Expand All @@ -393,13 +405,15 @@ def registry_dump_command(repo_path: str):


@cli.command("materialize")
@click.argument("repo_path", type=click.Path(dir_okay=True, exists=True))
@click.argument("start_ts")
@click.argument("end_ts")
@click.argument(
"repo_path", type=click.Path(dir_okay=True, exists=True,), default=Path.cwd
)
@click.option(
"--views", "-v", help="Feature views to materialize", multiple=True,
)
def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[str]):
def materialize_command(start_ts: str, end_ts: str, repo_path: str, views: List[str]):
"""
Run a (non-incremental) materialization job to ingest data into the online store. Feast
will read all data between START_TS and END_TS from the offline store and write it to the
Expand All @@ -416,5 +430,12 @@ def materialize_command(repo_path: str, start_ts: str, end_ts: str, views: List[
)


@cli.command("init")
@click.option("--minimal", "-m", is_flag=True, help="Only generate the config")
def init_command(minimal: bool):
repo_path = Path.cwd()
init_repo(repo_path, minimal)


if __name__ == "__main__":
cli()
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# This module generates dummy data to be used for tests and examples.
import numpy as np
import pandas as pd

Expand Down
36 changes: 36 additions & 0 deletions sdk/python/feast/example_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, ValueType
from feast.data_source import FileSource

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_hourly_stats = FileSource(
path="%PARQUET_PATH%",
event_timestamp_column="datetime",
created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400 * 1),
features=[
Feature(name="conv_rate", dtype=ValueType.FLOAT),
Feature(name="acc_rate", dtype=ValueType.FLOAT),
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
],
online=True,
input=driver_hourly_stats,
tags={},
)
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __repr__(self) -> str:

def load_repo_config(repo_path: Path) -> RepoConfig:
config_path = repo_path / "feature_store.yaml"

with open(config_path) as f:
raw_config = yaml.safe_load(f)
try:
Expand Down
84 changes: 84 additions & 0 deletions sdk/python/feast/repo_operations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import importlib
import os
import random
import string
import sys
from datetime import datetime, timedelta
from pathlib import Path
from textwrap import dedent
from typing import List, NamedTuple, Union

from feast import Entity, FeatureTable
from feast.driver_test_data import create_driver_hourly_stats_df
from feast.feature_view import FeatureView
from feast.infra.provider import get_provider
from feast.registry import Registry
Expand Down Expand Up @@ -132,3 +137,82 @@ def registry_dump(repo_config: RepoConfig):
print(entity)
for table in registry.list_feature_tables(project=project):
print(table)


def cli_check_repo(repo_path: Path):
config_path = repo_path / "feature_store.yaml"
if not config_path.exists():
print(
f"Can't find feature_store.yaml at {repo_path}. Make sure you're running this command in an initialized feast repository."
)
sys.exit(1)


def init_repo(repo_path: Path, minimal: bool):

repo_config = repo_path / "feature_store.yaml"

if repo_config.exists():
print("Feature repository is already initalized, nothing to do.")
sys.exit(1)

project_id = "proj" + "".join(
woop marked this conversation as resolved.
Show resolved Hide resolved
random.choice(string.ascii_lowercase + string.digits) for _ in range(5)
)

if minimal:
repo_config.write_text(
dedent(
f"""
project: {project_id}
metadata_store: /path/to/metadata.db
provider: local
online_store:
local:
path: /path/to/online_store.db
"""
)
)
print(
"Generated example feature_store.yaml. Please edit metadata_store"
woop marked this conversation as resolved.
Show resolved Hide resolved
"location before running apply"
)

else:
example_py = (Path(__file__).parent / "example_repo.py").read_text()

data_path = repo_path / "data"
data_path.mkdir(exist_ok=True)

end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
start_date = end_date - timedelta(days=15)

driver_entities = [1001, 1002, 1003, 1004, 1005]
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)

driver_stats_path = data_path / "driver_stats.parquet"
driver_df.to_parquet(
path=str(driver_stats_path), allow_truncated_timestamps=True
)

with open(repo_path / "example.py", "wt") as f:
f.write(example_py.replace("%PARQUET_PATH%", str(driver_stats_path)))

# Generate config
repo_config.write_text(
dedent(
f"""
project: {project_id}
metadata_store: {data_path / "metadata.db"}
woop marked this conversation as resolved.
Show resolved Hide resolved
provider: local
online_store:
local:
path: {data_path / "online_store.db"}
"""
)
)

print("Generated feature_store.yaml and example features in example_repo.py")
print(
"Now try runing `feast apply` to apply, or `feast materialize` to sync data to the online store"
)
4 changes: 2 additions & 2 deletions sdk/python/tests/cli/test_e2e_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import pandas as pd

import tests.driver_test_data as driver_data
import feast.driver_test_data as driver_data
from tests.cli.utils import CliRunner, get_example_repo


Expand Down Expand Up @@ -57,9 +57,9 @@ def test_basic(self) -> None:
r = runner.run(
[
"materialize",
str(store.repo_path),
start_date.isoformat(),
end_date.isoformat(),
str(store.repo_path),
],
cwd=Path(store.repo_path),
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/test_historical_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from google.cloud import bigquery
from pandas.testing import assert_frame_equal

import tests.driver_test_data as driver_data
import feast.driver_test_data as driver_data
from feast.data_source import BigQuerySource, FileSource
from feast.entity import Entity
from feast.feature import Feature
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/tests/test_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

from tests.cli.utils import CliRunner


def test_repo_init() -> None:
"""
This test simply makes sure that you can run `feast apply && feast materialize` on
the repo created by "feast init" without errors.
"""
runner = CliRunner()
with tempfile.TemporaryDirectory() as repo_dir_name:
repo_path = Path(repo_dir_name)
result = runner.run(["init"], cwd=repo_path)
assert result.returncode == 0
result = runner.run(["apply"], cwd=repo_path)
assert result.returncode == 0

end_date = datetime.utcnow()
start_date = end_date - timedelta(days=100)
result = runner.run(
["materialize", start_date.isoformat(), end_date.isoformat()], cwd=repo_path
)
assert result.returncode == 0