diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 9fb490dd823ad..ba8fef0b3a8d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,43 +1206,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       errorMessage.contains("is not a valid DFS filename"))
   }
 
-  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      // Using a output committer that always fail when committing a task, so that both
-      // `commitTask()` and `abortTask()` are invoked.
-      val extraOptions = Map[String, String](
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
-          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-      )
-
-      // Before fixing SPARK-7837, the following code results in an NPE because both
-      // `commitTask()` and `abortTask()` try to close output writers.
-
-      withTempPath { dir =>
-        val m1 = intercept[SparkException] {
-          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-
-      withTempPath { dir =>
-        val m2 = intercept[SparkException] {
-          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
-            .coalesce(1)
-          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        if (m2.getErrorClass != null) {
-          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
-          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
-        } else {
-          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
-        }
-      }
-    }
-  }
-
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
     withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
       classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1587,6 +1550,56 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 }
 
+// Parquet IO test suite with output commit coordination disabled.
+// This test suite is separated from ParquetIOSuite to avoid a race condition between failure
+// events from `OutputCommitCoordination` and `TaskSetManager`.
+class ParquetIOWithoutOutputCommitCoordinationSuite
+  extends QueryTest with ParquetTest with SharedSparkSession {
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
+  }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using an output committer that always fails when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
+        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
+            .coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        if (m2.getErrorClass != null) {
+          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
+          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
+        } else {
+          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
+        }
+      }
+    }
+  }
+}
+
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {