SPARK-19794 Release HDFS Client after read/write checkpoint #17135
Conversation
I get the idea, but I'm not sure any of these are valid.

    logInfo(s"Final output path $finalOutputPath already exists; not overwriting it")
    if (!fs.delete(tempOutputPath, false)) {
      logWarning(s"Error deleting ${tempOutputPath}")
    try {
Given that this doesn't encompass the span of usage for fs -- better to just call fs.close() at the end and not worry about manually closing in an error case? Or expand the try-finally?
Actually, I am not sure we are supposed to call FileSystem.close(), because they are shared instances, cached and reused across the whole application.
Agreed with @srowen. FileSystem is a cached object; closing it means removing it from the cache. I don't think we need to call this explicitly, because by default it is designed to be shared.
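The caching behavior the reviewers refer to can be illustrated with a minimal stand-in for Hadoop's internal FileSystem cache (the names SharedClient, ClientCache, and the key string below are illustrative, not Hadoop's API; Hadoop keys its cache on scheme, authority, and user in a similar way):

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in for Hadoop's FileSystem cache: FileSystem.get(conf)
// hands every caller the same cached instance for a given key.
class SharedClient(val key: String) {
  @volatile var closed = false
  def close(): Unit = closed = true
}

object ClientCache {
  private val cache = new ConcurrentHashMap[String, SharedClient]()
  def get(key: String): SharedClient =
    cache.computeIfAbsent(key, k => new SharedClient(k))
}

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val a = ClientCache.get("hdfs://nn:8020")
    val b = ClientCache.get("hdfs://nn:8020")
    // Every caller gets the same cached instance.
    assert(a eq b)
    // Closing "your" reference therefore closes it for all other holders too.
    a.close()
    assert(b.closed)
    println("shared instance closed for all holders")
  }
}
```

This is why calling close() on a FileSystem obtained from the cache is unsafe: other code in the same JVM may still hold the same instance.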
@@ -216,6 +221,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
      serializeStream.writeObject(partitioner)
    } {
      serializeStream.close()
+     fileOutputStream.close()
Ditto, this is OK if serializeStream.close() doesn't actually close the underlying stream (?), but not sure about the next line.
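The concern here is double-closing: if the serializer's close() already delegates to the underlying file stream (as ObjectOutputStream's does), the explicit second close is redundant. A minimal sketch of that situation, using only JDK streams (WrappingStream is an illustrative stand-in for a SerializationStream, not Spark's class):

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Illustrative wrapper mirroring a serialization stream over a file stream.
// Its close() delegates to the underlying stream, as most serializers do.
class WrappingStream(underlying: OutputStream) {
  def close(): Unit = underlying.close()
}

object CloseOrderDemo {
  def main(args: Array[String]): Unit = {
    var closes = 0
    // Count how many times the underlying stream gets closed.
    val fileOut = new ByteArrayOutputStream() {
      override def close(): Unit = { closes += 1; super.close() }
    }
    val serializeStream = new WrappingStream(fileOut)
    try {
      // ... write objects ...
    } finally {
      serializeStream.close() // already closes fileOut
      fileOut.close()         // second close: idempotent for most streams
    }
    assert(closes == 2)
    println(s"underlying stream closed $closes times")
  }
}
```

java.io.Closeable documents close() as having no effect on an already-closed stream, so the extra call is usually harmless, but it should be a deliberate choice rather than an accident.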
@@ -279,8 +287,13 @@ private[spark] object ReliableCheckpointRDD extends Logging {

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(context => deserializeStream.close())

    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
    Utils.tryWithSafeFinally {
I don't think you can close it here, right? You're returning an iterator on the stream.
This code will introduce an issue: deserializeStream should be closed only after iteration is finished, and the code here will close the stream prematurely. Also please look at L289 -- it already takes care of closing after the task is finished.
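The objection can be reproduced with plain JDK streams: the function returns a lazy iterator over the stream, so the stream must stay open until the consumer is done, and a completion callback (standing in for Spark's context.addTaskCompletionListener, which the existing code already uses) closes it afterwards. All names in this sketch are illustrative:

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}

object DeferredCloseDemo {
  // Tiny stand-in for TaskContext's completion-listener mechanism.
  private var completionCallbacks = List.empty[() => Unit]
  def addTaskCompletionCallback(f: () => Unit): Unit = completionCallbacks ::= f
  def fireTaskCompletion(): Unit = completionCallbacks.foreach(_.apply())

  def readInts(in: DataInputStream): Iterator[Int] = {
    // Do NOT close `in` here: the returned iterator reads lazily.
    // Defer the close until "task completion" instead.
    addTaskCompletionCallback(() => in.close())
    Iterator.continually(
      try Some(in.readInt()) catch { case _: EOFException => None }
    ).takeWhile(_.isDefined).map(_.get)
  }

  def main(args: Array[String]): Unit = {
    val bytes =
      java.nio.ByteBuffer.allocate(12).putInt(1).putInt(2).putInt(3).array()
    val it = readInts(new DataInputStream(new ByteArrayInputStream(bytes)))
    val result = it.toList // stream is still open while we consume
    fireTaskCompletion()   // only now is it safe to close
    assert(result == List(1, 2, 3))
    println(result)
  }
}
```

Wrapping the stream construction in a try-finally that closes it before returning, as the patch does, would close the stream before the first element is ever read.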
Test build #3592 has finished for PR 17135 at commit
I remember FileSystem will be cached internally by default. Closing it probably will introduce some performance regression. Did you see any case where the FileSystem cache doesn't work properly?
Yes, this is substantially not something we can merge, so let's close this.
It's not just a matter of performance regression - it will break any other code that has references to the file system being closed. -1.
Closes apache#16819 Closes apache#13467 Closes apache#16083 Closes apache#17135 Closes apache#8785 Closes apache#16278 Closes apache#16997 Closes apache#17073 Closes apache#17220
What changes were proposed in this pull request?
Close the HDFS client and streams after reading from and writing to HDFS.
How was this patch tested?