Skip to content

Commit

Permalink
error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Pei-Lun Lee committed Apr 27, 2015
1 parent 472870e commit 54c6b15
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
override def setupTask(taskContext: TaskAttemptContext): Unit = {}

override def commitJob(jobContext: JobContext) {
try {
val configuration = ContextUtil.getConfiguration(jobContext)
val fileSystem = outputPath.getFileSystem(configuration)
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
val configuration = ContextUtil.getConfiguration(jobContext)
val fileSystem = outputPath.getFileSystem(configuration)

if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
try {
val outputStatus = fileSystem.getFileStatus(outputPath)
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
Expand All @@ -54,15 +55,19 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
}
}
}
} catch {
case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
}
if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
}

if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
try {
val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
fileSystem.create(successPath).close()
} catch {
case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
}
} catch {
case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
}
}

}

0 comments on commit 54c6b15

Please sign in to comment.