Skip to content

Commit

Permalink
[SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTim…
Browse files Browse the repository at this point in the history
…out to fix MiMa errors
  • Loading branch information
BryanCutler committed Jun 24, 2015
1 parent fa6ed82 commit fadaf6f
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 15 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
private val lostMasters = new HashSet[Address]
private var activeMasterActor: ActorSelection = null

val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)
val future = actor.ask(StopAppClient)(timeout.duration)
timeout.awaitResult(future)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration)
val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
with UIRoot {

val masterActorRef = master.self
val timeout = RpcUtils.askTimeout(master.conf)
val timeout = RpcUtils.askRpcTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

val masterPage = new MasterPage(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: Sp
extends KillRequestServlet {

protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
Expand All @@ -90,7 +90,7 @@ private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf:
extends StatusRequestServlet {

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
Expand Down Expand Up @@ -175,7 +175,7 @@ private[rest] class StandaloneSubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)

initialize()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)

private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
Expand Down Expand Up @@ -118,4 +118,5 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
throw new SparkException(
s"Error sending message [message = $message]", lastException)
}

}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[spark] object RpcEnv {
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {

val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)

/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ private[spark] object AkkaUtils extends Logging {
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
val timeout = RpcUtils.lookupTimeout(conf)
val timeout = RpcUtils.lookupRpcTimeout(conf)
logInfo(s"Connecting to $name: $url")
timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}
Expand All @@ -211,7 +211,7 @@ private[spark] object AkkaUtils extends Logging {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
val timeout = RpcUtils.lookupTimeout(conf)
val timeout = RpcUtils.lookupRpcTimeout(conf)
logInfo(s"Connecting to $name: $url")
timeout.awaitResult(actorSystem.actorSelection(url).resolveOne(timeout.duration))
}
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.util

import scala.concurrent.duration.FiniteDuration
import scala.language.postfixOps

import org.apache.spark.{SparkEnv, SparkConf}
Expand Down Expand Up @@ -46,12 +47,22 @@ object RpcUtils {
}

/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): RpcTimeout = {
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}

@deprecated("use askRpcTimeout instead", "1.5.0")
def askTimeout(conf: SparkConf): FiniteDuration = {
askRpcTimeout(conf).duration
}

/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): RpcTimeout = {
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}

@deprecated("use lookupRpcTimeout instead", "1.5.0")
def lookupTimeout(conf: SparkConf): FiniteDuration = {
lookupRpcTimeout(conf).duration
}
}

0 comments on commit fadaf6f

Please sign in to comment.