[SPARK-48370][CONNECT] Checkpoint and localCheckpoint in Scala Spark Connect client #46683
    private var shouldStopCleaner = false
    private[sql] lazy val cleaner = {
      shouldStopCleaner = true
      new ContextCleaner(this)
Why not use the JVM one?
The problem is that the interface would force me to change the generated code `org.apache.spark.connect.proto.CachedRemoteRelation` to implement `AutoCloseable`; otherwise I have to create a wrapper (see https://docs.oracle.com/javase/9/docs/api/java/lang/ref/Cleaner.html).
And it seems the Java protobuf code generator does not allow it:
"Don’t go looking for facilities similar to class inheritance, though – protocol buffers don’t do that."
https://protobuf.dev/getting-started/javatutorial/#defining-your-protocol-format
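For readers following the thread, the wrapper option could look roughly like the sketch below. It is only an illustration of the general `java.lang.ref.Cleaner` pattern (Java 9+), not the code in this PR: `RelationMessage`, `TrackedRelation`, `Release`, and `releasedIds` are hypothetical names, and the server call that would actually drop the cached relation is replaced by recording the ID in a queue.

```scala
import java.lang.ref.Cleaner
import java.util.concurrent.ConcurrentLinkedQueue

object CleanerSketch {
  // Hypothetical stand-in for a generated protobuf message such as
  // CachedRemoteRelation. Generated Java message classes are final, so
  // they cannot be made to implement AutoCloseable.
  final case class RelationMessage(relationId: String)

  // Records which relation IDs were released. A real client would notify
  // the server here instead; that part is an assumption and not modeled.
  val releasedIds = new ConcurrentLinkedQueue[String]()

  // One shared JDK cleaner, backed by its own daemon thread.
  private val cleaner: Cleaner = Cleaner.create()

  // The cleanup action holds only the ID, never the wrapper itself;
  // otherwise the wrapper could never become unreachable.
  private final class Release(relationId: String) extends Runnable {
    override def run(): Unit = { releasedIds.add(relationId); () }
  }

  // Wrapper that ties the lifetime of the cached relation to this object.
  final class TrackedRelation(val message: RelationMessage) extends AutoCloseable {
    private val cleanable: Cleaner.Cleanable =
      cleaner.register(this, new Release(message.relationId))

    // Explicit release; Cleanable.clean() runs the action at most once.
    override def close(): Unit = cleanable.clean()
  }
}
```

If `close()` is never called, the cleaner runs the same action some time after the wrapper becomes unreachable, with no guarantee about when. The generated message itself stays untouched in either case.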
Let me make it simpler at least.
…n instead of DataFrame

### What changes were proposed in this pull request?
This PR addresses the #46683 (comment) review comment in Python by holding the ID at the plan instead of at the DataFrame itself.

### Why are the changes needed?
Because the DataFrame holds the relation ID, if DataFrame B is derived from DataFrame A and DataFrame A is garbage-collected, the cache might not exist anymore. See the example below:

```python
df = spark.range(1).localCheckpoint()
df2 = df.repartition(10)
del df
df2.collect()
```

```
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.sql.connect.common.InvalidPlanInput) No DataFrame with id a4efa660-897c-4500-bd4e-bd57cd0263d2 is found in the session cd4764b4-90a9-4249-9140-12a6e4a98cd3
```

### Does this PR introduce _any_ user-facing change?
No, the main change has not been released yet.

### How was this patch tested?
Manually tested, and added a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46694 from HyukjinKwon/SPARK-48258-followup.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
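One way to picture why holding the ID at the plan level helps is the small model below, written in Scala to match the rest of this PR. It is a simplified sketch under stated assumptions, not the client's actual classes: `CacheHandle`, `Plan`, `CachedRelation`, `Repartition`, and `Frame` are all hypothetical names. The point it illustrates is that a derived frame embeds its parent's plan, so a handle stored in the plan stays reachable even after the parent frame object is dropped.

```scala
object PlanRefSketch {
  // Hypothetical handle; while it is reachable, the server-side cache
  // for relationId should be kept.
  final class CacheHandle(val relationId: String)

  // Minimal plan tree: each node can report the handles it depends on.
  sealed trait Plan { def handles: Seq[CacheHandle] }

  // Leaf node that refers to a checkpointed (cached) relation.
  final case class CachedRelation(handle: CacheHandle) extends Plan {
    def handles: Seq[CacheHandle] = Seq(handle)
  }

  // Derived node: it embeds its child, so it transitively keeps the
  // child's handle reachable.
  final case class Repartition(child: Plan, numPartitions: Int) extends Plan {
    def handles: Seq[CacheHandle] = child.handles
  }

  // Stand-in for a DataFrame: it only holds a plan, not the handle directly.
  final class Frame(val plan: Plan) {
    def repartition(n: Int): Frame = new Frame(Repartition(plan, n))
  }
}
```

In this model, `new Frame(CachedRelation(handle)).repartition(10)` still reaches `handle` through its plan, which mirrors the `df` / `df2` scenario in the commit message.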
I believe I have addressed all major comments here. I need to make one more follow-up to fix both Scala and Python. Let me merge this one and address any remaining comments in that PR. Merged to master.
… and eager required fields in CheckpointCommand

### What changes were proposed in this pull request?
This PR is a followup of #46683 and #46570 that refactors `local` and `eager` required fields in `CheckpointCommand`.

### Why are the changes needed?
To make the code easier to maintain.

### Does this PR introduce _any_ user-facing change?
No, the main change has not been released yet.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46712 from HyukjinKwon/SPARK-48370-SPARK-48258-followup.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
### What changes were proposed in this pull request?
This PR is a followup of #46683 that replaces our custom cleaner with the JDK's cleaner.

### Why are the changes needed?
To reuse the standard built-in library.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
I manually tested by re-enabling `CheckpointSuite.checkpoint gc derived DataFrame`.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46726 from HyukjinKwon/SPARK-48370-followup.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
### What changes were proposed in this pull request?
This PR adds `Dataset.checkpoint` and `Dataset.localCheckpoint` to the Scala Spark Connect client. The Python API was implemented in #46570.

### Why are the changes needed?
For API parity.

### Does this PR introduce any user-facing change?
Yes, it adds `Dataset.checkpoint` and `Dataset.localCheckpoint` to the Scala Spark Connect client.

### How was this patch tested?
Unit tests added.

### Was this patch authored or co-authored using generative AI tooling?
No.