From a2945691c8dc8d157c8984233b5931f9ae95a528 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 19 May 2015 10:14:27 -0700 Subject: [PATCH] [SPARK-6980] Added creation of RpcTimeout with Seq of property keys --- .../scala/org/apache/spark/rpc/RpcEnv.scala | 20 +++++++++++++++++++ .../org/apache/spark/util/RpcUtils.scala | 19 +++++------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index 1f00547d9e013..71924475826d9 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -250,4 +250,24 @@ object RpcTimeout { val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds } new RpcTimeout(timeout, messagePrefix + timeoutProp) } + + /** + * Lookup prioritized list of timeout properties in the configuration + * and create a RpcTimeout with the first set property key in the + * description. + * Uses the given default value if property is not set + * @param conf configuration properties containing the timeout + * @param timeoutPropList prioritized list of property keys for the timeout in seconds + * @param defaultValue default timeout value in seconds if no properties found + */ + def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = { + require(timeoutPropList.nonEmpty) + + // Find the first set property or use the default value with the first property + val foundProp = timeoutPropList.view.map(x => (x, conf.getOption(x))).filter(_._2.isDefined). + map(y => (y._1, y._2.get)).headOption.getOrElse(timeoutPropList.head, defaultValue) + + val timeout = { Utils.timeStringAsSeconds(foundProp._2) seconds } + new RpcTimeout(timeout, messagePrefix + foundProp._1) + } } diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index a853f39d9a493..b028dc1e3a031 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import scala.language.postfixOps +import scala.concurrent.duration._ import org.apache.spark.{SparkEnv, SparkConf} import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} @@ -47,23 +48,13 @@ object RpcUtils { /** Returns the default Spark timeout to use for RPC ask operations. */ def askTimeout(conf: SparkConf): RpcTimeout = { - try { - RpcTimeout(conf, "spark.rpc.askTimeout") - } - catch { - case _: Throwable => - RpcTimeout(conf, "spark.network.timeout", "120s") - } + RpcTimeout(conf, Seq("spark.rpc.askTimeout", + "spark.network.timeout"), "120s") } /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */ def lookupTimeout(conf: SparkConf): RpcTimeout = { - try { - RpcTimeout(conf, "spark.rpc.lookupTimeout") - } - catch { - case _: Throwable => - RpcTimeout(conf, "spark.network.timeout", "120s") - } + RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", + "spark.network.timeout"), "120s") } }