
[SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV #30518

Closed · wants to merge 7 commits
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
}
val lineSeparatorInWrite: Option[String] = lineSeparator

/**
* The handling method to be used when unescaped quotes are found in the input.
*/
val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
settings.setNullValue(nullValue)
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
- settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+ settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
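Editor's note: the new val resolves the user-supplied string against univocity's `UnescapedQuoteHandling` enum. A minimal standalone sketch of that resolution logic, assuming the `com.univocity.parsers.csv.UnescapedQuoteHandling` enum Spark already depends on (`resolve` is a hypothetical helper, not part of this PR):

import java.util.Locale
import com.univocity.parsers.csv.UnescapedQuoteHandling

// Hypothetical helper mirroring the CSVOptions logic above:
// case-insensitive lookup with STOP_AT_DELIMITER as the default.
def resolve(parameters: Map[String, String]): UnescapedQuoteHandling =
  UnescapedQuoteHandling.valueOf(
    parameters.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER")
      .toUpperCase(Locale.ROOT))

resolve(Map.empty)                                      // STOP_AT_DELIMITER
resolve(Map("unescapedQuoteHandling" -> "skip_value"))  // SKIP_VALUE
// Unknown names throw IllegalArgumentException from Enum.valueOf,
// so a typo fails fast rather than silently falling back to the default.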
21 changes: 21 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no delimiter is found in the value, the
* parser will continue accumulating characters from the input until a delimiter or line
* ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes. Note that Spark tries
* to parse only required columns in CSV under column pruning. Therefore, corrupt records
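Editor's note: the documented values map onto reads like the following. A hedged usage sketch, not from the PR itself; the path and chosen option value are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-quotes").getOrCreate()

// "/tmp/quotes.csv" is a placeholder path. The option value is matched
// case-insensitively against the five enum names documented above.
val df = spark.read
  .option("header", "true")
  .option("unescapedQuoteHandling", "BACK_TO_DELIMITER")
  .csv("/tmp/quotes.csv")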
sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv
@@ -0,0 +1,3 @@
c1,c2
"a,""b,c","xyz"
"a,b,c","x""yz"
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -75,6 +75,8 @@ abstract class CSVSuite
private val valueMalformedFile = "test-data/value-malformed.csv"
private val badAfterGoodFile = "test-data/bad_after_good.csv"
private val malformedRowFile = "test-data/malformedRow.csv"
private val unescapedQuotesAndUnescapedDelimiterFile =
"test-data/unescaped-quotes-unescaped-delimiter.csv"

/** Verifies data and schema. */
private def verifyCars(
@@ -2428,6 +2430,19 @@ abstract class CSVSuite
assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
}
}

test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
"unescapedQuotesAndUnescapedDelimiterFile correctly") {
// Without configure UnescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
// the result will be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
val result = spark.read
.option("inferSchema", "true")
.option("header", "true")
.option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
.csv(testFile(unescapedQuotesAndUnescapedDelimiterFile)).collect()
val expectedResults = Array(Row("""a,""b,c""", "xyz"), Row("""a,b,c""", """x""yz"""))
assert(result.sameElements(expectedResults))
}
}
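Editor's note: for contrast with the test above, reading the same fixture without the option falls back to the default STOP_AT_DELIMITER. The rows quoted below come from the test's own comment, so treat this as an illustrative sketch rather than an extra test from the PR:

// Same read as the test above, minus the unescapedQuoteHandling option.
val defaultResult = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv(testFile(unescapedQuotesAndUnescapedDelimiterFile)).collect()
// Per the test's comment, the default handling yields:
//   Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")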

class CSVv1Suite extends CSVSuite {