Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool - Add option to filter by minimum event log size #1291

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ object EventLogPathProcessor extends Logging {
matchlogs: Option[String],
eventLogsPaths: List[String],
hadoopConf: Configuration,
maxEventLogSize: Option[String] = None): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
maxEventLogSize: Option[String] = None,
minEventLogSize: Option[String] = None): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap {
case (rawPath, processedPaths) if processedPaths.isEmpty =>
Expand All @@ -252,24 +253,40 @@ object EventLogPathProcessor extends Logging {
}.getOrElse(logsWithTimestamp)

val filteredLogs = if ((filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) ||
maxEventLogSize.isDefined) {
maxEventLogSize.isDefined || minEventLogSize.isDefined) {
val validMatchedLogs = matchedLogs.collect {
case (info, Some(ts)) => info -> ts
}
val filteredBySize = if (maxEventLogSize.isDefined) {
val filteredByMinSize = if (minEventLogSize.isDefined) {
val minSizeInBytes = if (StringUtils.isMemorySize(minEventLogSize.get)) {
// if it is memory return the bytes unit
StringUtils.convertMemorySizeToBytes(minEventLogSize.get)
} else {
// size is assumed to be mb
StringUtils.convertMemorySizeToBytes(minEventLogSize.get + "m")
}
val (matched, filtered) = validMatchedLogs.partition(info => info._2.size >= minSizeInBytes)
logInfo(s"Filtering eventlogs by size, minimum size is ${minSizeInBytes}b. The logs " +
s"filtered out include: ${filtered.keys.map(_.eventLog.toString).mkString(",")}")
matched
} else {
validMatchedLogs
}
val filteredByMaxSize = if (maxEventLogSize.isDefined) {
val maxSizeInBytes = if (StringUtils.isMemorySize(maxEventLogSize.get)) {
// if it is memory return the bytes unit
StringUtils.convertMemorySizeToBytes(maxEventLogSize.get)
} else {
// size is assumed to be mb
StringUtils.convertMemorySizeToBytes(maxEventLogSize.get + "m")
}
val (matched, filtered) = validMatchedLogs.partition(info => info._2.size <= maxSizeInBytes)
val (matched, filtered) =
filteredByMinSize.partition(info => info._2.size <= maxSizeInBytes)
logInfo(s"Filtering eventlogs by size, max size is ${maxSizeInBytes}b. The logs filtered " +
s"out include: ${filtered.keys.map(_.eventLog.toString).mkString(",")}")
matched
} else {
validMatchedLogs
filteredByMinSize
}
if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
val filteredInfo = filterNLogs.get.split("-")
Expand All @@ -278,16 +295,16 @@ object EventLogPathProcessor extends Logging {
// Before filtering based on user criteria, remove the failed event logs
// (i.e. logs without timestamp) from the list.
val matched = if (criteria.equals("newest")) {
LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp > _._2.timestamp): _*)
LinkedHashMap(filteredByMaxSize.toSeq.sortWith(_._2.timestamp > _._2.timestamp): _*)
} else if (criteria.equals("oldest")) {
LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp < _._2.timestamp): _*)
LinkedHashMap(filteredByMaxSize.toSeq.sortWith(_._2.timestamp < _._2.timestamp): _*)
} else {
logError("Criteria should be either newest-filesystem or oldest-filesystem")
Map.empty[EventLogInfo, Long]
}
matched.take(numberofEventLogs)
} else {
filteredBySize
filteredByMaxSize
}
} else {
matchedLogs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
"If no units are specified, the size is assumed to be m. Note, this does not support " +
"event log rolling which puts multiple event logs for the same application into a " +
"single directory.")
// Optional command-line option giving the lower bound on event log size.
// The value is kept as a raw string (e.g. "500", "10k", "2g"); per the help text
// below, a value without a unit suffix is interpreted as megabytes and the bound
// is inclusive (logs whose size equals the given value are still processed).
// NOTE(review): the help text states rolling event log directories are not
// supported by this filter.
val minEventLogSize: ScallopOption[String] =
opt[String](required = false,
descr = "Process only application event logs whose size is greater than or equal to the " +
"size specified. Valid units of size are " +
"b(bytes),k(kilobytes),m(megabytes),g(gigabytes). If no units are specified, the " +
"size is assumed to be m. Note, this does not support event log rolling which puts " +
"multiple event logs for the same application into a single directory.")
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string. Filesystem " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object QualificationMain extends Logging {

val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
val minEventLogSize = appArgs.minEventLogSize.toOption
val maxEventLogSize = appArgs.maxEventLogSize.toOption
val matchEventLogs = appArgs.matchEventLogs
val outputDirectory = appArgs.outputDirectory().stripSuffix("/")
Expand Down Expand Up @@ -87,7 +88,8 @@ object QualificationMain extends Logging {
}

val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths(
filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf, maxEventLogSize)
filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf,
maxEventLogSize, minEventLogSize)

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
Expand Down Expand Up @@ -122,9 +124,10 @@ object QualificationMain extends Logging {
def argsContainsFSFilters(appArgs: QualificationArgs): Boolean = {
val filterCriteria = appArgs.filterCriteria.toOption
val maxEventLogSize = appArgs.maxEventLogSize.toOption
val minEventLogSize = appArgs.minEventLogSize.toOption
appArgs.matchEventLogs.isSupplied ||
(filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem")) ||
maxEventLogSize.isDefined
maxEventLogSize.isDefined || minEventLogSize.isDefined
}

def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ sparkRapids:
- no-html-report
- m
- match-event-logs
- min-event-log-size
- max-event-log-size
- max-sql-desc-length
- ml-functions
Expand Down