diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0e19143411e96..47019c04aada2 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -271,6 +271,18 @@ package object config {
       .toSequence
       .createWithDefault(GarbageCollectionMetrics.OLD_GENERATION_BUILTIN_GARBAGE_COLLECTORS)
 
+  private[spark] val EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS =
+    ConfigBuilder("spark.eventLog.includeTaskMetricsAccumulators")
+      .doc("Whether to include TaskMetrics' underlying accumulator values in the event log (as " +
+        "part of the Task/Stage/Job metrics' 'Accumulables' fields). This configuration defaults " +
+        "to false because the TaskMetrics values are already logged in the 'Task Metrics' " +
+        "fields (so the accumulator updates are redundant). This flag exists only as a " +
+        "backwards-compatibility escape hatch for applications that might rely on the old " +
+        "behavior. See SPARK-42204 for details.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite")
       .version("1.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index efd8fecb974e8..1e46142fab255 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, JsonProtocolOptions, Utils}
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -74,6 +74,8 @@ private[spark] class EventLoggingListener(
   private val liveStageExecutorMetrics =
     mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]
 
+  private[this] val jsonProtocolOptions = new JsonProtocolOptions(sparkConf)
+
   /**
   * Creates the log file in the configured log directory.
   */
@@ -84,7 +86,7 @@
 
   private def initEventLog(): Unit = {
     val metadata = SparkListenerLogStart(SPARK_VERSION)
-    val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
+    val eventJson = JsonProtocol.sparkEventToJsonString(metadata, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger = true)
     if (testing && loggedEvents != null) {
       loggedEvents += eventJson
@@ -93,7 +95,7 @@
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
-    val eventJson = JsonProtocol.sparkEventToJsonString(event)
+    val eventJson = JsonProtocol.sparkEventToJsonString(event, jsonProtocolOptions)
     logWriter.writeEvent(eventJson, flushLogger)
     if (testing) {
       loggedEvents += eventJson
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 19cefbc0479a9..e30380f41566a 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -28,6 +28,7 @@ import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
@@ -37,6 +38,16 @@ import org.apache.spark.storage._
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils.weakIntern
 
+/**
+ * Helper class for passing configuration options to JsonProtocol.
+ * We use this instead of passing SparkConf directly because it lets us avoid
+ * re-parsing configuration values on each read.
+ */
+private[spark] class JsonProtocolOptions(conf: SparkConf) {
+  val includeTaskMetricsAccumulators: Boolean =
+    conf.get(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS)
+}
+
 /**
  * Serializes SparkListener events to/from JSON. This protocol provides strong backwards-
  * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
@@ -55,30 +66,41 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol extends JsonUtils {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
+  private[util]
+  val defaultOptions: JsonProtocolOptions = new JsonProtocolOptions(new SparkConf(false))
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
+  // Only for use in tests. Production code should use the two-argument overload defined below.
   def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    sparkEventToJsonString(event, defaultOptions)
+  }
+
+  def sparkEventToJsonString(event: SparkListenerEvent, options: JsonProtocolOptions): String = {
     toJsonString { generator =>
-      writeSparkEventToJson(event, generator)
+      writeSparkEventToJson(event, generator, options)
     }
   }
 
-  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
+  def writeSparkEventToJson(
+      event: SparkListenerEvent,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted, g)
+        stageSubmittedToJson(stageSubmitted, g, options)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted, g)
+        stageCompletedToJson(stageCompleted, g, options)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart, g)
+        taskStartToJson(taskStart, g, options)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult, g)
+        taskGettingResultToJson(taskGettingResult, g, options)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd, g)
+        taskEndToJson(taskEnd, g, options)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart, g)
+        jobStartToJson(jobStart, g, options)
       case jobEnd: SparkListenerJobEnd =>
         jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
@@ -112,12 +134,15 @@ private[spark] object JsonProtocol extends JsonUtils {
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
+  def stageSubmittedToJson(
+      stageSubmitted: SparkListenerStageSubmitted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
     g.writeFieldName("Stage Info")
     // SPARK-42205: don't log accumulables in start events:
-    stageInfoToJson(stageSubmitted.stageInfo, g, includeAccumulables = false)
+    stageInfoToJson(stageSubmitted.stageInfo, g, options, includeAccumulables = false)
     Option(stageSubmitted.properties).foreach { properties =>
       g.writeFieldName("Properties")
       propertiesToJson(properties, g)
@@ -125,38 +150,48 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
+  def stageCompletedToJson(
+      stageCompleted: SparkListenerStageCompleted,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
     g.writeFieldName("Stage Info")
-    stageInfoToJson(stageCompleted.stageInfo, g, includeAccumulables = true)
+    stageInfoToJson(stageCompleted.stageInfo, g, options, includeAccumulables = true)
     g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
+  def taskStartToJson(
+      taskStart: SparkListenerTaskStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
     g.writeNumberField("Stage ID", taskStart.stageId)
     g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in start events:
-    taskInfoToJson(taskStart.taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskStart.taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
   def taskGettingResultToJson(
       taskGettingResult: SparkListenerTaskGettingResult,
-      g: JsonGenerator): Unit = {
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     val taskInfo = taskGettingResult.taskInfo
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
     // SPARK-42205: don't log accumulables in "task getting result" events:
-    taskInfoToJson(taskInfo, g, includeAccumulables = false)
+    taskInfoToJson(taskInfo, g, options, includeAccumulables = false)
     g.writeEndObject()
   }
 
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+  def taskEndToJson(
+      taskEnd: SparkListenerTaskEnd,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
     g.writeNumberField("Stage ID", taskEnd.stageId)
@@ -165,7 +200,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeFieldName("Task End Reason")
     taskEndReasonToJson(taskEnd.reason, g)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskEnd.taskInfo, g, includeAccumulables = true)
+    taskInfoToJson(taskEnd.taskInfo, g, options, includeAccumulables = true)
     g.writeFieldName("Task Executor Metrics")
     executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
     Option(taskEnd.taskMetrics).foreach { m =>
@@ -175,7 +210,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit = {
+  def jobStartToJson(
+      jobStart: SparkListenerJobStart,
+      g: JsonGenerator,
+      options: JsonProtocolOptions): Unit = {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
     g.writeNumberField("Job ID", jobStart.jobId)
@@ -186,7 +224,7 @@ private[spark] object JsonProtocol extends JsonUtils {
     // the job was submitted: it is technically possible for a stage to belong to multiple
     // concurrent jobs, so this situation can arise even without races occurring between
     // event logging and stage completion.
-    jobStart.stageInfos.foreach(stageInfoToJson(_, g, includeAccumulables = true))
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g, options, includeAccumulables = true))
     g.writeEndArray()
     g.writeArrayFieldStart("Stage IDs")
     jobStart.stageIds.foreach(g.writeNumber)
@@ -386,6 +424,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def stageInfoToJson(
       stageInfo: StageInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Stage ID", stageInfo.stageId)
@@ -404,7 +443,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(stageInfo.accumulables.values, g)
+      accumulablesToJson(
+        stageInfo.accumulables.values,
+        g,
+        includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
@@ -418,6 +460,7 @@ private[spark] object JsonProtocol extends JsonUtils {
   def taskInfoToJson(
       taskInfo: TaskInfo,
       g: JsonGenerator,
+      options: JsonProtocolOptions,
       includeAccumulables: Boolean): Unit = {
     g.writeStartObject()
     g.writeNumberField("Task ID", taskInfo.taskId)
@@ -435,7 +478,10 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeBooleanField("Killed", taskInfo.killed)
     g.writeFieldName("Accumulables")
     if (includeAccumulables) {
-      accumulablesToJson(taskInfo.accumulables, g)
+      accumulablesToJson(
+        taskInfo.accumulables,
+        g,
+        includeTaskMetricsAccumulators = options.includeTaskMetricsAccumulators)
     } else {
       g.writeStartArray()
       g.writeEndArray()
@@ -443,13 +489,23 @@ private[spark] object JsonProtocol extends JsonUtils {
     g.writeEndObject()
   }
 
-  private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")
+  private[util] val accumulableExcludeList = Set(InternalAccumulator.UPDATED_BLOCK_STATUSES)
+
+  private[this] val taskMetricAccumulableNames = TaskMetrics.empty.nameToAccums.keySet.toSet
 
-  def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
+  def accumulablesToJson(
+      accumulables: Iterable[AccumulableInfo],
+      g: JsonGenerator,
+      includeTaskMetricsAccumulators: Boolean = true): Unit = {
     g.writeStartArray()
     accumulables
-      .filterNot(_.name.exists(accumulableExcludeList.contains))
-      .toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
+      .filterNot { acc =>
+        acc.name.exists(accumulableExcludeList.contains) ||
+          (!includeTaskMetricsAccumulators && acc.name.exists(taskMetricAccumulableNames.contains))
+      }
+      .toList
+      .sortBy(_.id)
+      .foreach(a => accumulableInfoToJson(a, g))
     g.writeEndArray()
   }
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index cdee6ccda706e..30c9693e6dee3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource._
@@ -276,7 +277,8 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
-    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true))
+    val newJson = toJsonString(
+      JsonProtocol.stageInfoToJson(info, _, defaultOptions, includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -294,7 +296,8 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo resourceProfileId") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
-    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true))
+    val json = toJsonString(
+      JsonProtocol.stageInfoToJson(info, _, defaultOptions, includeAccumulables = true))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -471,7 +474,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
        resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
-    val oldEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _)).removeField("Stage Infos")
+    val oldEvent = sparkEventToJsonString(jobStart).removeField("Stage Infos")
     val expectedJobStart = SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
 
@@ -483,8 +486,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40L, x * 50L))
     val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
-    val oldStartEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _))
-      .removeField("Submission Time")
+    val oldStartEvent = sparkEventToJsonString(jobStart).removeField("Submission Time")
     val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
 
@@ -519,8 +521,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val oldStageInfo =
-      toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _, includeAccumulables = true))
-        .removeField("Parent IDs")
+      toJsonString(
+        JsonProtocol.stageInfoToJson(stageInfo, _, defaultOptions, includeAccumulables = true)
+      ).removeField("Parent IDs")
     val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details",
      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
@@ -785,6 +788,87 @@ class JsonProtocolSuite extends SparkFunSuite {
     assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
   }
 
+  test("SPARK-42204: spark.eventLog.includeTaskMetricsAccumulators config") {
+    val includeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, true))
+    val excludeConf = new JsonProtocolOptions(
+      new SparkConf().set(EVENT_LOG_INCLUDE_TASK_METRICS_ACCUMULATORS, false))
+
+    val taskMetricsAccumulables = TaskMetrics
+      .empty
+      .nameToAccums
+      .view
+      .filterKeys(!JsonProtocol.accumulableExcludeList.contains(_))
+      .values
+      .map(_.toInfo(Some(1), None))
+      .toSeq
+
+    val taskInfoWithTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 444L, false)
+    taskInfoWithTaskMetricsAccums.setAccumulables(taskMetricsAccumulables)
+    val taskInfoWithoutTaskMetricsAccums = makeTaskInfo(222L, 333, 1, 333, 444L, false)
+    taskInfoWithoutTaskMetricsAccums.setAccumulables(Seq.empty)
+
+    val stageInfoWithTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 500L)
+    stageInfoWithTaskMetricsAccums.accumulables.clear()
+    stageInfoWithTaskMetricsAccums.accumulables ++= taskMetricsAccumulables.map(x => (x.id, x))
+    val stageInfoWithoutTaskMetricsAccums = makeStageInfo(100, 200, 300, 400L, 500L)
+    stageInfoWithoutTaskMetricsAccums.accumulables.clear()
+
+    // Test events which should be impacted by the config.
+
+    // TaskEnd
+    {
+      val originalEvent = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
+        taskInfoWithTaskMetricsAccums,
+        new ExecutorMetrics(Array(12L, 23L, 45L, 67L, 78L, 89L,
+          90L, 123L, 456L, 789L, 40L, 20L, 20L, 10L, 20L, 10L, 301L)),
+        makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, 0,
+          hasHadoopInput = false, hasOutput = false))
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(taskInfo = taskInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // StageCompleted
+    {
+      val originalEvent = SparkListenerStageCompleted(stageInfoWithTaskMetricsAccums)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfo = stageInfoWithoutTaskMetricsAccums)
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // JobStart
+    {
+      val originalEvent =
+        SparkListenerJobStart(1, 1, Seq(stageInfoWithTaskMetricsAccums), properties)
+      assertEquals(
+        originalEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, includeConf)))
+      val trimmedEvent = originalEvent.copy(stageInfos = Seq(stageInfoWithoutTaskMetricsAccums))
+      assertEquals(
+        trimmedEvent,
+        sparkEventFromJson(sparkEventToJsonString(originalEvent, excludeConf)))
+    }
+
+    // ExecutorMetricsUpdate events should be unaffected by the config:
+    val executorMetricsUpdate =
+      SparkListenerExecutorMetricsUpdate("0", Seq((0, 0, 0, taskMetricsAccumulables)))
+    assert(
+      sparkEventToJsonString(executorMetricsUpdate, includeConf) ===
+        sparkEventToJsonString(executorMetricsUpdate, excludeConf))
+    assertEquals(
+      JsonProtocol.sparkEventFromJson(sparkEventToJsonString(executorMetricsUpdate, includeConf)),
+      executorMetricsUpdate)
+  }
+
   test("SPARK-42403: properly handle null string values") {
     // Null string values can appear in a few different event types,
     // so we test multiple known cases here:
@@ -966,7 +1050,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testStageInfo(info: StageInfo): Unit = {
     val newInfo = JsonProtocol.stageInfoFromJson(
-      toJsonString(JsonProtocol.stageInfoToJson(info, _, includeAccumulables = true)))
+      toJsonString(
+        JsonProtocol.stageInfoToJson(info, _, defaultOptions, includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
 
@@ -990,7 +1075,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
 
   private def testTaskInfo(info: TaskInfo): Unit = {
     val newInfo = JsonProtocol.taskInfoFromJson(
-      toJsonString(JsonProtocol.taskInfoToJson(info, _, includeAccumulables = true)))
+      toJsonString(
+        JsonProtocol.taskInfoToJson(info, _, defaultOptions, includeAccumulables = true)))
     assertEquals(info, newInfo)
   }
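
For reviewers who want to try the change out, here is a minimal, hypothetical sketch of how an application could opt back into the pre-SPARK-42204 behavior once this lands. The config key comes from the diff above; the master, app name, and event-log directory are placeholders and not part of this patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver setup: event logging with the TaskMetrics accumulator
// values re-enabled in the 'Accumulables' fields (the new default is false).
val conf = new SparkConf()
  .setMaster("local[*]")                                         // placeholder master
  .setAppName("event-log-compat-check")                          // placeholder app name
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")                // placeholder directory
  .set("spark.eventLog.includeTaskMetricsAccumulators", "true")  // flag added in this PR

val sc = new SparkContext(conf)
sc.parallelize(1 to 100).count() // emits task/stage/job events into the event log
sc.stop()
```

The same flag can also be passed via `--conf` on `spark-submit`; either way, `EventLoggingListener` reads it once when it constructs its `JsonProtocolOptions`, so it cannot be toggled mid-application.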