Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for self-contained profiling #10870

Merged
merged 5 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
}
rapidsShuffleHeartbeatManager.executorHeartbeat(id)
case m: GpuCoreDumpMsg => GpuCoreDumpHandler.handleMsg(m)
case m: ProfileMsg => ProfilerOnDriver.handleMsg(m)
case m => throw new IllegalStateException(s"Unknown message $m")
}
}
Expand All @@ -458,6 +459,7 @@ class RapidsDriverPlugin extends DriverPlugin with Logging {
RapidsPluginUtils.detectMultipleJars(conf)
RapidsPluginUtils.logPluginMode(conf)
GpuCoreDumpHandler.driverInit(sc, conf)
ProfilerOnDriver.init(sc, conf)

if (GpuShuffleEnv.isRapidsShuffleAvailable(conf)) {
GpuShuffleEnv.initShuffleManager()
Expand Down Expand Up @@ -507,6 +509,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
val sparkConf = pluginContext.conf()
val numCores = RapidsPluginUtils.estimateCoresOnExec(sparkConf)
val conf = new RapidsConf(extraConf.asScala.toMap)
ProfilerOnExecutor.init(pluginContext, conf)

// Checks if the current GPU architecture is supported by the
// spark-rapids-jni and cuDF libraries.
Expand Down Expand Up @@ -656,6 +659,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
GpuSemaphore.shutdown()
PythonWorkerSemaphore.shutdown()
GpuDeviceManager.shutdown()
ProfilerOnExecutor.shutdown()
Option(rapidsShuffleHeartbeatEndpoint).foreach(_.close())
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
Expand Down Expand Up @@ -692,6 +696,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
override def onTaskStart(): Unit = {
startTaskNvtx(TaskContext.get)
extraExecutorPlugins.foreach(_.onTaskStart())
ProfilerOnExecutor.onTaskStart()
}

override def onTaskSucceeded(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.util.Try

/**
* Determines if a value is in a comma-separated list of values and/or
* hyphenated ranges provided by the user for a configuration setting.
*/
class RangeConfMatcher(configKey: String, configValue: Option[String]) {
def this(conf: RapidsConf, entry: ConfEntry[String]) = {
this(entry.key, Some(conf.get(entry)))
}

def this(conf: RapidsConf, entry: OptionalConfEntry[String]) = {
this(entry.key, conf.get(entry))
}

private val (stringSet, intRanges) = {
configValue.map { cv =>
val parts = cv.split(',')
val (rangeParts, singleParts) = parts.partition(_.contains('-'))
val ranges = try {
rangeParts.map(RangeConfMatcher.parseRange)
} catch {
case e: IllegalArgumentException =>
throw new IllegalArgumentException(s"Invalid range settings for $configKey: $cv", e)
}
(singleParts.map(_.trim).toSet, ranges)
}.getOrElse((Set.empty[String], Array.empty[(Int, Int)]))
}

val isEmpty: Boolean = stringSet.isEmpty && intRanges.isEmpty
val nonEmpty: Boolean = !isEmpty

def size: Int = {
stringSet.size + intRanges.map {
case (start, end) => end - start + 1
}.sum
}

/** Returns true if the string value is in the configured values or ranges. */
def contains(v: String): Boolean = {
stringSet.contains(v) || (intRanges.nonEmpty && Try(v.toInt).map(checkRanges).getOrElse(false))
}

/** Returns true if the integer value is in the configured values or ranges. */
def contains(v: Int): Boolean = {
checkRanges(v) || stringSet.contains(v.toString)
}

private def checkRanges(v: Int): Boolean = {
intRanges.exists {
case (start, end) => start <= v && v <= end
}
}
}

object RangeConfMatcher {
def parseRange(rangeStr: String): (Int,Int) = {
val rangePair = rangeStr.split('-')
if (rangePair.length != 2) {
throw new IllegalArgumentException(s"Invalid range: $rangeStr")
}
val start = rangePair.head.trim.toInt
val end = rangePair.last.trim.toInt
if (end < start) {
throw new IllegalArgumentException(s"Invalid range: $rangeStr")
}
(start, end)
}
}
83 changes: 83 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,71 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.checkValues(Set("DEBUG", "MODERATE", "ESSENTIAL"))
.createWithDefault("MODERATE")

val PROFILE_PATH = conf("spark.rapids.profile.pathPrefix")
.doc("Enables profiling and specifies a URI path to use when writing profile data")
.internal()
.stringConf
.createOptional

val PROFILE_EXECUTORS = conf("spark.rapids.profile.executors")
.doc("Comma-separated list of executors IDs and hyphenated ranges of executor IDs to " +
"profile when profiling is enabled")
.internal()
.stringConf
.createWithDefault("0")

val PROFILE_TIME_RANGES_SECONDS = conf("spark.rapids.profile.timeRangesInSeconds")
.doc("Comma-separated list of start-end ranges of time, in seconds, since executor startup " +
"to start and stop profiling. For example, a value of 10-30,100-110 will have the profiler " +
"wait for 10 seconds after executor startup then profile for 20 seconds, then wait for " +
"70 seconds then profile again for the next 10 seconds")
.internal()
.stringConf
.createOptional

val PROFILE_JOBS = conf("spark.rapids.profile.jobs")
.doc("Comma-separated list of job IDs and hyphenated ranges of job IDs to " +
"profile when profiling is enabled")
.internal()
.stringConf
.createOptional

val PROFILE_STAGES = conf("spark.rapids.profile.stages")
.doc("Comma-separated list of stage IDs and hyphenated ranges of stage IDs to " +
"profile when profiling is enabled")
.internal()
.stringConf
.createOptional

val PROFILE_DRIVER_POLL_MILLIS = conf("spark.rapids.profile.driverPollMillis")
.doc("Interval in milliseconds the executors will poll for job and stage completion when " +
"stage-level profiling is used.")
.internal()
.integerConf
.createWithDefault(1000)

val PROFILE_COMPRESSION = conf("spark.rapids.profile.compression")
.doc("Specifies the compression codec to use when writing profile data, one of " +
"zstd or none")
.internal()
.stringConf
.transform(_.toLowerCase(java.util.Locale.ROOT))
.checkValues(Set("zstd", "none"))
.createWithDefault("zstd")

val PROFILE_FLUSH_PERIOD_MILLIS = conf("spark.rapids.profile.flushPeriodMillis")
.doc("Specifies the time period in milliseconds to flush profile records. " +
"A value <= 0 will disable time period flushing.")
.internal()
.integerConf
.createWithDefault(0)

val PROFILE_WRITE_BUFFER_SIZE = conf("spark.rapids.profile.writeBufferSize")
.doc("Buffer size to use when writing profile records.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(1024 * 1024)

// ENABLE/DISABLE PROCESSING

val SQL_ENABLED = conf("spark.rapids.sql.enabled")
Expand Down Expand Up @@ -2397,6 +2462,24 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val metricsLevel: String = get(METRICS_LEVEL)

lazy val profilePath: Option[String] = get(PROFILE_PATH)

lazy val profileExecutors: String = get(PROFILE_EXECUTORS)

lazy val profileTimeRangesSeconds: Option[String] = get(PROFILE_TIME_RANGES_SECONDS)

lazy val profileJobs: Option[String] = get(PROFILE_JOBS)

lazy val profileStages: Option[String] = get(PROFILE_STAGES)

lazy val profileDriverPollMillis: Int = get(PROFILE_DRIVER_POLL_MILLIS)

lazy val profileCompression: String = get(PROFILE_COMPRESSION)

lazy val profileFlushPeriodMillis: Int = get(PROFILE_FLUSH_PERIOD_MILLIS)

lazy val profileWriteBufferSize: Long = get(PROFILE_WRITE_BUFFER_SIZE)

lazy val isSqlEnabled: Boolean = get(SQL_ENABLED)

lazy val isSqlExecuteOnGPU: Boolean = get(SQL_MODE).equals("executeongpu")
Expand Down
Loading