SPARK-1630: Make PythonRDD handle NULL elements and strings gracefully #554
Conversation
@@ -301,15 +301,25 @@ private[spark] object PythonRDD {
          throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
        }
      case other =>
        throw new SparkException("Unexpected element type " + first.getClass)
        Option(other) match {
It's more obvious to just do

if (other == null) {
  // handle the null element here
} else {
  // fall through to the existing write logic
}

than pattern matching.
Can one of the admins verify this patch?
      val bytes = str.getBytes(UTF8)
      dataOut.writeInt(bytes.length)
      dataOut.write(bytes)
      Option(str) match {
here also
Thanks, @kalpit. This looks pretty good. I left a couple comments on style.
      if (str == null) {
        logDebug("Encountered NULL string. We skip writing NULL to stream.")
      } else {
        val bytes = str.getBytes(UTF8)
the indent is off here (2-space indent)
Can you update this one also?
Thanks - just one more tiny thing about indent ...
Jenkins, test this please
Merged build triggered.
Merged build started.
I'm curious, when did you get nulls in practice? Wouldn't it be better to pass a null to Python and have it display as None?
Merged build finished. All automated tests passed.
@mateiz I ran into this when my custom RDD produced nulls for some elements within a partition/split (during compute()). It would indeed be better to pass a null to Python and have it display it as None. One solution is to pick a TOKEN that we write into the tmp file and then translate to "None" during read. This, however, is not failsafe, because there is a remote possibility of string data being identical to the TOKEN. Perhaps we could address that by fencing regular data with a special character and treating data lacking that fence as tokens. In any case, the above solution (or an alternative) would be a relatively larger change, and I preferred fixing at least the NPEs in PythonRDD for the short term (stack trace is in the JIRA ticket). What do you think?
But that means that the NPEs are only happening with your custom RDD, right? They won't happen for regular Spark users. I think we should pass None here. One way to do it is to select a negative length (e.g. -3) to represent null, and pass that to Python. We already use other negative lengths for other special flags.
I suspect that the NPEs will happen for any PySpark user who has an RDD that returns null for some input "x" based on the lambda/transform. Check out the test case I added to "PythonRDDSuite.scala" to reproduce the NPE. I considered the idea of using a negative length (-4) to pass "None" to Python (PythonRDD.SpecialLengths -1 to -3 are taken). The tricky part, however, is that the read() method returns an array of bytes based on the length, and existing code treats an empty array as end of data/stream. So I am not sure how we would communicate "None" to Python. Thoughts?
Lambdas in Python that return None will work fine because we use pickling for all data after that. The only way this problem can happen is if a Java RDD has null in it. Do you have an example in Python only (with the current PySpark) where this happens?
I see your point. I don't have a Python-only use-case that can trigger the NPE. My custom RDD implementation had a corner-case in which the RDD's compute() method returned a "null" in the iterator stream. I have fixed my custom RDD implementation to not do that, so I don't run into this NPE anymore. However, should anyone else out there ever implement a custom RDD of similar nature (has nulls for some elements in a partition's iterator stream) and try accessing such an RDD from PySpark, they would run into the NPE, so I thought it would be nicer if we handled nulls in the stream gracefully.
Yeah, but in that case I think we have to figure out a way with the lengths. I haven't had time to look into it, but basically the UTF decoder in Python needs to deal with negative lengths sent from Scala.
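The negative-length idea discussed above could be sketched roughly as follows. This is a minimal illustration of the framing, not the actual PythonRDD protocol: the -4 sentinel value and the helper names are assumptions (the real SpecialLengths -1 to -3 are already reserved for other flags).

```python
import struct
from io import BytesIO

NULL_SENTINEL = -4  # hypothetical flag; -1..-3 are assumed taken


def write_utf(out, s):
    """Write a big-endian length-prefixed UTF-8 string, or a sentinel length for None."""
    if s is None:
        out.write(struct.pack(">i", NULL_SENTINEL))
    else:
        data = s.encode("utf-8")
        out.write(struct.pack(">i", len(data)))
        out.write(data)


def read_utf(inp):
    """Read one framed value; return None when the sentinel length is seen."""
    length = struct.unpack(">i", inp.read(4))[0]
    if length == NULL_SENTINEL:
        return None
    return inp.read(length).decode("utf-8")


buf = BytesIO()
for s in ["spark", None, ""]:
    write_utf(buf, s)
buf.seek(0)
print([read_utf(buf) for _ in range(3)])  # ['spark', None, '']
```

Note that because the sentinel lives in the length prefix rather than the payload, an empty string (length 0) stays distinguishable from None, which addresses the "empty array means end of stream" ambiguity raised above.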
@kalpit pls take a look at #644, where I propose to use …
…che#554.

SPARK-1056. Fix header comment in Executor to not imply that it's only used for Mesos and Standalone.

Author: Sandy Ryza <[email protected]>

== Merge branch commits ==

commit 1f2443d902a26365a5c23e4af9077e1539ed2eab
Author: Sandy Ryza <[email protected]>
Date: Thu Feb 6 15:03:50 2014 -0800

    SPARK-1056. Fix header comment in Executor to not imply that it's only used for Mesos and Standalone
I've closed this since it was fixed separately. Thanks for sending a patch here.
I have added a unit test that validates the fix. We no longer NPE.
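For reference, the behavior this patch settles on (skip null elements with a debug log rather than throwing) can be approximated in Python. This is an illustrative analogue of what the Scala writer does, not the actual suite code; the function name is made up:

```python
import logging
from io import BytesIO


def write_elements(out, elems):
    """Write each element's UTF-8 bytes, skipping None instead of raising."""
    for e in elems:
        if e is None:
            logging.debug("Encountered NULL string; skipping write.")
            continue
        out.write(e.encode("utf-8"))


buf = BytesIO()
write_elements(buf, ["a", None, "b"])
print(buf.getvalue())  # b'ab'
```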