
[SPARK-6986][SQL] Use Serializer2 in more cases. #5849

Closed
wants to merge 5 commits into from

Conversation

yhuai
Contributor

yhuai commented May 1, 2015

With 0a2b15c, the serialization stream and deserialization stream have enough information to determine whether they are handling a key-value pair, a key, or a value. It is safe to use SparkSqlSerializer2 in more cases.
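
The idea in the description can be sketched as follows. This is a hypothetical illustration, not the actual SparkSqlSerializer2 code: both streams are constructed with the same flags saying whether keys and/or values are present, so the reading side always knows what the writing side emitted.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Writer and reader share the hasKeys/hasValues flags, so no per-record
// markers are needed in the byte stream itself.
class PairWriter(out: DataOutputStream, hasKeys: Boolean, hasValues: Boolean) {
  def writeKey(k: Int): Unit = if (hasKeys) out.writeInt(k)
  def writeValue(v: Int): Unit = if (hasValues) out.writeInt(v)
}

class PairReader(in: DataInputStream, hasKeys: Boolean, hasValues: Boolean) {
  def readKey(): Int = if (hasKeys) in.readInt() else 0
  def readValue(): Int = if (hasValues) in.readInt() else 0
}

val buf = new ByteArrayOutputStream()
val w = new PairWriter(new DataOutputStream(buf), hasKeys = true, hasValues = false)
w.writeKey(42)
w.writeValue(99) // skipped: hasValues = false, nothing is written
val r = new PairReader(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)),
  hasKeys = true, hasValues = false)
```

Because the flags are fixed at construction time, the same serializer can safely be used for key-only, value-only, or key-value shuffles.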

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

SparkQA commented May 1, 2015

Test build #31612 has started for PR 5849 at commit 5073c54.

SparkQA commented May 1, 2015

Test build #31612 has finished for PR 5849 at commit 5073c54.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • "public class " + className + extendsText + " implements java.io.Serializable
    • class DataFrameStatFunctions(object):

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31612/
Test FAILed.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

SparkQA commented May 2, 2015

Test build #31657 has started for PR 5849 at commit 8627238.

yhuai changed the title from [SPARK-6368][SQL][Follow-up] Use Serializer2 in more cases. to [SPARK-6986][SQL] Use Serializer2 in more cases. on May 2, 2015
SparkQA commented May 2, 2015

Test build #31657 timed out for PR 5849 at commit 8627238 after a configured wait of 150m.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/31657/
Test FAILed.

@yhuai
Contributor Author

yhuai commented May 4, 2015

It seems the reason for those test failures is that we buffer records on the reader side of the shuffle, and we currently use mutable rows, which require an explicit copy when we buffer them.

@sryza Is there any place in the sort based shuffle that we buffer key-value pairs?
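
The hazard described in this comment can be shown with a minimal sketch (hypothetical classes, not Spark's actual row types): a deserializer that keeps reusing one mutable row forces any buffering consumer to copy each row before storing it.

```scala
// A stand-in for a mutable row that a deserializer reuses for every record.
final class MutableRow(val values: Array[Int]) {
  def copy(): MutableRow = new MutableRow(values.clone())
}

val reused = new MutableRow(Array(0))

// Buffering WITH an explicit copy preserves each record.
val copied = (1 to 3).map { i => reused.values(0) = i; reused.copy() }

// Buffering WITHOUT the copy stores three references to the same object,
// so every buffered "row" ends up holding the last value written.
val aliased = (1 to 3).map { i => reused.values(0) = i; reused }
```

Any shuffle path that buffers rows for sorting hits the aliased case unless the copy is inserted first.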

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

SparkQA commented May 6, 2015

Test build #32025 has started for PR 5849 at commit 39179da.

SparkQA commented May 6, 2015

Test build #32025 has finished for PR 5849 at commit 39179da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class JoinedRow6 extends Row
    • case class WindowSpecDefinition(
    • case class WindowSpecReference(name: String) extends WindowSpec
    • sealed trait FrameBoundary
    • case class ValuePreceding(value: Int) extends FrameBoundary
    • case class ValueFollowing(value: Int) extends FrameBoundary
    • case class SpecifiedWindowFrame(
    • trait WindowFunction extends Expression
    • case class UnresolvedWindowFunction(
    • case class UnresolvedWindowExpression(
    • case class WindowExpression(
    • case class WithWindowDefinition(
    • case class Window(
    • case class Window(
    • case class ComputedWindow(

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32025/
Test FAILed.

@yhuai
Contributor Author

yhuai commented May 6, 2015

The stack trace is:

java.lang.UnsupportedOperationException
    at org.apache.spark.util.collection.ChainedBufferOutputStream.write(ChainedBuffer.scala:137)
    at java.io.DataOutputStream.writeByte(DataOutputStream.java:153)
    at org.apache.spark.sql.execution.SparkSqlSerializer2$$anonfun$createSerializationFunction$1.apply(SparkSqlSerializer2.scala:293)
    at org.apache.spark.sql.execution.SparkSqlSerializer2$$anonfun$createSerializationFunction$1.apply(SparkSqlSerializer2.scala:187)
    at org.apache.spark.sql.execution.Serializer2SerializationStream.writeKey(SparkSqlSerializer2.scala:65)
    at org.apache.spark.util.collection.PartitionedSerializedPairBuffer.insert(PartitionedSerializedPairBuffer.scala:74)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

@sryza Is there any particular reason that write(b: Int) is not supported in ChainedBufferOutputStream? Or have we just not implemented it yet?
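
For illustration, a single-byte write(b: Int) can be implemented by delegating to the array-based write, which is how OutputStream subclasses commonly handle it. This is a hypothetical sketch, not Spark's ChainedBuffer; it just shows how the UnsupportedOperationException in the trace above could be avoided.

```scala
import java.io.{DataOutputStream, OutputStream}

// A toy chunk-backed stream whose write(b: Int) reuses the bulk write path.
class ChunkedOutputStream extends OutputStream {
  private val chunks = scala.collection.mutable.ArrayBuffer.empty[Byte]
  override def write(bytes: Array[Byte], off: Int, len: Int): Unit =
    chunks ++= bytes.slice(off, off + len)
  // Delegate the single-byte case instead of throwing UnsupportedOperationException.
  override def write(b: Int): Unit = write(Array((b & 0xff).toByte), 0, 1)
  def toArray: Array[Byte] = chunks.toArray
}

// DataOutputStream.writeByte, the failing call in the trace above, now works.
val out = new ChunkedOutputStream
new DataOutputStream(out).writeByte(7)
```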

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

SparkQA commented May 7, 2015

Test build #32084 has started for PR 5849 at commit 7d94b87.

SparkQA commented May 7, 2015

Test build #32084 has finished for PR 5849 at commit 7d94b87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32084/
Test PASSed.

override def readKey[T: ClassTag](): T = {
  readKeyFunc().asInstanceOf[T]
}
Contributor

Does it make a performance difference if we move the cast to the line where we define readKeyFunc? If we did that, I think we'd be doing one cast vs. casting on each record.

Contributor

Ah, I guess we don't have the class tag for T when we create the deserialization function, so this approach looks fine to me.
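
The constraint the reviewers settled on can be sketched like this (illustrative names, not the PR's exact code): the deserialization function is built once, before any readKey[T] call, so T and its ClassTag are not in scope at that point, and the per-record asInstanceOf[T] is the only place the cast can live.

```scala
import scala.reflect.ClassTag

class KeyStream(records: Iterator[Any]) {
  // Built once at construction time: no type parameter T is available here,
  // so this function can only produce Any.
  private val readKeyFunc: () => Any = () => records.next()

  // T (and its ClassTag) only exists per call, so the cast happens per record.
  def readKey[T: ClassTag](): T = readKeyFunc().asInstanceOf[T]
}

val s = new KeyStream(Iterator("a", "b"))
```

Since JVM generics are erased, the per-record asInstanceOf is cheap in practice, which is consistent with the reviewer's conclusion that this approach is fine.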

() => {
  // If the schema is null, the returned function does nothing when it gets called.
  if (schema != null) {
    var i = 0
    val mutableRow = new GenericMutableRow(schema.length)
Contributor

@yhuai and I chatted about this offline. The reason we need to perform this copy is that this patch allows SparkSqlSerializer2 to be used in cases where the shuffle performs a sort. In HashShuffleReader, Spark ends up passing the iterator returned from this deserializer to ExternalSorter, which buffers rows because it needs to sort them based on their contents.

I think we only need to copy the row in cases where we're shuffling with a key ordering. To avoid unnecessary copying in other cases, we can extend SparkSqlSerializer2's constructor to accept a boolean flag that indicates whether we should copy, and thread that flag all the way down to here. In Exchange, where we create the serializer, we can check whether the shuffle will use a keyOrdering; if it does, we enable copying. Avoiding the copy in other cases should provide a nice performance boost for aggregation queries.
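
A hedged sketch of this suggestion (names are illustrative, not Spark's API): the serializer takes a flag and copies buffered rows only when the shuffle sorts by key, and the flag is decided at the point where the serializer is created.

```scala
// Stand-in for a mutable row with an explicit copy.
final class Row(val values: Array[Int]) {
  def copy(): Row = new Row(values.clone())
}

// The copy decision is threaded in through the constructor, as suggested above.
class Serializer2Sketch(copyRows: Boolean) {
  // Called per record on the read side; `reused` is the serializer's mutable row.
  def deserializeRow(reused: Row): Row = if (copyRows) reused.copy() else reused
}

// At the Exchange operator: enable copying only when a keyOrdering is present,
// i.e. only when the shuffle will buffer rows to sort them.
def makeSerializer(keyOrdering: Option[Ordering[Int]]): Serializer2Sketch =
  new Serializer2Sketch(copyRows = keyOrdering.isDefined)
```

Aggregation shuffles, which pass no keyOrdering, then skip the copy entirely.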

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

SparkQA commented May 7, 2015

Test build #32158 has started for PR 5849 at commit 53a5eaa.

SparkQA commented May 8, 2015

Test build #32158 has finished for PR 5849 at commit 53a5eaa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ElementwiseProduct extends UnaryTransformer[Vector, Vector, ElementwiseProduct]
    • class ElementwiseProduct(val scalingVector: Vector) extends VectorTransformer
    • class RegressionMetrics(JavaModelWrapper):

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/32158/
Test PASSed.

@JoshRosen
Contributor

LGTM overall, especially since this code seems to be well covered by tests.

@yhuai
Contributor Author

yhuai commented May 8, 2015

Thanks for reviewing it. I am merging it to master and branch-1.4.

asfgit pushed a commit that referenced this pull request May 8, 2015
With 0a2b15c, the serialization stream and deserialization stream have enough information to determine whether they are handling a key-value pair, a key, or a value. It is safe to use `SparkSqlSerializer2` in more cases.

Author: Yin Huai <[email protected]>

Closes #5849 from yhuai/serializer2MoreCases and squashes the following commits:

53a5eaa [Yin Huai] Josh's comments.
487f540 [Yin Huai] Use BufferedOutputStream.
8385f95 [Yin Huai] Always create a new row at the deserialization side to work with sort merge join.
c7e2129 [Yin Huai] Update tests.
4513d13 [Yin Huai] Use Serializer2 in more places.

(cherry picked from commit 3af423c)
Signed-off-by: Yin Huai <[email protected]>
asfgit closed this in 3af423c May 8, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 28, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015