Skip to content

Commit

Permalink
[SPARK-20519][SQL][CORE] Modify to prevent some possible runtime exce…
Browse files Browse the repository at this point in the history
…ptions

Signed-off-by: liuxian <liu.xian3zte.com.cn>

## What changes were proposed in this pull request?

When the input parameter is null, may be a runtime exception occurs

## How was this patch tested?
Existing unit tests

Author: liuxian <[email protected]>

Closes apache#17796 from 10110346/wip_lx_0428.
  • Loading branch information
10110346 authored and liyichao committed May 24, 2017
1 parent 1055955 commit 17fd9b1
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ private[spark] class PythonAccumulatorV2(
private val serverPort: Int)
extends CollectionAccumulator[Array[Byte]] {

Utils.checkHost(serverHost, "Expected hostname")
Utils.checkHost(serverHost)

val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private[deploy] object DeployMessages {
memory: Int,
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ private[deploy] object DeployMessages {

// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
Utils.checkHostPort(hostPort, "Required hostport")
Utils.checkHostPort(hostPort)
}

case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
Expand Down Expand Up @@ -183,7 +183,7 @@ private[deploy] object DeployMessages {
completedDrivers: Array[DriverInfo],
status: MasterState) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)

def uri: String = "spark://" + host + ":" + port
Expand All @@ -201,7 +201,7 @@ private[deploy] object DeployMessages {
drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String,
cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {

Utils.checkHost(host, "Required hostname")
Utils.checkHost(host)
assert (port > 0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private[deploy] class Master(
private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0

Utils.checkHost(address.host, "Expected hostname")
Utils.checkHost(address.host)

private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) exte
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[spark] class WorkerInfo(
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

@transient var executors: mutable.HashMap[String, ExecutorDesc] = _ // executorId => info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private[deploy] class Worker(
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port

Utils.checkHost(host, "Expected hostname")
Utils.checkHost(host)
assert (port > 0)

// A scheduled executor used to send messages at the specified time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ private[worker] class WorkerArguments(args: Array[String], conf: SparkConf) {
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private[spark] class Executor(
private val conf = env.conf

// No ip or host:port - just hostname
Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
Utils.checkHost(executorHostname)
// must not have port specified.
assert (0 == Utils.parseHostPort(executorHostname)._2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class BlockManagerId private (
def executorId: String = executorId_

if (null != host_) {
Utils.checkHost(host_, "Expected hostname")
Utils.checkHost(host_)
assert (port_ > 0)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] object RpcUtils {
def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
Utils.checkHost(driverHost)
rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -937,12 +937,13 @@ private[spark] object Utils extends Logging {
customHostname.getOrElse(InetAddresses.toUriString(localIpAddress))
}

def checkHost(host: String, message: String = "") {
assert(host.indexOf(':') == -1, message)
def checkHost(host: String) {
assert(host != null && host.indexOf(':') == -1, s"Expected hostname (not IP) but got $host")
}

def checkHostPort(hostPort: String, message: String = "") {
assert(hostPort.indexOf(':') != -1, message)
def checkHostPort(hostPort: String) {
assert(hostPort != null && hostPort.indexOf(':') != -1,
s"Expected host and port but got $hostPort")
}

// Typically, this will be of order of number of nodes in cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
Utils.checkHost(value)
host = value
parse(tail)

Expand Down

0 comments on commit 17fd9b1

Please sign in to comment.