Skip to content

Commit

Permalink
Reduce number of event logs flushes (apache#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
SirOibaf authored and Ackuq committed May 3, 2022
1 parent 816e9de commit 4e74a2b
Showing 1 changed file with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,96 +155,96 @@ private[spark] class EventLoggingListener(
}

// log stage completed event
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onJobStart(event: SparkListenerJobStart): Unit = {
logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true)
logEvent(event.copy(properties = redactProperties(event.properties)))
}

override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)

override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
logEvent(event, flushLogger = true)
}
override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorExcluded(event: SparkListenerExecutorExcluded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorBlacklistedForStage(
event: SparkListenerExecutorBlacklistedForStage): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorExcludedForStage(
event: SparkListenerExecutorExcludedForStage): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onNodeBlacklistedForStage(event: SparkListenerNodeBlacklistedForStage): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onNodeExcludedForStage(event: SparkListenerNodeExcludedForStage): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onExecutorUnexcluded(event: SparkListenerExecutorUnexcluded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}


override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onNodeExcluded(event: SparkListenerNodeExcluded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onNodeUnexcluded(event: SparkListenerNodeUnexcluded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
if (shouldLogBlockUpdates) {
logEvent(event, flushLogger = true)
logEvent(event)
}
}

Expand All @@ -269,12 +269,12 @@ private[spark] class EventLoggingListener(
}

override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
logEvent(event, flushLogger = true)
logEvent(event)
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
if (event.logEvent) {
logEvent(event, flushLogger = true)
logEvent(event)
}
}

Expand Down

0 comments on commit 4e74a2b

Please sign in to comment.