Commit

fix
wayneguow committed Jul 26, 2024
1 parent 5ccf9ba commit d503ce5
Showing 3 changed files with 41 additions and 0 deletions.
CSVFileFormat.scala

@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -111,6 +112,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
// Don't push any filter which refers to the "virtual" column which cannot present in the input.
// Such filters will be applied later on the upper layer.

if (requiredSchema.length == 1 &&
requiredSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
throw QueryCompilationErrors.queryFromRawFilesIncludeCorruptRecordColumnError()
}

val actualFilters =
filters.filterNot(_.references.contains(parsedOptions.columnNameOfCorruptRecord))

@@ -120,6 +127,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val actualRequiredSchema = StructType(
requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

val parser = new UnivocityParser(
actualDataSchema,
actualRequiredSchema,
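
The same guard now appears in both the v1 CSVFileFormat path above and the v2 CSVScan path below. As a minimal standalone sketch of the condition it enforces (the helper and variable names here are hypothetical, not part of the change): the scan is rejected only when the entire pruned schema is exactly the corrupt record column, while projections that also read real data columns are unaffected.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical restatement of the check both call sites now perform:
// reject the scan only when the pruned schema is exactly one field and
// that field is the configured corrupt record column.
def onlyCorruptRecordRequested(
    requiredSchema: StructType,
    columnNameOfCorruptRecord: String): Boolean = {
  requiredSchema.length == 1 &&
    requiredSchema.head.name == columnNameOfCorruptRecord
}

// The pruned schema of select("_corrupt_record") alone trips the check ...
val corruptOnly = StructType(Seq(StructField("_corrupt_record", StringType)))
// ... while a projection that also reads real data columns does not.
val mixed = new StructType()
  .add("a", IntegerType)
  .add("_corrupt_record", StringType)

assert(onlyCorruptRecordRequested(corruptOnly, "_corrupt_record"))
assert(!onlyCorruptRecordRequested(mixed, "_corrupt_record"))
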
CSVScan.scala

@@ -24,6 +24,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
@@ -68,6 +69,12 @@ case class CSVScan(
override def createReaderFactory(): PartitionReaderFactory = {
// Check a field requirement for corrupt records here to throw an exception in a driver side
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)

if (readDataSchema.length == 1 &&
readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord) {
throw QueryCompilationErrors.queryFromRawFilesIncludeCorruptRecordColumnError()
}

// Don't push any filter which refers to the "virtual" column which cannot present in the input.
// Such filters will be applied later on the upper layer.
val actualFilters =
CSVSuite.scala

@@ -1739,6 +1739,32 @@ abstract class CSVSuite
Row(1, Date.valueOf("1983-08-04"), null) :: Nil)
}

test("SPARK-49016: Queries from raw CSV files are disallowed when the referenced columns only" +
" include the internal corrupt record column") {
val schema = new StructType()
.add("a", IntegerType)
.add("b", DateType)
.add("_corrupt_record", StringType)

// negative cases
checkError(
exception = intercept[AnalysisException] {
spark.read.schema(schema).csv(testFile(valueMalformedFile))
.select("_corrupt_record").collect()
},
errorClass = "_LEGACY_ERROR_TEMP_1285",
parameters = Map.empty
)
// workaround
val df2 = spark.read.schema(schema).csv(testFile(valueMalformedFile)).cache()
assert(df2.filter($"_corrupt_record".isNotNull).count() == 1)
assert(df2.filter($"_corrupt_record".isNull).count() == 1)
checkAnswer(
df2.select("_corrupt_record"),
Row("0,2013-111_11 12:13:14") :: Row(null) :: Nil
)
}

test("SPARK-40468: column pruning with the corrupt record column") {
withTempPath { path =>
Seq("1,a").toDF()
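
The "workaround" section of the new test shows the intended escape hatch: cache (or save) the parsed result first, then query the corrupt record column against the cached data. A hedged sketch of that usage, assuming an existing `spark` session and a `schema` like the one in the test (the file path is illustrative):

// Sketch of the workaround exercised above (path is illustrative).
import spark.implicits._

val df = spark.read.schema(schema).csv("/path/to/malformed.csv").cache()

// The cached plan carries the fully parsed rows, so touching only
// _corrupt_record no longer forces a pruned re-read of the raw CSV files.
df.filter($"_corrupt_record".isNotNull).count()
df.select("_corrupt_record").show()
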
