
fix: Fix materialization when running on Spark cluster. #3166

Merged · 3 commits into feast-dev:master on Sep 9, 2022

Conversation

@ckarwicki (Contributor)

What this PR does / why we need it:
When running materialization with the Spark offline store configured to use a cluster (`spark.master` pointing to an actual Spark master node), `self.to_spark_df().write.parquet(temp_dir, mode="overwrite")` creates the parquet file on a worker node, but `return pq.read_table(temp_dir)` is executed on the driver node, which cannot read from the worker. The proposed fix makes materialization work when run on a Spark cluster.
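A minimal sketch of the two code paths, assuming a PySpark DataFrame `spark_df` and the `temp_dir` from the snippet above (the function names here are illustrative, not Feast's actual API):

```python
import pyarrow as pa
import pyarrow.parquet as pq


def to_arrow_via_parquet(spark_df, temp_dir):
    # Before the fix: the parquet write is executed on the worker nodes,
    # so temp_dir gets populated on a worker's local filesystem...
    spark_df.write.parquet(temp_dir, mode="overwrite")
    # ...but this read runs on the driver, which has no such directory,
    # so materialization fails on a real cluster.
    return pq.read_table(temp_dir)


def to_arrow_direct(spark_df):
    # After the fix: collect the data to the driver as a pandas DataFrame
    # and build the Arrow table directly, with no intermediate files.
    return pa.Table.from_pandas(spark_df.toPandas())
```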

Which issue(s) this PR fixes:

Fixes #

@ckarwicki changed the title from "Fix materialization when running on Spark cluster." to "fix: Fix materialization when running on Spark cluster." Sep 1, 2022
@achals (Member) commented Sep 1, 2022

Thanks for the PR @ckarwicki, have you been able to test this out yourself?

@achals (Member) commented Sep 1, 2022

/ok-to-test

@codecov-commenter commented Sep 1, 2022

Codecov Report

Base: 66.85% // Head: 75.69% // Increases project coverage by +8.84% 🎉

Coverage data is based on head (b9d36c1) compared to base (d7b0c52).
Patch coverage: 0.00% of the modified lines in this pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3166      +/-   ##
==========================================
+ Coverage   66.85%   75.69%   +8.84%     
==========================================
  Files         175      211      +36     
  Lines       15848    17923    +2075     
==========================================
+ Hits        10595    13567    +2972     
+ Misses       5253     4356     -897     
| Flag | Coverage Δ |
| --- | --- |
| integrationtests | 66.85% <ø> (-0.01%) ⬇️ |
| unittests | 57.90% <0.00%> (?) |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
| --- | --- |
| ...ffline_stores/contrib/spark_offline_store/spark.py | 7.72% <0.00%> (ø) |
| ...on/feast/infra/materialization/snowflake_engine.py | 92.09% <0.00%> (-0.05%) ⬇️ |
| sdk/python/feast/infra/provider.py | 77.94% <0.00%> (ø) |
| sdk/python/feast/infra/passthrough_provider.py | 96.29% <0.00%> (ø) |
| .../python/feast/infra/feature_servers/base_config.py | 100.00% <0.00%> (ø) |
| ...n/feast/infra/feature_servers/aws_lambda/config.py | 100.00% <0.00%> (ø) |
| ..._stores/contrib/postgres_offline_store/postgres.py | 34.14% <0.00%> (ø) |
| ...ine_stores/contrib/cassandra_repo_configuration.py | 100.00% <0.00%> (ø) |
| ...offline_stores/contrib/spark_repo_configuration.py | 20.00% <0.00%> (ø) |
| ...thon/feast/infra/utils/postgres/postgres_config.py | 100.00% <0.00%> (ø) |

... and 113 more


@ckarwicki (Contributor, Author) commented Sep 2, 2022

@achals Yes, this has been tested on a Spark cluster and in local mode.

@adchia (Collaborator) commented Sep 2, 2022

Hey! @ckarwicki can you also sign your commits?

@niklasvm (Collaborator) commented Sep 3, 2022

This may break some of the unit tests that check that data types are preserved between Arrow and Spark. I previously changed this method to resolve failing unit tests.
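A hypothetical illustration of what "types being preserved" means here (column name and values chosen for illustration, not taken from Feast's test suite):

```python
import pandas as pd
import pyarrow as pa

# A timestamp column should keep its type when it round-trips
# through pandas into an Arrow table.
pdf = pd.DataFrame({"event_timestamp": pd.to_datetime(["2022-09-01 00:00:00"])})
table = pa.Table.from_pandas(pdf)
assert pa.types.is_timestamp(table.schema.field("event_timestamp").type)
```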

@ckarwicki (Contributor, Author)

@niklasvm Unit tests are all fine - you can see the unit test checks on this PR - all are green. This change preserves types and only removes the code that creates parquet files, which fails when run on a Spark cluster. Instead of creating a parquet file and then reading it back to create an Arrow table, we create the Arrow table directly from the pandas DataFrame. Besides, `toPandas()` internally uses Arrow as well.
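A sketch of the session configuration this relies on, with a hypothetical cluster master URL — when Arrow is enabled, `toPandas()` transfers data from the executors in Arrow format rather than row by row:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")  # hypothetical cluster master URL
    # With this flag set, toPandas() uses Arrow for the driver-side
    # transfer, making the direct DataFrame -> Arrow conversion cheap.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)
```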

@kevjumba (Collaborator) commented Sep 7, 2022

@ckarwicki can you sign your commits? The DCO check details should give you a command to sign them.

@ckarwicki (Contributor, Author)

@kevjumba signed commits.

@kevjumba (Collaborator) left a comment

/lgtm

@feast-ci-bot (Collaborator)

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: adchia, ckarwicki, kevjumba

The full list of commands accepted by this bot can be found here.

The pull request process is described here


Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@feast-ci-bot merged commit 175fd25 into feast-dev:master Sep 9, 2022
felixwang9817 pushed a commit that referenced this pull request Sep 20, 2022
# [0.25.0](v0.24.0...v0.25.0) (2022-09-20)

### Bug Fixes

* Broken Feature Service Link ([#3227](#3227)) ([e117082](e117082))
* Feature-server image is missing mysql dependency for mysql registry ([#3223](#3223)) ([ae37b20](ae37b20))
* Fix handling of TTL in Go server ([#3232](#3232)) ([f020630](f020630))
* Fix materialization when running on Spark cluster. ([#3166](#3166)) ([175fd25](175fd25))
* Fix push API to respect feature view's already inferred entity types ([#3172](#3172)) ([7c50ab5](7c50ab5))
* Fix release workflow ([#3144](#3144)) ([20a9dd9](20a9dd9))
* Fix Shopify timestamp bug and add warnings to help with debugging entity registration ([#3191](#3191)) ([de75971](de75971))
* Handle complex Spark data types in SparkSource ([#3154](#3154)) ([5ddb83b](5ddb83b))
* Local staging location provision ([#3195](#3195)) ([cdf0faf](cdf0faf))
* Remove bad snowflake offline store method ([#3204](#3204)) ([dfdd0ca](dfdd0ca))
* Remove opening file object when validating S3 parquet source ([#3217](#3217)) ([a906018](a906018))
* Snowflake config file search error ([#3193](#3193)) ([189afb9](189afb9))
* Update Snowflake Online docs ([#3206](#3206)) ([7bc1dff](7bc1dff))

### Features

* Add `to_remote_storage` functionality to `SparkOfflineStore` ([#3175](#3175)) ([2107ce2](2107ce2))
* Add ability to give boto extra args for registry config ([#3219](#3219)) ([fbc6a2c](fbc6a2c))
* Add health endpoint to py server ([#3202](#3202)) ([43222f2](43222f2))
* Add snowflake support for date & number with scale ([#3148](#3148)) ([50e8755](50e8755))
* Add tag kwarg to set Snowflake online store table path ([#3176](#3176)) ([39aeea3](39aeea3))
* Add workgroup to athena offline store config ([#3139](#3139)) ([a752211](a752211))
* Implement spark materialization engine ([#3184](#3184)) ([a59c33a](a59c33a))