[SPARK-48260][SQL] Disable output committer coordination in one test of ParquetIOSuite

### What changes were proposed in this pull request?

A test from `ParquetIOSuite` is flaky: `SPARK-7837 Do not close output writer twice when commitTask() fails`

It turns out to be a race condition. The test injects an error into the task commit step, and the job may then fail in either of two ways:
1. The task obtains the driver's permission to commit, but the commit fails and so the task fails. This triggers a stage failure, since it may indicate data duplication; see #36564.
2. The test disables task retries, so `TaskSetManager` aborts the stage.

Both failures are reported by sending an event to `DAGScheduler`, so which error the job ultimately fails with depends on which event is processed first. This normally does not matter, but this test in `ParquetIOSuite` asserts on the error class. This PR fixes the flaky test by moving the test case into a new test suite that disables output committer coordination, as sketched below.
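
A minimal sketch of the approach, for reference (the full change is in the diff below; the body of `TaskCommitFailureParquetOutputCommitter` is assumed here, since the diff only refers to it by name, and the real suite also mixes in `ParquetTest`):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.parquet.hadoop.ParquetOutputCommitter

import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

// A committer that fails every task commit, so both commitTask() and
// abortTask() are exercised by the test (assumed implementation).
class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitTask(context: TaskAttemptContext): Unit = {
    sys.error("Intentional exception for testing purposes")
  }
}

// With output commit coordination disabled, the only failure event that
// reaches DAGScheduler comes from TaskSetManager, so the error class the
// test asserts on is deterministic.
class ParquetIOWithoutOutputCommitCoordinationSuite
  extends QueryTest with SharedSparkSession {

  override protected def sparkConf: SparkConf =
    super.sparkConf.set("spark.hadoop.outputCommitCoordination.enabled", "false")
}
```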

### Why are the changes needed?

Fix a flaky test.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GA tests plus a manual run locally.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46562 from gengliangwang/fixParquetIO.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
gengliangwang committed May 13, 2024
1 parent a101c48 commit d9ff78e
Showing 1 changed file with 51 additions and 38 deletions.
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, TestUtils}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, TestUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
@@ -1206,43 +1206,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       errorMessage.contains("is not a valid DFS filename"))
   }
 
-  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
-    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
-      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
-      // Using a output committer that always fail when committing a task, so that both
-      // `commitTask()` and `abortTask()` are invoked.
-      val extraOptions = Map[String, String](
-        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
-          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
-      )
-
-      // Before fixing SPARK-7837, the following code results in an NPE because both
-      // `commitTask()` and `abortTask()` try to close output writers.
-
-      withTempPath { dir =>
-        val m1 = intercept[SparkException] {
-          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
-        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
-      }
-
-      withTempPath { dir =>
-        val m2 = intercept[SparkException] {
-          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
-            .coalesce(1)
-          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
-        }
-        if (m2.getErrorClass != null) {
-          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
-          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
-        } else {
-          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
-        }
-      }
-    }
-  }
-
   test("SPARK-11044 Parquet writer version fixed as version1 ") {
     withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
       classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
@@ -1587,6 +1550,56 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 }
 
+// Parquet IO test suite with output commit coordination disabled.
+// This test suite is separated ParquetIOSuite to avoid race condition of failure events
+// from `OutputCommitCoordination` and `TaskSetManager`.
+class ParquetIOWithoutOutputCommitCoordinationSuite
+  extends QueryTest with ParquetTest with SharedSparkSession {
+  import testImplicits._
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
+  }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+      classOf[SQLHadoopMapReduceCommitProtocol].getCanonicalName) {
+      // Using a output committer that always fail when committing a task, so that both
+      // `commitTask()` and `abortTask()` are invoked.
+      val extraOptions = Map[String, String](
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key ->
+          classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName
+      )
+
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          spark.range(1).coalesce(1).write.options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        assert(m1.getErrorClass == "TASK_WRITE_FAILED")
+        assert(m1.getCause.getMessage.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b"))
+            .coalesce(1)
+          df.write.partitionBy("a").options(extraOptions).parquet(dir.getCanonicalPath)
+        }
+        if (m2.getErrorClass != null) {
+          assert(m2.getErrorClass == "TASK_WRITE_FAILED")
+          assert(m2.getCause.getMessage.contains("Intentional exception for testing purposes"))
+        } else {
+          assert(m2.getMessage.contains("TASK_WRITE_FAILED"))
+        }
+      }
+    }
+  }
+}
+
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
