From 2299ab68b5084352fef7ee8ac8ecbef00c8b5a11 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Fri, 24 May 2024 15:25:22 -0500
Subject: [PATCH] Use Scala regex, add executor-side logging on profile
 startup/shutdown

---
 .../com/nvidia/spark/rapids/profiler.scala | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala
index 2714c655b5b..b9787d539f0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/profiler.scala
@@ -19,7 +19,6 @@ package com.nvidia.spark.rapids
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
 import java.util.concurrent.{ConcurrentHashMap, Future, ScheduledExecutorService, TimeUnit}
-import java.util.regex.Pattern
 
 import scala.collection.mutable
 
@@ -36,7 +35,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.util.SerializableConfiguration
 
 object ProfilerOnExecutor extends Logging {
-  private val jobPattern = Pattern.compile("SPARK_.*_JId_([0-9]+).*")
+  private val jobPattern = raw"SPARK_.*_JId_([0-9]+).*".r
   private var writer: Option[ProfileWriter] = None
   private var timeRanges: Option[Seq[(Long, Long)]] = None
   private var jobRanges: RangeConfMatcher = null
@@ -88,16 +87,17 @@ object ProfilerOnExecutor extends Logging {
     if (jobRanges.nonEmpty) {
       val callerCtx = CallerContext.getCurrent
       if (callerCtx != null) {
-        val m = jobPattern.matcher(callerCtx.getContext)
-        if (m.matches()) {
-          val jobId = m.group(1).toInt
-          if (jobRanges.contains(jobId)) {
-            synchronized {
-              activeJobs.add(jobId)
-              enable()
-              startPollingDriver()
+        callerCtx.getContext match {
+          case jobPattern(jid) =>
+            val jobId = jid.toInt
+            if (jobRanges.contains(jobId)) {
+              synchronized {
+                activeJobs.add(jobId)
+                enable()
+                startPollingDriver()
+              }
             }
-          }
+          case _ =>
         }
       }
     }
@@ -254,7 +254,7 @@ object ProfilerOnExecutor extends Logging {
 class ProfileWriter(
     val pluginCtx: PluginContext,
     profilePathPrefix: String,
-    codec: Option[CompressionCodec]) extends Profiler.DataWriter {
+    codec: Option[CompressionCodec]) extends Profiler.DataWriter with Logging {
   val executorId: String = pluginCtx.executorID()
   private val outPath = getOutputPath(profilePathPrefix, codec)
   private val out = openOutput(codec)
@@ -272,6 +272,7 @@ class ProfileWriter(
     if (!isClosed) {
       isClosed = true
       out.close()
+      logWarning(s"Profiling completed, output written to $outPath")
       pluginCtx.send(ProfileEndMsg(executorId, outPath.toString))
     }
   }
@@ -293,6 +294,7 @@
   }
 
   private def openOutput(codec: Option[CompressionCodec]): WritableByteChannel = {
+    logWarning(s"Profiler initialized, output will be written to $outPath")
    val hadoopConf = pluginCtx.ask(ProfileInitMsg(executorId, outPath.toString))
      .asInstanceOf[SerializableConfiguration].value
    val fs = outPath.getFileSystem(hadoopConf)
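
Reviewer note (not part of the patch): a minimal, self-contained sketch of the
Scala regex-extractor idiom the commit switches to. Only jobPattern mirrors the
patch; the demo object name and the sample caller-context strings are
hypothetical, for illustration only. Regex.unapplySeq matches only when the
entire input string matches, so "case jobPattern(jid)" preserves the semantics
of the replaced Matcher.matches() call.

    object JobIdExtractorDemo {
      // Same pattern shape as the patch: capture the numeric job ID after "_JId_".
      private val jobPattern = raw"SPARK_.*_JId_([0-9]+).*".r

      def main(args: Array[String]): Unit = {
        // Hypothetical caller-context strings, for illustration only.
        val contexts = Seq("SPARK_TASK_appId_1_JId_42_StageId_7", "unrelated_context")
        contexts.foreach {
          // The compiled Regex acts as an extractor: the first case matches only
          // if the whole string matches, binding the capture group to jid.
          case jobPattern(jid) => println(s"matched job ID ${jid.toInt}")
          case other => println(s"no job ID in '$other'")
        }
      }
    }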