[SPARK-49016][SQL] Restore the behavior that queries from raw CSV files are disallowed when they only include the corrupt record column, and assign a name to `_LEGACY_ERROR_TEMP_1285`

### What changes were proposed in this pull request?

From the SQL migration guide: https://spark.apache.org/docs/latest/sql-migration-guide.html#upgrading-from-spark-sql-22-to-23
![Screenshot of the SQL migration guide note](https://github.com/user-attachments/assets/7745ec28-6484-44b0-ace1-5c21927659f3)

However, the CSV behavior is inconsistent with that description: the related check was removed in PR #35817. This PR restores the check and renames the error condition `_LEGACY_ERROR_TEMP_1285` to `UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN`.
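
For illustration, a minimal sketch of the restored behavior (assuming an active `SparkSession` named `spark`; the file path and schema are placeholders):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._ // for the $"..." column syntax

val schema = new StructType()
  .add("a", IntegerType)
  .add("_corrupt_record", StringType)

// Disallowed again after this change: the query references only the corrupt record
// column, and with column pruning the parser never parses the data columns, so the
// result would be misleading. This now throws an AnalysisException.
spark.read.schema(schema).csv("/path/to/raw.csv").select("_corrupt_record").show()

// Documented workaround: cache (or save) the parsed result first, then query it.
val df = spark.read.schema(schema).csv("/path/to/raw.csv").cache()
df.filter($"_corrupt_record".isNotNull).count()
```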

### Why are the changes needed?

To keep the code consistent with the documentation and avoid misunderstandings.

### Does this PR introduce _any_ user-facing change?

Yes. Queries from raw CSV files that reference only the internal corrupt record column now fail, which matches the documented behavior.
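
As a sketch of how user code could detect the new condition (it assumes the session and placeholder path above, and that `AnalysisException.getErrorClass` reports the full condition name):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
try {
  spark.read.schema(schema).csv("/path/to/raw.csv").select("_corrupt_record").collect()
} catch {
  case e: AnalysisException =>
    // Previously surfaced as `_LEGACY_ERROR_TEMP_1285`.
    assert(e.getErrorClass == "UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN")
}
```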

### How was this patch tested?

It passes GA and adds a new test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47506 from wayneguow/SPARK-49016.

Authored-by: Wei Guo <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
wayneguow authored and MaxGekk committed Aug 19, 2024
1 parent 5b2d214 commit dd259b0
Showing 6 changed files with 54 additions and 14 deletions.
24 changes: 12 additions & 12 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -4824,6 +4824,18 @@
"Python UDF in the ON clause of a <joinType> JOIN. In case of an INNER JOIN consider rewriting to a CROSS JOIN with a WHERE clause."
]
},
"QUERY_ONLY_CORRUPT_RECORD_COLUMN" : {
"message" : [
"Queries from raw JSON/CSV/XML files are disallowed when the",
"referenced columns only include the internal corrupt record column",
"(named _corrupt_record by default). For example:",
"spark.read.schema(schema).json(file).filter($\"_corrupt_record\".isNotNull).count()",
"and spark.read.schema(schema).json(file).select(\"_corrupt_record\").show().",
"Instead, you can cache or save the parsed results and then send the same query.",
"For example, val df = spark.read.schema(schema).json(file).cache() and then",
"df.filter($\"_corrupt_record\".isNotNull).count()."
]
},
"REMOVE_NAMESPACE_COMMENT" : {
"message" : [
"Remove a comment from the namespace <namespace>."
@@ -6305,18 +6317,6 @@
"It is not allowed to create a persisted view from the Dataset API."
]
},
"_LEGACY_ERROR_TEMP_1285" : {
"message" : [
"Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the",
"referenced columns only include the internal corrupt record column",
"(named _corrupt_record by default). For example:",
"spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()",
"and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().",
"Instead, you can cache or save the parsed results and then send the same query.",
"For example, val df = spark.read.schema(schema).csv(file).cache() and then",
"df.filter($\"_corrupt_record\".isNotNull).count()."
]
},
"_LEGACY_ERROR_TEMP_1286" : {
"message" : [
"User-defined partition column <columnName> not found in the JDBC relation: <schema>."
@@ -3130,7 +3130,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat

def queryFromRawFilesIncludeCorruptRecordColumnError(): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1285",
errorClass = "UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN",
messageParameters = Map.empty)
}

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -109,6 +110,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {

// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)

if (requiredSchema.length == 1 &&
requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
throw QueryCompilationErrors.queryFromRawFilesIncludeCorruptRecordColumnError()
}

// Don't push any filter which refers to the "virtual" column which cannot present in the input.
// Such filters will be applied later on the upper layer.
val actualFilters =
@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
@@ -68,6 +69,12 @@ case class CSVScan(
override def createReaderFactory(): PartitionReaderFactory = {
// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)

if (readDataSchema.length == 1 &&
readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
throw QueryCompilationErrors.queryFromRawFilesIncludeCorruptRecordColumnError()
}

// Don't push any filter which refers to the "virtual" column which cannot present in the input.
// Such filters will be applied later on the upper layer.
val actualFilters =
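
The `CSVFileFormat` and `CSVScan` changes above apply the same driver-side guard. As a standalone sketch (not the actual Spark code), the condition reduces to:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Reject the scan only when the pruned read schema is exactly the corrupt record column.
def isCorruptRecordOnly(readSchema: StructType, corruptColName: String): Boolean =
  readSchema.length == 1 && readSchema.head.name == corruptColName

val onlyCorrupt = new StructType().add("_corrupt_record", StringType)
val mixed = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)

assert(isCorruptRecordOnly(onlyCorrupt, "_corrupt_record"))  // the error would be thrown
assert(!isCorruptRecordOnly(mixed, "_corrupt_record"))       // normal pruned read, allowed
```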
@@ -1781,6 +1781,32 @@ abstract class CSVSuite
}
}

test("SPARK-49016: Queries from raw CSV files are disallowed when the referenced columns only" +
" include the internal corrupt record column") {
val schema = new StructType()
.add("a", IntegerType)
.add("b", DateType)
.add("_corrupt_record", StringType)

// negative cases
checkError(
exception = intercept[AnalysisException] {
spark.read.schema(schema).csv(testFile(valueMalformedFile))
.select("_corrupt_record").collect()
},
errorClass = "UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN",
parameters = Map.empty
)
// workaround
val df2 = spark.read.schema(schema).csv(testFile(valueMalformedFile)).cache()
assert(df2.filter($"_corrupt_record".isNotNull).count() == 1)
assert(df2.filter($"_corrupt_record".isNull).count() == 1)
checkAnswer(
df2.select("_corrupt_record"),
Row("0,2013-111_11 12:13:14") :: Row(null) :: Nil
)
}

test("SPARK-23846: schema inferring touches less data if samplingRatio < 1.0") {
// Set default values for the DataSource parameters to make sure
// that whole test file is mapped to only one partition. This will guarantee
@@ -2207,7 +2207,7 @@ abstract class JsonSuite
exception = intercept[AnalysisException] {
spark.read.schema(schema).json(path).select("_corrupt_record").collect()
},
errorClass = "_LEGACY_ERROR_TEMP_1285",
errorClass = "UNSUPPORTED_FEATURE.QUERY_ONLY_CORRUPT_RECORD_COLUMN",
parameters = Map.empty
)

