[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetrics to decrease the memory usage and GC overhead

Hostnames in TaskMetrics are created through deserialization, and the number of distinct hostnames is only on the order of the number of cluster nodes, so adding a cache layer to dedup the objects can reduce memory usage and alleviate GC overhead, especially for long-running applications with fast job generation, such as Spark Streaming.
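The motivation above can be seen in a minimal standalone sketch (assumed demo, not part of the commit; `DeserDemo` and the hostname are hypothetical names): Java serialization materializes a fresh String per deserialized object, so many tasks from the same host produce many distinct but equal hostname instances unless they are deduped.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object DeserDemo {
  // Serialize a string and read it back, as task metrics are read on the driver.
  def roundTrip(s: String): String = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(s)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    in.readObject().asInstanceOf[String]
  }

  def main(args: Array[String]): Unit = {
    val copy1 = roundTrip("worker-1.example.com")
    val copy2 = roundTrip("worker-1.example.com")
    assert(copy1 == copy2)    // equal contents
    assert(!(copy1 eq copy2)) // but distinct objects on the heap: this is the garbage
  }
}
```

Each deserialization allocates a new String, which is exactly the per-task garbage the cache in this commit avoids.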

Author: jerryshao <[email protected]>
Author: Saisai Shao <[email protected]>

Closes apache#5064 from jerryshao/SPARK-5523 and squashes the following commits:

3e2412a [jerryshao] Address the comments
b092a81 [Saisai Shao] Add a pool to cache the hostname
jerryshao authored and tdas committed Jul 15, 2015
1 parent f957796 commit bb870e7
Showing 1 changed file with 20 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,11 +17,15 @@

package org.apache.spark.executor

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
@@ -210,10 +214,26 @@ class TaskMetrics extends Serializable {
  private[spark] def updateInputMetrics(): Unit = synchronized {
    inputMetrics.foreach(_.updateBytesRead())
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    // Look up the hostname in the cache: since the number of distinct hostnames is on the
    // order of the number of nodes in the cluster, reusing cached hostname strings reduces
    // the number of objects and alleviates GC overhead.
    _hostname = TaskMetrics.getCachedHostName(_hostname)
  }
}

private[spark] object TaskMetrics {
  private val hostNameCache = new ConcurrentHashMap[String, String]()

  def empty: TaskMetrics = new TaskMetrics

  def getCachedHostName(host: String): String = {
    val canonicalHost = hostNameCache.putIfAbsent(host, host)
    if (canonicalHost != null) canonicalHost else host
  }
}
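The dedup behavior of the `putIfAbsent`-based cache above can be exercised in a small standalone sketch (assumed demo; `HostNameCacheDemo` is a hypothetical name, but `getCachedHostName` mirrors the method in the diff): the first instance seen for a given hostname becomes canonical, and every later equal string is replaced by that same instance.

```scala
import java.util.concurrent.ConcurrentHashMap

object HostNameCacheDemo {
  private val hostNameCache = new ConcurrentHashMap[String, String]()

  // Same logic as TaskMetrics.getCachedHostName: putIfAbsent returns null
  // if we inserted, or the previously cached (canonical) instance otherwise.
  def getCachedHostName(host: String): String = {
    val canonicalHost = hostNameCache.putIfAbsent(host, host)
    if (canonicalHost != null) canonicalHost else host
  }

  def main(args: Array[String]): Unit = {
    val a = new String("worker-1.example.com") // two distinct String objects
    val b = new String("worker-1.example.com") // with equal contents
    val ca = getCachedHostName(a)
    val cb = getCachedHostName(b)
    assert(ca eq cb) // deduped to one canonical instance (reference equality)
    assert(ca eq a)  // the first instance seen wins; b becomes collectable
  }
}
```

`putIfAbsent` makes the lookup-or-insert atomic, so concurrent deserialization threads on the driver cannot race to install two different canonical instances for the same hostname.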

/**
