Commit

feat: Add better spark support for snowflake offline store
Signed-off-by: miles.adkins <[email protected]>
sfc-gh-madkins committed Mar 11, 2023
1 parent 4f9ad7e commit 9affa5c
Showing 2 changed files with 34 additions and 44 deletions.
8 changes: 0 additions & 8 deletions sdk/python/feast/errors.py
@@ -56,14 +56,6 @@ def __init__(self, name, project=None):
super().__init__(f"Feature view {name} does not exist")


class InvalidSparkSessionException(Exception):
def __init__(self, spark_arg):
super().__init__(
f" Need Spark Session to convert results to spark data frame\
recieved {type(spark_arg)} instead. "
)


class OnDemandFeatureViewNotFoundException(FeastObjectNotFoundException):
def __init__(self, name, project=None):
if project:
70 changes: 34 additions & 36 deletions sdk/python/feast/infra/offline_stores/snowflake.py
@@ -3,7 +3,6 @@
import uuid
import warnings
from datetime import datetime
from functools import reduce
from pathlib import Path
from typing import (
TYPE_CHECKING,
@@ -28,11 +27,7 @@

from feast import OnDemandFeatureView
from feast.data_source import DataSource
from feast.errors import (
EntitySQLEmptyResults,
InvalidEntityType,
InvalidSparkSessionException,
)
from feast.errors import EntitySQLEmptyResults, InvalidEntityType
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
from feast.infra.offline_stores import offline_utils
@@ -459,47 +454,50 @@ def to_sql(self) -> str:
with self._query_generator() as query:
return query

def to_spark_df(self, spark_session: "SparkSession") -> "DataFrame":
def to_pyspark_df(
self, spark_session: "SparkSession", sfparams: dict
) -> "DataFrame":
"""
Method to convert snowflake query results to pyspark data frame.
Method to convert snowflake query results to pyspark dataframe.
Args:
spark_session: spark Session variable of current environment.
spark_session: SparkSession variable of current environment.
sfparams: sfparams = {
"sfURL" : "<account_identifier>.snowflakecomputing.com",
"sfUser" : "<user_name>",
"sfPassword" : "<password>",
"sfRole" : "<role>",
"sfWarehouse" : "<warehouse",
}
Returns:
spark_df: A pyspark dataframe.
"""

try:
from pyspark.sql import DataFrame, SparkSession
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("spark", str(e))

if isinstance(spark_session, SparkSession):
with self._query_generator() as query:

arrow_batches = execute_snowflake_statement(
self.snowflake_conn, query
).fetch_arrow_batches()

if arrow_batches:
spark_df = reduce(
DataFrame.unionAll,
[
spark_session.createDataFrame(batch.to_pandas())
for batch in arrow_batches
],
)
sfparams.update(
{
"sfDatabase": f"{self.config.offline_store.database}",
"sfSchema": f"{self.config.offline_store.schema_}",
}
)

return spark_df
table_name = "feast_spark_" + uuid.uuid4().hex
with self._query_generator() as query:
self.to_snowflake(table_name=table_name)

query = f'SELECT * FROM "{table_name}"'
spark_df = (
spark_session.read.format("snowflake")
.options(**sfparams)
.option("query", query)
.option("autopushdown", "on")
.load()
)

else:
raise EntitySQLEmptyResults(query)
query = f'DROP TABLE "{table_name}"'
execute_snowflake_statement(self.snowflake_conn, query)

else:
raise InvalidSparkSessionException(spark_session)
return spark_df

def persist(self, storage: SavedDatasetStorage, allow_overwrite: bool = False):
assert isinstance(storage, SavedDatasetSnowflakeStorage)
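
A minimal usage sketch for the new to_pyspark_df method (not part of the diff above): it assumes a SparkSession with the Snowflake Spark connector on its classpath; the connector coordinates, credentials, entity dataframe, and feature references are illustrative placeholders rather than values from this commit. Note that sfDatabase and sfSchema are filled in by the method itself from the offline store config, so the caller supplies only connection, role, and warehouse parameters.

import pandas as pd
from pyspark.sql import SparkSession

from feast import FeatureStore

# The Snowflake Spark connector must be visible to Spark; these Maven
# coordinates are an assumption and depend on your Spark/Scala versions.
spark_session = (
    SparkSession.builder.appName("feast-snowflake-example")
    .config(
        "spark.jars.packages",
        "net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3",
    )
    .getOrCreate()
)

# Connection parameters as documented in the new docstring; values are placeholders.
sfparams = {
    "sfURL": "<account_identifier>.snowflakecomputing.com",
    "sfUser": "<user_name>",
    "sfPassword": "<password>",
    "sfRole": "<role>",
    "sfWarehouse": "<warehouse>",
}

store = FeatureStore(repo_path=".")

# Hypothetical entity rows and feature reference, for illustration only.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1002],
        "event_timestamp": pd.to_datetime(["2023-03-01", "2023-03-02"]),
    }
)

job = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_hourly_stats:conv_rate"],
)

spark_df = job.to_pyspark_df(spark_session, sfparams)
spark_df.show()

Design note: rather than fetching Arrow batches into pandas and unioning them on the driver (the removed to_spark_df path), the new implementation materializes the query into a temporary Snowflake table, reads it through the Spark connector with pushdown enabled, and drops the table afterwards, keeping the data transfer inside the connector instead of driver memory.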
