Use Scala regex, add executor-side logging on profile startup/shutdown
jlowe committed May 24, 2024
1 parent 6e1a3b2 commit 2299ab6
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
 import java.util.concurrent.{ConcurrentHashMap, Future, ScheduledExecutorService, TimeUnit}
-import java.util.regex.Pattern
 
 import scala.collection.mutable
 
@@ -36,7 +35,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.util.SerializableConfiguration
 
 object ProfilerOnExecutor extends Logging {
-  private val jobPattern = Pattern.compile("SPARK_.*_JId_([0-9]+).*")
+  private val jobPattern = raw"SPARK_.*_JId_([0-9]+).*".r
   private var writer: Option[ProfileWriter] = None
   private var timeRanges: Option[Seq[(Long, Long)]] = None
  private var jobRanges: RangeConfMatcher = null
@@ -88,16 +87,17 @@ object ProfilerOnExecutor extends Logging {
     if (jobRanges.nonEmpty) {
       val callerCtx = CallerContext.getCurrent
       if (callerCtx != null) {
-        val m = jobPattern.matcher(callerCtx.getContext)
-        if (m.matches()) {
-          val jobId = m.group(1).toInt
-          if (jobRanges.contains(jobId)) {
-            synchronized {
-              activeJobs.add(jobId)
-              enable()
-              startPollingDriver()
-            }
-          }
+        callerCtx.getContext match {
+          case jobPattern(jid) =>
+            val jobId = jid.toInt
+            if (jobRanges.contains(jobId)) {
+              synchronized {
+                activeJobs.add(jobId)
+                enable()
+                startPollingDriver()
+              }
+            }
+          case _ =>
         }
       }
     }
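
Note on the hunk above: a Scala Regex used as a pattern in a match expression anchors at both ends, so `case jobPattern(jid)` succeeds only on a full match, preserving the old `Pattern.matches()` semantics while binding the first capture group directly. A minimal standalone sketch of this extractor behavior (the demo object and the caller-context strings are invented for illustration, not real Spark output):

    object JobIdExtractorDemo {
      // Same pattern as the plugin: capture the job ID from a caller context.
      private val jobPattern = raw"SPARK_.*_JId_([0-9]+).*".r

      // Returns the job ID if the whole string matches, else None.
      def extractJobId(context: String): Option[Int] = context match {
        case jobPattern(jid) => Some(jid.toInt) // capture group bound to jid
        case _ => None                          // no full match
      }

      def main(args: Array[String]): Unit = {
        println(extractJobId("SPARK_TASK_app-123_JId_7_StageId_3")) // Some(7)
        println(extractJobId("unrelated string"))                   // None
      }
    }
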
@@ -254,7 +254,7 @@
 class ProfileWriter(
     val pluginCtx: PluginContext,
     profilePathPrefix: String,
-    codec: Option[CompressionCodec]) extends Profiler.DataWriter {
+    codec: Option[CompressionCodec]) extends Profiler.DataWriter with Logging {
   val executorId: String = pluginCtx.executorID()
   private val outPath = getOutputPath(profilePathPrefix, codec)
   private val out = openOutput(codec)
@@ -272,6 +272,7 @@ class ProfileWriter(
     if (!isClosed) {
       isClosed = true
       out.close()
+      logWarning(s"Profiling completed, output written to $outPath")
       pluginCtx.send(ProfileEndMsg(executorId, outPath.toString))
     }
   }
@@ -293,6 +294,7 @@ class ProfileWriter(
   }
 
   private def openOutput(codec: Option[CompressionCodec]): WritableByteChannel = {
+    logWarning(s"Profiler initialized, output will be written to $outPath")
     val hadoopConf = pluginCtx.ask(ProfileInitMsg(executorId, outPath.toString))
       .asInstanceOf[SerializableConfiguration].value
     val fs = outPath.getFileSystem(hadoopConf)
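Note on the two new logWarning calls: ProfileWriter now mixes in Spark's Logging trait, so the startup and shutdown messages go to the executor's standard log. A minimal sketch of the same mix-in pattern, assuming org.apache.spark.internal.Logging is on the classpath (whether the plugin uses that trait directly or a shim is not shown in this diff; DemoWriter and its path argument are hypothetical):

    import org.apache.spark.internal.Logging

    // Hypothetical stand-in for ProfileWriter showing the Logging mix-in.
    class DemoWriter(outPath: String) extends Logging {
      // Mirrors openOutput(): announce where profile data will be written.
      def open(): Unit =
        logWarning(s"Profiler initialized, output will be written to $outPath")

      // Mirrors close(): announce completion and the final output location.
      def close(): Unit =
        logWarning(s"Profiling completed, output written to $outPath")
    }

The messages are logged at warning rather than info level, presumably so they still surface when executor logging is trimmed to warn.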
