[SPARK-1630] Turn Null of Java/Scala into None of Python #1551
Conversation
QA tests have started for PR 1551. This patch merges cleanly.
if (str == null) {
  dataOut.writeInt(SpecialLengths.NULL)
} else {
  val bytes = str.getBytes(UTF8)
alignment is off here
QA results for PR 1551:
QA tests have started for PR 1551. This patch merges cleanly.
@@ -344,7 +345,12 @@ private[spark] object PythonRDD extends Logging {
        throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
      }
    case other =>
      throw new SparkException("Unexpected element type " + first.getClass)
      if (other == null) {
        dataOut.writeInt(SpecialLengths.NULL)
maybe it doesn't matter much here, but would it make sense to write a byte instead of an int?
It's the header of a variable-length field; it's better to keep this header at a fixed length, otherwise you would need to deal with a special variable-length encoding.
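The fixed-length-header framing can be sketched as follows. This is a minimal, self-contained model, not the actual PythonRDD code: the real value of SpecialLengths.NULL is defined in PythonRDD.scala, and the -1 sentinel here is only a stand-in. Because the header is always 4 bytes, the reader can branch on it without any variable-length decoding.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical sentinel; the real SpecialLengths.NULL lives in PythonRDD.scala.
val NULL_SENTINEL = -1

// Write either the 4-byte sentinel, or a 4-byte length followed by the payload.
def writeField(str: String, out: DataOutputStream): Unit = {
  if (str == null) {
    out.writeInt(NULL_SENTINEL)
  } else {
    val bytes = str.getBytes("UTF-8")
    out.writeInt(bytes.length)
    out.write(bytes)
  }
}

// The reader always consumes the fixed 4-byte header first, then branches on it.
def readField(in: DataInputStream): String = {
  val header = in.readInt()
  if (header == NULL_SENTINEL) {
    null
  } else {
    val buf = new Array[Byte](header)
    in.readFully(buf)
    new String(buf, "UTF-8")
  }
}

val baos = new ByteArrayOutputStream()
val out = new DataOutputStream(baos)
writeField("hello", out)
writeField(null, out)
out.flush()

val in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray))
assert(readField(in) == "hello")
assert(readField(in) == null)
```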
Jenkins, retest this please.
QA tests have started for PR 1551. This patch merges cleanly.
QA results for PR 1551:
We aren't passing completely arbitrary iterators of Java objects to writeIteratorToStream; instead, we only handle iterators of strings and byte arrays. Nulls in data read from Hadoop input formats should already be converted to None by the Java pickling code. Do you have an example where PythonRDD receives a null element and it's not due to a bug? I'm worried that this patch will mask the presence of bugs.
      throw new SparkException("Unexpected element type " + first.getClass)
      if (other == null) {
        dataOut.writeInt(SpecialLengths.NULL)
        writeIteratorToStream(iter, dataOut)
This method isn't tail-recursive, so this will cause a StackOverflow if you try to write an iterator with thousands of consecutive nulls.
It looks like we only have to worry about nulls when writing iterators from user-defined RDDs of strings. So, if we see an iterator that begins with null, we can assume that the remainder of the iterator contains only nulls or strings. Therefore, I think you can write out the first null followed by
iter.asInstanceOf[Iterator[String]].foreach { str =>
writeUTF(str, dataOut)
}
to process the remainder of the stream.
I was wrong; this is tail-recursive. If we only expect nulls to occur in iterators of strings, then I think we should be able to remove the null checking here.
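For reference, a self-call in tail position is compiled by scalac into a loop, which is why the recursion above does not grow the stack. A minimal sketch under that assumption (the skipNulls helper is hypothetical, not Spark code), with @tailrec making the compiler verify the optimization at compile time:

```scala
import scala.annotation.tailrec

// Hypothetical reduction of the writeIteratorToStream shape: the recursive
// call is the last action taken, so scalac rewrites it as a loop; @tailrec
// turns a non-tail call into a compile error rather than a runtime surprise.
@tailrec
def skipNulls(iter: Iterator[String]): Option[String] = {
  if (!iter.hasNext) {
    None
  } else {
    val head = iter.next()
    if (head == null) skipNulls(iter) // tail call: no stack growth
    else Some(head)
  }
}

// A million consecutive nulls do not overflow the stack.
val many = Iterator.fill(1000000)(null: String) ++ Iterator("x")
assert(skipNulls(many) == Some("x"))
```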
I think it's better to guard against NPEs as much as possible, unless you can prove that an NPE cannot happen.
But this is what I didn't understand about the whole PR: user code is not meant to call PythonRDD directly. Note that the whole PythonRDD object is private[spark]. So where in the codebase today can we get nulls there?
Right, but that's a private API, it doesn't matter. Does our own code do it?
Basically I'm worried that this significantly complicates our code for something that shouldn't happen. I'd rather have an NPE if our own code later passes nulls here (cause it really shouldn't be doing that since we control everything we pass in).
If users want to call a Java/Scala UDF from PySpark, they have to use this private API to do it, so it's possible to have nulls in an RDD[String] or RDD[Array[Byte]].
BTW, it would be helpful if we could skip some bad rows during map/reduce, as mentioned in the MapReduce paper. This is not a must-have feature, but it really improves the robustness of the whole framework, which is very useful for large-scale jobs.
This PR tries to improve the stability of PySpark, letting users feel safer and happier hacking in PySpark.
Again, sorry, I don't think this improves stability:
- Users are not supposed to call private APIs. In fact even Scala code can't call PythonRDD because that is private[spark] -- it's just an artifact of the way Scala implements package-private that the class becomes public in Java. If you'd like support for UDFs we need to add that as a separate, top-level feature.
- This change would mask bugs in the current way we write Python converters. Our current converters only pass in Strings and arrays of bytes, which shouldn't be null. (For datasets that contain nulls, they already convert them to a pickled form of None.) This means that if someone introduces a bug in one of our existing code paths, that bug will be harder to fix, because instead of being an NPE it will be some weird value coming out in Python.
BTW apart from the stability issue above with catching our own bugs, the reason I'm commenting is that this change also adds some moderately tricky code in a fairly important code path, increasing the chance of adding new bugs. That doesn't seem worth it to me.
OK, let's hold it.
QA tests have started for PR 1551. This patch merges cleanly.
def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
  // The right way to implement this would be to use TypeTags to get the full
  // type of T. Since I don't want to introduce breaking changes throughout the
  // entire Spark API, I have to use this hacky approach:
  def writeBytes(bytes: Array[Byte]) {
Is there a legitimate case where an Iterator[Array[Byte]] will contain a null? I was hoping we'd only have to worry about nulls in Iterator[String].
Array[Byte] is similar to String: a null can be generated by a user's functions or RDDs, for example
rdd.map((x: String) => if (x != null) x.getBytes else null)
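This point can be illustrated with a plain function (toBytesOrNull is a hypothetical user function, not Spark code): any user map over strings can hand the serializer a null Array[Byte] just as easily as a null String.

```scala
// Hypothetical user function: it converts a string to UTF-8 bytes but lets
// nulls pass straight through, so downstream code sees a null Array[Byte].
def toBytesOrNull(x: String): Array[Byte] =
  if (x != null) x.getBytes("UTF-8") else null

assert(toBytesOrNull(null) == null)
assert(toBytesOrNull("ab").sameElements("ab".getBytes("UTF-8")))
```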
QA results for PR 1551:
The failed test cases are not related to this PR; how do I retest it?
Jenkins, retest this please.
QA tests have started for PR 1551. This patch merges cleanly.
QA results for PR 1551:
Closing this PR now; will reopen if needed.
When serializing a PythonRDD, a null element will cause an NPE. This patch handles null as Python's None.
This PR is based on #554, thanks to @kalpit.
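Conceptually, the read side branches on the fixed header and yields None for the sentinel. A minimal Scala model of that idea (the sentinel value and the decode helper are hypothetical, with Scala's Option standing in for Python's None):

```scala
// Hypothetical sentinel; stands in for the real SpecialLengths.NULL value.
val NULL_HEADER = -1

// Branch on the fixed-length header: sentinel means None, anything else
// means the payload bytes follow. Option models Python's None/value split.
def decode(header: Int, payload: => Array[Byte]): Option[Array[Byte]] =
  if (header == NULL_HEADER) None else Some(payload)

assert(decode(NULL_HEADER, Array[Byte]()) == None)
assert(decode(2, "hi".getBytes("UTF-8")).get.sameElements("hi".getBytes("UTF-8")))
```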