
feat: Add better spark support for snowflake offline store #3419

Closed · wants to merge 1 commit

Conversation

@sfc-gh-madkins (Collaborator) commented Dec 28, 2022

Signed-off-by: miles.adkins [email protected]

What this PR does / why we need it:

Adds the ability to return Snowflake offline store query results as a Spark DataFrame.

Which issue(s) this PR fixes:

Fixes #3364

@feast-ci-bot (Collaborator)

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: sfc-gh-madkins

The full list of commands accepted by this bot can be found here.

The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@sfc-gh-madkins (Collaborator, Author)

/ok-to-test

@sfc-gh-madkins (Collaborator, Author)

@amithadiraju1694 try this out ... you will need a PySpark environment with the Snowflake Spark connector installed.

You will need to pass in the Spark session plus a dict of Snowflake login params ... see the function comments.
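
A minimal usage sketch of what that call might look like (not from the PR itself): `job` is assumed to be the retrieval job returned by `store.get_historical_features(...)`, the option keys are the standard Snowflake Spark connector connection options, and all account values are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("feast-snowflake").getOrCreate()

# Standard Snowflake Spark connector options; replace the placeholders
# with your own account details.
sf_params = {
    "sfURL": "<account>.snowflakecomputing.com",
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}

# `job` is assumed to be a SnowflakeRetrievalJob; the signature matches
# the method discussed in this thread.
spark_df = job.to_pyspark_df(spark, sf_params)
spark_df.show(5)
```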

@sfc-gh-madkins (Collaborator, Author)

@amithadiraju1694 you probably already tried this code, is my guess ... the reason you were getting that error is that the initial input dataframe is scoped to a different connection and Spark can't find it.

@amithadiraju1694 (Contributor)

> @amithadiraju1694 you probably already tried this code, is my guess ... the reason you were getting that error is that the initial input dataframe is scoped to a different connection and Spark can't find it.

Thanks for this @sfc-gh-madkins. I tried different variations of this along with your original solution, but the execution never stops (Databricks keeps saying "Running Command").

My final attempt looked like this in snowflake.py:

```python
def to_pyspark_df(self, spark_session: SparkSession, sfparam: dict) -> DataFrame:
    """
    Convert Snowflake query results to a PySpark dataframe.

    Args:
        spark_session: Spark session of the current environment.
        sfparam: dict of Snowflake connection options for the Spark connector.

    Returns:
        spark_df: A PySpark dataframe.
    """
    if isinstance(spark_session, SparkSession):
        table_name = "feast_spark_" + uuid.uuid4().hex
        self.to_snowflake(table_name=table_name)
        query = f'SELECT * FROM "{table_name}"'

        spark_df = (
            spark_session.read.format("net.snowflake.spark.snowflake")
            .options(**sfparam)
            .option("query", query)
            .option("autopushdown", "on")
            .load()
        )

        query = f'DROP TABLE "{table_name}"'
        execute_snowflake_statement(self.snowflake_conn, query)

        return spark_df
```

I tried the original solution as well, and it's giving me the same result. I'm wondering whether, in the original solution, lines 486 to 500 of snowflake.py should be run inside the `with ... as query` scope or outside of it. If inside, I'm confused about why they need to be run inside that scope?

@sfc-gh-madkins (Collaborator, Author) commented Dec 31, 2022 via email

@amithadiraju1694 (Contributor) commented Jan 4, 2023

> Are you sure you have the Snowflake Spark connector installed? Do you see the query being issued on the Snowflake side? I was able to test this with success locally. Temporary tables are scoped to a specific Snowflake session.

I was running my code on Databricks, so the Snowflake connector for Spark must already be installed ... Since we're setting `"autopushdown"` to `"on"`, the query should be executing on the Snowflake side, though I'm not able to figure out why it runs forever.

Does `self.to_snowflake(table_name=table_name)` have to be called from within the `with self._query_generator() as query` scope? I removed that bit in my previous attempt, as I assumed the `query` variable was not being used anywhere in the code (we're creating our own query, right?).
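
A minimal standalone sketch of the session-scoping behavior mentioned above, using snowflake-connector-python with placeholder connection parameters: a TEMPORARY table lives only in the session that created it, so a second connection (such as the one the Spark connector opens) cannot see it.

```python
import snowflake.connector

params = dict(
    user="<user>",
    password="<password>",
    account="<account>",
    database="<database>",
    schema="<schema>",
)

conn_a = snowflake.connector.connect(**params)
conn_b = snowflake.connector.connect(**params)

# Temporary tables are scoped to the creating session (conn_a).
conn_a.cursor().execute('CREATE TEMPORARY TABLE "feast_tmp" (id INT)')
conn_a.cursor().execute('SELECT * FROM "feast_tmp"')  # same session: works

try:
    # A different session (e.g. the one the Spark connector opens)
    # cannot see conn_a's temporary table.
    conn_b.cursor().execute('SELECT * FROM "feast_tmp"')
except snowflake.connector.errors.ProgrammingError as err:
    print(err)  # "... does not exist or not authorized"
```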

@amithadiraju1694 (Contributor) commented Jan 4, 2023

> Are you sure you have the Snowflake Spark connector installed? Do you see the query being issued on the Snowflake side? I was able to test this with success locally. Temporary tables are scoped to a specific Snowflake session.

I debugged and found that the program halts at `self.to_snowflake(table_name)`; the temporary table with the given name isn't created at all in the given database.schema for some reason. I'm not sure if this is because of access issues; I'm using dev-instance credentials and should have the required access.

@sfc-gh-madkins (Collaborator, Author) commented Jan 4, 2023 via email

@amithadiraju1694 (Contributor)

> Is there an error on the Snowflake side?

In the `to_snowflake` method, the `temporary` argument was set to `False` by default; changing it to `True` solved the unresponsiveness of the query. But now I see a SQL compilation error: my `'DB.SCHEMA."table_name"'` is not found or not authorized. I faced a similar issue before, for which I made a quick fix, but even the quick fix isn't working now (my schema is not PUBLIC and contains an underscore in its name).
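
A minimal sketch of the identifier-quoting pitfall that can produce this error (the database and schema names are placeholders): each part of a fully qualified Snowflake name must be quoted separately.

```python
# Hypothetical table name for illustration.
table_name = "feast_spark_abc123"

# Works: three separately quoted identifiers.
good = f'SELECT * FROM "MY_DB"."MY_SCHEMA"."{table_name}"'

# Fails with "not found or not authorized": the whole dotted path
# is treated as one single identifier.
bad = f'SELECT * FROM "MY_DB.MY_SCHEMA.{table_name}"'
```

Note also that with `temporary=True` the table is visible only to the Snowflake session that created it, so a query issued through the Spark connector's separate session would likewise report it as not found (see the session-scoping sketch above).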

@sfc-gh-madkins (Collaborator, Author) commented Jan 5, 2023 via email

@sfc-gh-madkins (Collaborator, Author)

@amithadiraju1694 is this new PR going to break your existing code?

@sfc-gh-madkins (Collaborator, Author)

@adchia this might cause a breaking change for a single user, but he has been unresponsive

auto-merge was automatically disabled April 21, 2023 20:26

Pull request was closed

Successfully merging this pull request may close these issues: Support for Snowflake connector with Spark (#3364).

3 participants