From 7e224a1880baf36e65aadb5859fdfc37821087c6 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Wed, 9 Mar 2022 17:20:24 -0500 Subject: [PATCH] fix: Fix Spark template to work correctly on feast init -t spark as well as ensuring timestamps are recent so feature view TTLs don't expire Signed-off-by: Danny Chiao --- sdk/python/feast/templates/spark/bootstrap.py | 63 ++++++++----------- sdk/python/feast/templates/spark/example.py | 4 +- 2 files changed, 27 insertions(+), 40 deletions(-) diff --git a/sdk/python/feast/templates/spark/bootstrap.py b/sdk/python/feast/templates/spark/bootstrap.py index 155a86bf48..b57387d3d7 100644 --- a/sdk/python/feast/templates/spark/bootstrap.py +++ b/sdk/python/feast/templates/spark/bootstrap.py @@ -1,48 +1,35 @@ -from datetime import datetime, timedelta -from pathlib import Path - -from pyspark.sql import SparkSession - -from feast.driver_test_data import ( - create_customer_daily_profile_df, - create_driver_hourly_stats_df, -) - -CURRENT_DIR = Path(__file__).parent -DRIVER_ENTITIES = [1001, 1002, 1003] -CUSTOMER_ENTITIES = [201, 202, 203] -START_DATE = datetime.strptime("2022-01-01", "%Y-%m-%d") -END_DATE = START_DATE + timedelta(days=7) - - def bootstrap(): # Bootstrap() will automatically be called from the init_repo() during `feast init` - generate_example_data( - spark_session=SparkSession.builder.getOrCreate(), base_dir=str(CURRENT_DIR), - ) - + import pathlib + from datetime import datetime, timedelta -def example_data_exists(base_dir: str) -> bool: - for path in [ - Path(base_dir) / "data" / "driver_hourly_stats", - Path(base_dir) / "data" / "customer_daily_profile", - ]: - if not path.exists(): - return False - return True + from feast.driver_test_data import ( + create_customer_daily_profile_df, + create_driver_hourly_stats_df, + ) + repo_path = pathlib.Path(__file__).parent.absolute() + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) -def generate_example_data(spark_session: SparkSession, base_dir: str) -> None: - spark_session.createDataFrame( - data=create_driver_hourly_stats_df(DRIVER_ENTITIES, START_DATE, END_DATE) - ).write.parquet( - path=str(Path(base_dir) / "data" / "driver_hourly_stats"), mode="overwrite", + driver_entities = [1001, 1002, 1003] + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + driver_stats_df = create_driver_hourly_stats_df( + driver_entities, start_date, end_date + ) + driver_stats_df.to_parquet( + path=str(data_path / "driver_hourly_stats.parquet"), + allow_truncated_timestamps=True, ) - spark_session.createDataFrame( - data=create_customer_daily_profile_df(CUSTOMER_ENTITIES, START_DATE, END_DATE) - ).write.parquet( - path=str(Path(base_dir) / "data" / "customer_daily_profile"), mode="overwrite", + customer_entities = [201, 202, 203] + customer_profile_df = create_customer_daily_profile_df( + customer_entities, start_date, end_date + ) + customer_profile_df.to_parquet( + path=str(data_path / "customer_daily_profile.parquet"), + allow_truncated_timestamps=True, ) diff --git a/sdk/python/feast/templates/spark/example.py b/sdk/python/feast/templates/spark/example.py index 11a25e1be2..2b738c4337 100644 --- a/sdk/python/feast/templates/spark/example.py +++ b/sdk/python/feast/templates/spark/example.py @@ -24,14 +24,14 @@ # Sources driver_hourly_stats = SparkSource( name="driver_hourly_stats", - path=f"{CURRENT_DIR}/data/driver_hourly_stats", + path=f"{CURRENT_DIR}/data/driver_hourly_stats.parquet", file_format="parquet", event_timestamp_column="event_timestamp", created_timestamp_column="created", ) customer_daily_profile = SparkSource( name="customer_daily_profile", - path=f"{CURRENT_DIR}/data/customer_daily_profile", + path=f"{CURRENT_DIR}/data/customer_daily_profile.parquet", file_format="parquet", event_timestamp_column="event_timestamp", created_timestamp_column="created",