[SPARK-18579][SQL] Use ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options in CSV writing

## What changes were proposed in this pull request?

This PR proposes to support _not_ trimming the white spaces when writing out. Both options default to `false` in the CSV reading path, but they default to `true` when writing CSV via the univocity parser.
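For reference, the trimming defaults come from univocity itself; a minimal sketch (assuming the `univocity-parsers` dependency Spark already uses) shows the writer-side defaults:

```scala
import com.univocity.parsers.csv.CsvWriterSettings

// univocity's writer settings trim values on both sides by default,
// which is why Spark's CSV writer currently drops the white spaces.
val writerSettings = new CsvWriterSettings()
println(writerSettings.getIgnoreLeadingWhitespaces)  // true
println(writerSettings.getIgnoreTrailingWhitespaces) // true
```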

Neither the `ignoreLeadingWhiteSpace` nor the `ignoreTrailingWhiteSpace` option is currently used for writing, so we are always trimming the white spaces.

It seems we should provide an easy way to keep these white spaces.

With the data below:

```scala
val df = spark.read.csv(Seq("a , b  , c").toDS)
df.show()
```

```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b  |  c|
+---+----+---+
```

**Before**

```scala
df.write.csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+-----+
|value|
+-----+
|a,b,c|
+-----+
```

It seems this can't be worked around via `quoteAll` either.

```scala
df.write.option("quoteAll", true).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```
```
+-----------+
|      value|
+-----------+
|"a","b","c"|
+-----------+
```

**After**

```scala
df.write.option("ignoreLeadingWhiteSpace", false).option("ignoreTrailingWhiteSpace", false).csv("/tmp/text.csv")
spark.read.text("/tmp/text.csv").show()
```

```
+----------+
|     value|
+----------+
|a , b  , c|
+----------+
```
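Reading the written files back through the CSV reader (whose read defaults for both options are `false`) confirms the round trip; this is a sketch against the same `/tmp/text.csv` output as above:

```scala
// The reader's defaults (both false) keep the white spaces the writer
// preserved, so the original values come back intact.
spark.read.csv("/tmp/text.csv").show()
```

```
+---+----+---+
|_c0| _c1|_c2|
+---+----+---+
| a | b  |  c|
+---+----+---+
```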

Note that the same round trip is possible in R:

```r
> system("cat text.csv")
f1,f2,f3
a , b  , c
> df <- read.csv(file="text.csv")
> df
  f1   f2 f3
1 a   b    c
> write.csv(df, file="text1.csv", quote=F, row.names=F)
> system("cat text1.csv")
f1,f2,f3
a , b  , c
```

## How was this patch tested?

Unit tests in `CSVSuite` and manual tests for Python.

Author: hyukjinkwon <[email protected]>

Closes #17310 from HyukjinKwon/SPARK-18579.
HyukjinKwon authored and Felix Cheung committed Mar 23, 2017
1 parent 12cd007 commit 07c12c0
Showing 8 changed files with 116 additions and 27 deletions.
28 changes: 18 additions & 10 deletions python/pyspark/sql/readwriter.py
@@ -341,12 +341,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
default value, ``false``.
:param inferSchema: infers the input schema automatically from data. It requires one extra
pass over the data. If None is set, it uses the default value, ``false``.
- :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
- :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
+ :param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
+ values being read should be skipped. If None is set, it
+ uses the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: A flag indicating whether or not trailing whitespaces from
+ values being read should be skipped. If None is set, it
+ uses the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string. Since 2.0.1, this ``nullValue`` param
applies to all supported types including the string type.
@@ -706,7 +706,7 @@ def text(self, path, compression=None):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
- timestampFormat=None):
+ timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -728,10 +728,10 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
empty string.
:param escape: sets the single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``
- :param escapeQuotes: A flag indicating whether values containing quotes should always
+ :param escapeQuotes: a flag indicating whether values containing quotes should always
be enclosed in quotes. If None is set, it uses the default value
``true``, escaping all values containing a quote character.
- :param quoteAll: A flag indicating whether all values should always be enclosed in
+ :param quoteAll: a flag indicating whether all values should always be enclosed in
quotes. If None is set, it uses the default value ``false``,
only escaping values containing a quote character.
:param header: writes the names of columns as the first line. If None is set, it uses
@@ -746,13 +746,21 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
:param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
values being written should be skipped. If None is set, it
uses the default value, ``true``.
:param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
values being written should be skipped. If None is set, it
uses the default value, ``true``.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
- dateFormat=dateFormat, timestampFormat=timestampFormat)
+ dateFormat=dateFormat, timestampFormat=timestampFormat,
+ ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
+ ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace)
self._jwrite.csv(path)

@since(1.5)
12 changes: 6 additions & 6 deletions python/pyspark/sql/streaming.py
@@ -597,12 +597,12 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
default value, ``false``.
:param inferSchema: infers the input schema automatically from data. It requires one extra
pass over the data. If None is set, it uses the default value, ``false``.
- :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
- :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values
- being read should be skipped. If None is set, it uses
- the default value, ``false``.
+ :param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
+ values being read should be skipped. If None is set, it
+ uses the default value, ``false``.
+ :param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
+ values being read should be skipped. If None is set, it
+ uses the default value, ``false``.
:param nullValue: sets the string representation of a null value. If None is set, it uses
the default value, empty string. Since 2.0.1, this ``nullValue`` param
applies to all supported types including the string type.
13 changes: 13 additions & 0 deletions python/pyspark/sql/tests.py
@@ -450,6 +450,19 @@ def test_wholefile_csv(self):
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
self.assertEqual(ages_newlines.collect(), expected)

def test_ignorewhitespace_csv(self):
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.spark.createDataFrame([[" a", "b ", " c "]]).write.csv(
tmpPath,
ignoreLeadingWhiteSpace=False,
ignoreTrailingWhiteSpace=False)

expected = [Row(value=u' a,b , c ')]
readback = self.spark.read.text(tmpPath)
self.assertEqual(readback.collect(), expected)
shutil.rmtree(tmpPath)

def test_read_multiple_orc_file(self):
df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
"python/test_support/sql/orc_partitioned/b=1/c=1"])
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -489,9 +489,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
* requires one extra pass over the data.</li>
- * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
- * from values being read should be skipped.</li>
- * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+ * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+ * whitespaces from values being read should be skipped.</li>
+ * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
Expand Up @@ -573,7 +573,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`escapeQuotes` (default `true`): a flag indicating whether values containing
* quotes should always be enclosed in quotes. Default is to escape all values containing
* a quote character.</li>
- * <li>`quoteAll` (default `false`): A flag indicating whether all values should always be
+ * <li>`quoteAll` (default `false`): a flag indicating whether all values should always be
* enclosed in quotes. Default is to only escape values containing a quote character.</li>
* <li>`header` (default `false`): writes the names of columns as the first line.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value.</li>
@@ -586,6 +586,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`ignoreLeadingWhiteSpace` (default `true`): a flag indicating whether or not leading
* whitespaces from values being written should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating whether or not
* trailing whitespaces from values being written should be skipped.</li>
* </ul>
*
* @since 2.0.0
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -93,8 +93,13 @@ class CSVOptions(

val headerFlag = getBool("header")
val inferSchemaFlag = getBool("inferSchema")
- val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
- val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")
+ val ignoreLeadingWhiteSpaceInRead = getBool("ignoreLeadingWhiteSpace", default = false)
+ val ignoreTrailingWhiteSpaceInRead = getBool("ignoreTrailingWhiteSpace", default = false)
+
+ // For write, both options were `true` by default. We leave them as `true` for
+ // backwards compatibility.
+ val ignoreLeadingWhiteSpaceFlagInWrite = getBool("ignoreLeadingWhiteSpace", default = true)
+ val ignoreTrailingWhiteSpaceFlagInWrite = getBool("ignoreTrailingWhiteSpace", default = true)

val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
@@ -144,6 +149,8 @@
format.setQuote(quote)
format.setQuoteEscape(escape)
format.setComment(comment)
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
writerSettings.setEmptyValue(nullValue)
writerSettings.setSkipEmptyLines(true)
@@ -159,8 +166,8 @@
format.setQuote(quote)
format.setQuoteEscape(escape)
format.setComment(comment)
- settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlag)
- settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlag)
+ settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
+ settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
settings.setReadInputOnSeparateThread(false)
settings.setInputBufferSize(inputBufferSize)
settings.setMaxColumns(maxColumns)
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -238,9 +238,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
* requires one extra pass over the data.</li>
- * <li>`ignoreLeadingWhiteSpace` (default `false`): defines whether or not leading whitespaces
- * from values being read should be skipped.</li>
- * <li>`ignoreTrailingWhiteSpace` (default `false`): defines whether or not trailing
+ * <li>`ignoreLeadingWhiteSpace` (default `false`): a flag indicating whether or not leading
+ * whitespaces from values being read should be skipped.</li>
+ * <li>`ignoreTrailingWhiteSpace` (default `false`): a flag indicating whether or not trailing
* whitespaces from values being read should be skipped.</li>
* <li>`nullValue` (default empty string): sets the string representation of a null value. Since
* 2.0.1, this applies to all supported types including the string type.</li>
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1117,4 +1117,61 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(df2.schema === schema)
}

test("ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - read") {
val input = " a,b , c "

// For reading, the defaults of both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace`
// are `false`, so the (false, false) combination is excluded.
val combinations = Seq(
(true, true),
(false, true),
(true, false))

// Check if read rows ignore whitespaces as configured.
val expectedRows = Seq(
Row("a", "b", "c"),
Row(" a", "b", " c"),
Row("a", "b ", "c "))

combinations.zip(expectedRows)
.foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
val df = spark.read
.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
.csv(Seq(input).toDS())

checkAnswer(df, expected)
}
}

test("SPARK-18579: ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace options - write") {
val df = Seq((" a", "b ", " c ")).toDF()

// For writing, the defaults of both `ignoreLeadingWhiteSpace` and `ignoreTrailingWhiteSpace`
// are `true`, so the (true, true) combination is excluded.
val combinations = Seq(
(false, false),
(false, true),
(true, false))

// Check if written lines ignore whitespaces as configured.
val expectedLines = Seq(
" a,b , c ",
" a,b, c",
"a,b ,c ")

combinations.zip(expectedLines)
.foreach { case ((ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace), expected) =>
withTempPath { path =>
df.write
.option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
.option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
.csv(path.getAbsolutePath)

// Read back the written lines.
val readBack = spark.read.text(path.getAbsolutePath)
checkAnswer(readBack, Row(expected))
}
}
}
}
