Skip to content

Commit

Permalink
[SPARK-6980] Added creation of RpcTimeout with Seq of property keys
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed May 19, 2015
1 parent 23d2f26 commit a294569
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,24 @@ object RpcTimeout {
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
new RpcTimeout(timeout, messagePrefix + timeoutProp)
}

/**
* Lookup prioritized list of timeout properties in the configuration
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
*/
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
require(timeoutPropList.nonEmpty)

// Find the first set property or use the default value with the first property
val foundProp = timeoutPropList.view.map(x => (x, conf.getOption(x))).filter(_._2.isDefined).
map(y => (y._1, y._2.get)).headOption.getOrElse(timeoutPropList.head, defaultValue)

val timeout = { Utils.timeStringAsSeconds(foundProp._2) seconds }
new RpcTimeout(timeout, messagePrefix + foundProp._1)
}
}
19 changes: 5 additions & 14 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.util

import scala.language.postfixOps
import scala.concurrent.duration._

import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
Expand Down Expand Up @@ -47,23 +48,13 @@ object RpcUtils {

/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): RpcTimeout = {
try {
RpcTimeout(conf, "spark.rpc.askTimeout")
}
catch {
case _: Throwable =>
RpcTimeout(conf, "spark.network.timeout", "120s")
}
RpcTimeout(conf, Seq("spark.rpc.askTimeout",
"spark.network.timeout"), "120s")
}

/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): RpcTimeout = {
try {
RpcTimeout(conf, "spark.rpc.lookupTimeout")
}
catch {
case _: Throwable =>
RpcTimeout(conf, "spark.network.timeout", "120s")
}
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout",
"spark.network.timeout"), "120s")
}
}

0 comments on commit a294569

Please sign in to comment.