Skip to content

Commit

Permalink
Support wildcards in eventlogs arguments (#465)
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes #462

- Added a pre-processing step which checks for wildcards in eventlogs
- For wildcards, the path is flattened and replaced with the files
  that match the pattern
  • Loading branch information
amahussein authored Jul 27, 2023
1 parent 2912619 commit e5bfaa7
Showing 1 changed file with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import java.io.FileNotFoundException
import java.time.LocalDateTime
import java.util.zip.ZipOutputStream

import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.{LinkedHashMap, ListBuffer}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, PathFilter}

import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter}
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -84,6 +84,37 @@ object EventLogPathProcessor extends Logging {
(dbLogFiles.size > 1)
}

// If the user provides a wildcard in the eventlogs, then the path processor needs
// to process the path as a glob pattern. Otherwise, HDFS mistakenly throws an exception
// saying that no such file exists.
// Once the glob paths are expanded, the list of eventlogs is the flattened result
// of all the matched files.
/**
 * Expands every event log path that contains a `*` wildcard into the paths matching it.
 *
 * Paths without a `*` are passed through untouched. The relative order of the input paths
 * is preserved; each wildcard path is replaced in place by its matches.
 *
 * This method never fails on a bad pattern: if the expansion raises a non-fatal error, the
 * raw path is kept as is so that the caller reports the failure when it processes the path,
 * which keeps error handling consistent with non-wildcard paths.
 *
 * @param eventLogsPaths raw event log paths, possibly containing `*` wildcards
 * @param hadoopConf Hadoop configuration used to obtain the file context of each glob path
 * @return the list of paths with the wildcard entries replaced by their matching files
 */
private def processWildcardsLogs(eventLogsPaths: List[String],
    hadoopConf: Configuration): List[String] = {
  // Local import: only this method needs it.
  import scala.util.control.NonFatal

  eventLogsPaths.flatMap { rawPath =>
    if (!rawPath.contains("*")) {
      List(rawPath)
    } else {
      try {
        val globPath = new Path(rawPath)
        val fileContext = FileContext.getFileContext(globPath.toUri(), hadoopConf)
        // globStatus is a Java API; guard against a null result explicitly instead of
        // relying on a NullPointerException being swallowed by the catch block.
        Option(fileContext.util().globStatus(globPath)) match {
          case Some(fileStatuses) =>
            fileStatuses.map(_.getPath.toString).toList
          case None =>
            logWarning(s"Processing pathLog with wildCard has failed: $rawPath")
            List(rawPath)
        }
      } catch {
        // Only recover from non-fatal errors; fatal ones (OutOfMemoryError,
        // InterruptedException, ...) must propagate.
        case NonFatal(e) =>
          // Do not fail in this block.
          // Instead, keep the path as is; the caller fails when it processes the path, which
          // makes error handling consistent during the processing of the analysis.
          logWarning(s"Processing pathLog with wildCard has failed: $rawPath", e)
          List(rawPath)
      }
    }
  }
}

def getEventLogInfo(pathString: String, hadoopConf: Configuration): Map[EventLogInfo, Long] = {
val inputPath = new Path(pathString)
try {
Expand Down Expand Up @@ -164,8 +195,8 @@ object EventLogPathProcessor extends Logging {
matchlogs: Option[String],
eventLogsPaths: List[String],
hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {

val logsWithTimestamp = eventLogsPaths.flatMap(getEventLogInfo(_, hadoopConf)).toMap
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap(getEventLogInfo(_, hadoopConf)).toMap

logDebug("Paths after stringToPath: " + logsWithTimestamp)
// Filter the event logs to be processed based on the criteria. If it is not provided in the
Expand Down

0 comments on commit e5bfaa7

Please sign in to comment.