From 3ed80c2dbc3430cfe28736860f6ab476954b0c24 Mon Sep 17 00:00:00 2001 From: Miles Adkins Date: Sat, 22 Apr 2023 13:09:07 -0500 Subject: [PATCH] fix: Clean up snowflake to_spark_df() Signed-off-by: Miles Adkins --- docs/reference/offline-stores/overview.md | 6 ++-- docs/reference/offline-stores/snowflake.md | 2 +- sdk/python/feast/errors.py | 8 ----- .../feast/infra/offline_stores/snowflake.py | 32 +++++++------------ 4 files changed, 15 insertions(+), 33 deletions(-) diff --git a/docs/reference/offline-stores/overview.md b/docs/reference/offline-stores/overview.md index b760a8a617..8ce9045496 100644 --- a/docs/reference/offline-stores/overview.md +++ b/docs/reference/offline-stores/overview.md @@ -46,11 +46,11 @@ Below is a matrix indicating which `RetrievalJob`s support what functionality. | --------------------------------- | --- | --- | --- | --- | --- | --- | --- | | export to dataframe | yes | yes | yes | yes | yes | yes | yes | | export to arrow table | yes | yes | yes | yes | yes | yes | yes | -| export to arrow batches | no | no | yes | yes | no | no | no | -| export to SQL | no | yes | yes | yes | yes | no | yes | +| export to arrow batches | no | no | no | yes | no | no | no | +| export to SQL | no | yes | yes | yes | yes | no | yes | | export to data lake (S3, GCS, etc.) | no | no | yes | no | yes | no | no | | export to data warehouse | no | yes | yes | yes | yes | no | no | -| export as Spark dataframe | no | no | yes | no | no | yes | no | +| export as Spark dataframe | no | no | yes | no | no | yes | no | | local execution of Python-based on-demand transforms | yes | yes | yes | yes | yes | no | yes | | remote execution of Python-based on-demand transforms | no | no | no | no | no | no | no | | persist results in the offline store | yes | yes | yes | yes | yes | yes | no | diff --git a/docs/reference/offline-stores/snowflake.md b/docs/reference/offline-stores/snowflake.md index a3782024a1..9f2dafee67 100644 --- a/docs/reference/offline-stores/snowflake.md +++ b/docs/reference/offline-stores/snowflake.md @@ -53,7 +53,7 @@ Below is a matrix indicating which functionality is supported by `SnowflakeRetri | ----------------------------------------------------- | --------- | | export to dataframe | yes | | export to arrow table | yes | -| export to arrow batches | yes | +| export to arrow batches | yes | | export to SQL | yes | | export to data lake (S3, GCS, etc.) | yes | | export to data warehouse | yes | diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 57d04c2700..9097e40c94 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -56,14 +56,6 @@ def __init__(self, name, project=None): super().__init__(f"Feature view {name} does not exist") -class InvalidSparkSessionException(Exception): - def __init__(self, spark_arg): - super().__init__( - f" Need Spark Session to convert results to spark data frame\ - recieved {type(spark_arg)} instead. " - ) - - class OnDemandFeatureViewNotFoundException(FeastObjectNotFoundException): def __init__(self, name, project=None): if project: diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 4cb525fc6e..38568ce79b 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -28,11 +28,7 @@ from feast import OnDemandFeatureView from feast.data_source import DataSource -from feast.errors import ( - EntitySQLEmptyResults, - InvalidEntityType, - InvalidSparkSessionException, -) +from feast.errors import EntitySQLEmptyResults, InvalidEntityType from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils @@ -528,28 +524,22 @@ def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame": """ try: - from pyspark.sql import DataFrame, SparkSession + from pyspark.sql import DataFrame except ImportError as e: from feast.errors import FeastExtrasDependencyImportError raise FeastExtrasDependencyImportError("spark", str(e)) - if isinstance(spark_session, SparkSession): - arrow_batches = self.to_arrow_batches() + spark_session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") - if arrow_batches: - spark_df = reduce( - DataFrame.unionAll, - [ - spark_session.createDataFrame(batch.to_pandas()) - for batch in arrow_batches - ], - ) - return spark_df - else: - raise EntitySQLEmptyResults(self.to_sql()) - else: - raise InvalidSparkSessionException(spark_session) + # This can be improved by parallelizing the read of chunks + pandas_batches = self.to_pandas_batches() + + spark_df = reduce( + DataFrame.unionAll, + [spark_session.createDataFrame(batch) for batch in pandas_batches], + ) + return spark_df def persist( self,