[SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV #30518

Closed · wants to merge 7 commits · Changes from all commits
26 changes: 24 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
unescapedQuoteHandling=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
unescapedQuoteHandling : str, optional
defines how the CsvParser will handle values with unescaped quotes. If None is
set, it uses the default value, ``STOP_AT_DELIMITER``.

* ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
the quote character and proceed parsing the value as a quoted value, until a closing
quote is found.
* ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters of the current
parsed value until the delimiter is found. If no delimiter is found in the value, the
parser will continue accumulating characters from the input until a delimiter or line
ending is found.
* ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters until the
delimiter or a line ending is found in the input.
* ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
for the given value will be skipped and the value set in nullValue will be produced
instead.
* ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
will be thrown.

Examples
--------
@@ -708,7 +729,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
path = [path]
if type(path) == list:
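For context beyond the diff, here is a minimal batch-read sketch of the new keyword argument (the input path and the `.show()` call are illustrative, not part of this PR):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Under Spark's default escape character (backslash), doubled quotes inside a
# quoted CSV value are unescaped; STOP_AT_CLOSING_QUOTE tells the parser to
# accumulate them and keep reading until the closing quote.
df = spark.read.csv(
    "/tmp/unescaped.csv",  # hypothetical input path
    header=True,
    unescapedQuoteHandling="STOP_AT_CLOSING_QUOTE",
)
df.show(truncate=False)
```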
1 change: 1 addition & 0 deletions python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...
def orc(
self,
25 changes: 23 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
pathGlobFilter=None, recursiveFileLookup=None):
pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
recursiveFileLookup : str or bool, optional
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
unescapedQuoteHandling : str, optional
defines how the CsvParser will handle values with unescaped quotes. If None is
set, it uses the default value, ``STOP_AT_DELIMITER``.

* ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
the quote character and proceed parsing the value as a quoted value, until a closing
quote is found.
* ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters of the current
parsed value until the delimiter is found. If no delimiter is found in the value, the
parser will continue accumulating characters from the input until a delimiter or line
ending is found.
* ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
as an unquoted value. This will make the parser accumulate all characters until the
delimiter or a line ending is found in the input.
* ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
for the given value will be skipped and the value set in nullValue will be produced
instead.
* ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
will be thrown.

.. versionadded:: 2.0.0

@@ -926,7 +946,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
return self._df(self._jreader.csv(path))
else:
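A corresponding structured-streaming sketch (the source directory and schema are hypothetical; streaming sources require an explicit schema):

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField("c1", StringType()),
    StructField("c2", StringType()),
])

# The option can equally be passed via .option(), mirroring the Scala API.
stream = (
    spark.readStream
    .schema(schema)
    .option("header", "true")
    .option("unescapedQuoteHandling", "RAISE_ERROR")  # fail fast on bad quoting
    .csv("/tmp/csv-input")  # hypothetical source directory
)
```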
1 change: 1 addition & 0 deletions python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...

class DataStreamWriter:
@@ -213,6 +213,12 @@ class CSVOptions(
}
val lineSeparatorInWrite: Option[String] = lineSeparator

/**
* The handling method to be used when unescaped quotes are found in the input.
*/
val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
settings.setNullValue(nullValue)
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
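Note the `toUpperCase(Locale.ROOT)` in the lookup above: the option value is normalized before `UnescapedQuoteHandling.valueOf`, so casing is irrelevant, while an unrecognized value surfaces `valueOf`'s `IllegalArgumentException`. A small sketch (the path is hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# All three spellings resolve to the same enum constant.
for value in ("skip_value", "Skip_Value", "SKIP_VALUE"):
    spark.read.option("unescapedQuoteHandling", value).csv("/tmp/data.csv")
```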
21 changes: 21 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no
* delimiter is found in the value, the parser will continue accumulating characters from
* the input until a delimiter or line ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes. Note that Spark tries
* to parse only required columns in CSV under column pruning. Therefore, corrupt records
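To make the two "stop" strategies concrete, here is how they differ on the malformed row exercised by the test at the end of this PR (outcomes are taken from that test and its comment; with Spark's default backslash escape, the doubled quotes below count as unescaped):

```python
# Raw CSV line (header c1,c2):  "a,""b,c","xyz"
#
# STOP_AT_CLOSING_QUOTE       -> c1 = 'a,""b,c'   c2 = 'xyz'
#   the stray quotes are accumulated and parsing runs on to the closing quote
#
# STOP_AT_DELIMITER (default) -> c1 = '"a,""b'    c2 = 'c"'
#   the value is reinterpreted as unquoted, so parsing stops at the next comma
```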
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
* will handle values with unescaped quotes.
* <ul>
* <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
* the quote character and proceed parsing the value as a quoted value, until a closing
* quote is found.</li>
* <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters of the current
* parsed value until the delimiter is found. If no delimiter is found in the value, the
* parser will continue accumulating characters from the input until a delimiter or line
* ending is found.</li>
* <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
* as an unquoted value. This will make the parser accumulate all characters until the
* delimiter or a line ending is found in the input.</li>
* <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
* for the given value will be skipped and the value set in nullValue will be produced
* instead.</li>
* <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
* will be thrown.</li>
* </ul>
* </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
}
}

test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
"unescaped quotes and unescaped delimiter data correctly") {
withTempPath { path =>
val dataPath = path.getCanonicalPath
val row1 = Row("""a,""b,c""", "xyz")
val row2 = Row("""a,b,c""", """x""yz""")
// Generate the test data using `,` as the delimiter and `"` as the quote character,
// but leave the quotes unescaped.
Seq(
"""c1,c2""",
s""""${row1.getString(0)}","${row1.getString(1)}"""",
s""""${row2.getString(0)}","${row2.getString(1)}"""")
.toDF().repartition(1).write.text(dataPath)
// Without setting unescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
// the result would be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
val result = spark.read
.option("inferSchema", "true")
.option("header", "true")
.option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
.csv(dataPath).collect()
val expectedResults = Array(row1, row2)
assert(result.sameElements(expectedResults))
}
}
}

class CSVv1Suite extends CSVSuite {
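For readers following along in Python, here is a PySpark sketch of the same scenario as the test above (the path is a stand-in for the test's temp dir, and the expected rows mirror the Scala assertions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
path = "/tmp/spark-33566-demo"  # stand-in for withTempPath

# Same data as the Scala test: a header line plus two rows with
# unescaped quotes inside quoted values.
lines = ['c1,c2', '"a,""b,c","xyz"', '"a,b,c","x""yz"']
spark.createDataFrame([(l,) for l in lines], "value string") \
    .coalesce(1).write.mode("overwrite").text(path)

df = spark.read.csv(path, header=True,
                    unescapedQuoteHandling="STOP_AT_CLOSING_QUOTE")
# Expected, per the Scala test: [Row(c1='a,""b,c', c2='xyz'),
#                                Row(c1='a,b,c',   c2='x""yz')]
print(df.collect())
```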