[SPARK-49444][SQL] Modified UnivocityParser to throw runtime exceptions caused by ArrayIndexOutOfBounds with more user-oriented messages

### What changes were proposed in this pull request?

I propose to catch runtime `ArrayIndexOutOfBounds` exceptions thrown in the `parse` method of the `UnivocityParser` class and rethrow them with more user-oriented messages. Instead of surfacing the exception in its original form, the new message tells users which CSV record caused the error.
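
Condensed into a self-contained sketch (the free-standing helper `parseLineSafely` is illustrative only; the actual change is the private `parseLine` method shown in the diff below):

```scala
import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.CsvParser

import org.apache.spark.SparkRuntimeException

// Sketch: translate a low-level univocity parsing failure into a Spark error
// that carries the offending CSV record.
def parseLineSafely(tokenizer: CsvParser, line: String): Array[String] = {
  try {
    tokenizer.parseLine(line)
  } catch {
    // Only failures rooted in ArrayIndexOutOfBounds are translated, e.g. when
    // a record has more columns than the configured maxColumns.
    case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
      throw new SparkRuntimeException(
        errorClass = "MALFORMED_CSV_RECORD",
        messageParameters = Map("badRecord" -> line),
        cause = e)
  }
}
```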

### Why are the changes needed?

Informing users properly about errors improves the user experience. Instead of an `ArrayIndexOutOfBounds` exception with no clear indication of why it happened, the proposed changes throw a `SparkRuntimeException` whose message includes the original CSV line that caused the error.

### Does this PR introduce _any_ user-facing change?

This PR introduces a user-facing change that occurs when `UnivocityParser` parses a malformed CSV line from the input. More specifically, the change is reproduced in the test case within `UnivocityParserSuite` where the user specifies `maxColumns` in the parser options and the parsed CSV record has more columns than that. Instead of an `ArrayIndexOutOfBounds` exception, as mentioned in the HMR ticket, users now get a `SparkRuntimeException` whose message contains the input line that caused the error.
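
For illustration, the new behavior can be reproduced along these lines (a hypothetical session modeled on the `CSVSuite` test added below, assuming a `spark` session as in `spark-shell`; the file path is illustrative and the file holds the single row `1,3.14,string,5,7`):

```scala
import org.apache.spark.sql.types._

// The schema expects 2 columns and maxColumns is 2, but the record has 5
// columns; this previously surfaced as a bare ArrayIndexOutOfBoundsException.
val schema = new StructType()
  .add("intColumn", IntegerType, nullable = true)
  .add("decimalColumn", DecimalType(10, 2), nullable = true)

spark.read
  .schema(schema)
  .option("header", "false")
  .option("maxColumns", "2")
  .csv("test-data/more-columns.csv")
  .collect()
// Now fails with SparkException (FAILED_READ_FILE.NO_HINT), whose cause is
// org.apache.spark.SparkRuntimeException [MALFORMED_CSV_RECORD] with
// badRecord = 1,3.14,string,5,7 and sqlState KD000.
```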

### How was this patch tested?

This patch was tested in `UnivocityParserSuite`. The test named "Array index out of bounds when parsing CSV with more columns than expected" covers this patch. Additionally, a test for bad records in `UnivocityParser`'s `PERMISSIVE` mode was added to confirm that `BadRecordException` is still thrown properly.
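
For local verification, the touched suites can be run with sbt, e.g. `build/sbt "catalyst/testOnly *UnivocityParserSuite"` and `build/sbt "sql/testOnly *CSVSuite"` (module names assume the standard Spark build layout).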

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47906 from vladanvasi-db/vladanvasi-db/univocity-parser-index-out-of-bounds-handling.

Authored-by: Vladan Vasić <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
vladanvasi-db authored and cloud-fan committed Sep 26, 2024
1 parent 87b5ffb commit 624eda5
Showing 5 changed files with 92 additions and 6 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -21,9 +21,10 @@ import java.io.InputStream

import scala.util.control.NonFatal

+import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkRuntimeException, SparkUpgradeException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -294,6 +295,20 @@ class UnivocityParser(
}
}

+  private def parseLine(line: String): Array[String] = {
+    try {
+      tokenizer.parseLine(line)
+    } catch {
+      // Univocity reports a record that overflows its internal buffers (e.g. more
+      // columns than maxColumns) as a TextParsingException caused by an
+      // ArrayIndexOutOfBoundsException; rethrow it with the offending record.
+      case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+        throw new SparkRuntimeException(
+          errorClass = "MALFORMED_CSV_RECORD",
+          messageParameters = Map("badRecord" -> line),
+          cause = e
+        )
+    }
+  }

/**
 * Parses a single CSV string and turns it into either one resulting row or no row (if the
 * record is malformed).
@@ -306,7 +321,7 @@
(_: String) => Some(InternalRow.empty)
} else {
// parse if the columnPruning is disabled or requiredSchema is nonEmpty
-      (input: String) => convert(tokenizer.parseLine(input))
+      (input: String) => convert(parseLine(input))
}
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -23,12 +23,12 @@ import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkRuntimeException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -323,6 +323,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
parameters = Map("fieldName" -> "`i`", "fields" -> ""))
}

test("Bad records test in permissive mode") {
def checkBadRecord(
input: String = "1,a",
dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"),
requiredSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
options: Map[String, String] = Map("mode" -> "PERMISSIVE")): BadRecordException = {
val csvOptions = new CSVOptions(options, false, "UTC")
val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions, Seq())
intercept[BadRecordException] {
parser.parse(input)
}
}

// Bad record exception caused by conversion error
checkBadRecord(input = "1.5,a,10.3")

// Bad record exception caused by insufficient number of columns
checkBadRecord(input = "2")
}

test("Array index out of bounds when parsing CSV with more columns than expected") {
val input = "1,string,3.14,5,7"
val dataSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
val requiredSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
val filters = Seq()
val parser = new UnivocityParser(dataSchema, requiredSchema, options, filters)
checkError(
exception = intercept[SparkRuntimeException] {
parser.parse(input)
},
condition = "MALFORMED_CSV_RECORD",
parameters = Map("badRecord" -> "1,string,3.14,5,7"))
}

test("SPARK-30960: parse date/timestamp string with legacy format") {
def check(parser: UnivocityParser): Unit = {
// The legacy format allows 1 or 2 chars for some fields.
1 change: 1 addition & 0 deletions sql/core/src/test/resources/test-data/more-columns.csv
@@ -0,0 +1 @@
+1,3.14,string,5,7
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -24,7 +24,8 @@ import java.util.Locale

import scala.jdk.CollectionConverters._

-import org.apache.spark.{SparkException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
+  SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -234,7 +235,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
val schema = new StructType().add("str", StringType)
val options = Map("maxCharsPerColumn" -> "2")

-    val exception = intercept[SparkException] {
+    val exception = intercept[SparkRuntimeException] {
df.select(from_csv($"value", schema, options)).collect()
}.getCause.getMessage

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -85,6 +85,7 @@ abstract class CSVSuite
private val badAfterGoodFile = "test-data/bad_after_good.csv"
private val malformedRowFile = "test-data/malformedRow.csv"
private val charFile = "test-data/char.csv"
+  private val moreColumnsFile = "test-data/more-columns.csv"

/** Verifies data and schema. */
private def verifyCars(
@@ -3439,6 +3440,39 @@ abstract class CSVSuite
expected)
}
}

test("SPARK-49444: CSV parsing failure with more than max columns") {
val schema = new StructType()
.add("intColumn", IntegerType, nullable = true)
.add("decimalColumn", DecimalType(10, 2), nullable = true)

val fileReadException = intercept[SparkException] {
spark
.read
.schema(schema)
.option("header", "false")
.option("maxColumns", "2")
.csv(testFile(moreColumnsFile))
.collect()
}

checkErrorMatchPVals(
exception = fileReadException,
condition = "FAILED_READ_FILE.NO_HINT",
parameters = Map("path" -> s".*$moreColumnsFile"))

val malformedCSVException = fileReadException.getCause.asInstanceOf[SparkRuntimeException]

checkError(
exception = malformedCSVException,
condition = "MALFORMED_CSV_RECORD",
parameters = Map("badRecord" -> "1,3.14,string,5,7"),
sqlState = "KD000")

assert(malformedCSVException.getCause.isInstanceOf[TextParsingException])
val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
}

class CSVv1Suite extends CSVSuite {
