Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T] #3378

Closed
wants to merge 1 commit into from
Closed

[SPARK-4505][Core] Add a ClassTag parameter to CompactBuffer[T] #3378

wants to merge 1 commit into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Nov 20, 2014

Added a ClassTag parameter to CompactBuffer. So CompactBuffer[T] can create primitive arrays for primitive types. It will reduce the memory usage for primitive types significantly and only pay minor performance lost.

Here is my test code:

  // Call org.apache.spark.util.SizeEstimator.estimate
  def estimateSize(obj: AnyRef): Long = {
    val c = Class.forName("org.apache.spark.util.SizeEstimator$")
    val f = c.getField("MODULE$")
    val o = f.get(c)
    val m = c.getMethod("estimate", classOf[Object])
    m.setAccessible(true)
    m.invoke(o, obj).asInstanceOf[Long]
  }

  sc.parallelize(1 to 10000).groupBy(_ => 1).foreach {
    case (k, v) =>
      println(v.getClass() + " size: " + estimateSize(v))
  }

Using the previous CompactBuffer outputed

class org.apache.spark.util.collection.CompactBuffer size: 313358

Using the new CompactBuffer outputed

class org.apache.spark.util.collection.CompactBuffer size: 65712

In this case, the new CompactBuffer only used 20% memory of the previous one. It's really helpful for groupByKey when using a primitive value.

@SparkQA
Copy link

SparkQA commented Nov 20, 2014

Test build #23661 has started for PR 3378 at commit 4abdbba.

  • This patch merges cleanly.

@sryza
Copy link
Contributor

sryza commented Nov 20, 2014

This seems like probably a great idea. Do you know what the overhead of including a classtag is? Does it mean an extra pointer per object?

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2014

Does it mean an extra pointer per object?

No. E.g., ClassTag.Int will be shared by all CompactBuffer[Int]. Same approach has already bean used in RDD.

Sorry. Yes the CompactBuffer will has one extra pointer for ClassTag.

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2014

It's weird. I just found both the sizes of old and new CompactBuffer(1) are 56. I cannot explain why.

Then I added a field to the old CompactBuffer like this:

class CompactBuffer[T] extends Seq[T] with Serializable {
  val dummy: AnyRef = null

  // First two elements
  private var element0: T = _
  private var element1: T = _

println(estimateSize(CompactBuffer[Int](1))) also outputs 56.

@aarondav
Copy link
Contributor

This does seem like a good change, though I'll note that I think groupBy is the only current user of this API that is able to have a primitive ClassTag. Still worthwhile, especially for future usage. I do wonder if it could have a runtime impact due to increased primitive wrapping, possibly creating a lot of short-lived garbage if it were iterated over many times.

@SparkQA
Copy link

SparkQA commented Nov 20, 2014

Test build #23661 has finished for PR 3378 at commit 4abdbba.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23661/
Test PASSed.

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2014

It's weird. I just found both the sizes of old and new CompactBuffer(1) are 56.

Found the cause. My JVM enables UseCompressedOops. So in such case, due to alignment, the sizes are same.

@JoshRosen
Copy link
Contributor

Ping @rxin, since this seems like the sort of optimization that you'd be interested in.

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2014

My motivation is that we encountered a skew data set that a special hot key has too many values and could not fit into memory. Spilling helps nothing in this case since groupBy will put all values of a key into a CompactBuffer. After this optimization, at least, my job could run using the same memory limitation.

@rxin
Copy link
Contributor

rxin commented Nov 20, 2014

We should definitely add a ClassTag since this can be used for primitive types. However, there might be places where we create a lot of CompactBuffers. I haven't had a chance to look at where CompactBuffers are used yet, but for those places, would it be possible to create a single ClassTag reference?

@zsxwing
Copy link
Member Author

zsxwing commented Nov 20, 2014

Cogroup uses CompactBuffer. However, it cannot add ClassTag due to its signature:

class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil)

Here rdds is Seq[RDD[_ <: Product2[K, _]]] without the real template type of RDDs

@zsxwing
Copy link
Member Author

zsxwing commented Nov 25, 2014

@rxin Is it OK to merge?

@pwendell
Copy link
Contributor

I don't understand the architecture here as well as @rxin but this change seems like a strict improvement in its current form, so I'm gonna pull it in. LGTM.

@asfgit asfgit closed this in c062224 Nov 30, 2014
@zsxwing zsxwing deleted the SPARK-4505 branch November 30, 2014 08:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants