From ce00b6dc54ed681e7172db2856fb444d51d3f75c Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 20 Nov 2020 12:05:21 +0900
Subject: [PATCH] Address review comments

---
 docs/sql-data-sources-generic-options.md     |  2 +-
 .../examples/sql/SQLDataSourceExample.scala  | 12 ++++-----
 python/pyspark/sql/readwriter.py             | 25 ++++++-------------
 .../PartitioningAwareFileIndex.scala         |  2 +-
 .../streaming/FileStreamOptions.scala        |  2 +-
 5 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/docs/sql-data-sources-generic-options.md b/docs/sql-data-sources-generic-options.md
index 02d0fdd7dfa66..2e4fc879a435f 100644
--- a/docs/sql-data-sources-generic-options.md
+++ b/docs/sql-data-sources-generic-options.md
@@ -125,7 +125,7 @@ To load all files recursively, you can use:
 `modifiedBefore` and `modifiedAfter` are options that can be applied together or
 separately in order to achieve greater granularity over which files may load
 during a Spark batch query.
-(Structured Streaming file source doesn't support these options.)
+(Note that Structured Streaming file sources don't support these options.)
 
 * `modifiedBefore`: an optional timestamp to only include files with
 modification times occurring before the specified time. The provided timestamp
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
index feb7611e1c2bc..90c0eeb5ba888 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -83,9 +83,9 @@ object SQLDataSourceExample {
     // $example off:load_with_path_glob_filter$
     // $example on:load_with_modified_time_filter$
     val beforeFilterDF = spark.read.format("parquet")
-        // Files modified before 07/01/2020 at 05:30 are allowed
-        .option("modifiedBefore", "2020-07-01T05:30:00")
-        .load("examples/src/main/resources/dir1");
+      // Files modified before 07/01/2020 at 05:30 are allowed
+      .option("modifiedBefore", "2020-07-01T05:30:00")
+      .load("examples/src/main/resources/dir1");
     beforeFilterDF.show();
     // +-------------+
     // |         file|
@@ -93,9 +93,9 @@ object SQLDataSourceExample {
     // |file1.parquet|
     // +-------------+
     val afterFilterDF = spark.read.format("parquet")
-        // Files modified after 06/01/2020 at 05:30 are allowed
-        .option("modifiedAfter", "2020-06-01T05:30:00")
-        .load("examples/src/main/resources/dir1");
+      // Files modified after 06/01/2020 at 05:30 are allowed
+      .option("modifiedAfter", "2020-06-01T05:30:00")
+      .load("examples/src/main/resources/dir1");
     afterFilterDF.show();
     // +-------------+
     // |         file|
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 28206da0eca46..bb31e6a3e09f8 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -832,8 +832,7 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         """
         if properties is None:
             properties = dict()
-        jprop = JavaClass("java.util.Properties",
-                          self._spark._sc._gateway._gateway_client)()
+        jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])
         if column is not None:
@@ -845,8 +844,7 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
                                                int(numPartitions), jprop))
         if predicates is not None:
             gateway = self._spark._sc._gateway
-            jpredicates = utils.toJArray(
-                gateway, gateway.jvm.java.lang.String, predicates)
+            jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
         return self._df(self._jreader.jdbc(url, table, jprop))
 
@@ -859,7 +857,6 @@ class DataFrameWriter(OptionUtils):
 
     .. versionadded:: 1.4
     """
-
     def __init__(self, df):
         self._df = df
         self._spark = df.sql_ctx
@@ -1001,21 +998,18 @@ def bucketBy(self, numBuckets, col, *cols):
         ...     .saveAsTable('bucketed_table'))
         """
         if not isinstance(numBuckets, int):
-            raise TypeError(
-                "numBuckets should be an int, got {0}.".format(type(numBuckets)))
+            raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))
 
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError(
-                    "col is a {0} but cols are not empty".format(type(col)))
+                raise ValueError("col is a {0} but cols are not empty".format(type(col)))
 
             col, cols = col[0], col[1:]
 
         if not all(isinstance(c, str) for c in cols) or not(isinstance(col, str)):
             raise TypeError("all names should be `str`")
 
-        self._jwrite = self._jwrite.bucketBy(
-            numBuckets, col, _to_seq(self._spark._sc, cols))
+        self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
         return self
 
     def sortBy(self, col, *cols):
@@ -1040,8 +1034,7 @@ def sortBy(self, col, *cols):
         """
         if isinstance(col, (list, tuple)):
             if cols:
-                raise ValueError(
-                    "col is a {0} but cols are not empty".format(type(col)))
+                raise ValueError("col is a {0} but cols are not empty".format(type(col)))
 
             col, cols = col[0], col[1:]
 
@@ -1423,8 +1416,7 @@ def jdbc(self, url, table, mode=None, properties=None):
         """
         if properties is None:
             properties = dict()
-        jprop = JavaClass("java.util.Properties",
-                          self._spark._sc._gateway._gateway_client)()
+        jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
         for k in properties:
             jprop.setProperty(k, properties[k])
         self.mode(mode)._jwrite.jdbc(url, table, jprop)
@@ -1590,8 +1582,7 @@ def _test():
     globs['os'] = os
     globs['sc'] = sc
     globs['spark'] = spark
-    globs['df'] = spark.read.parquet(
-        'python/test_support/sql/parquet_partitioned')
+    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 2f85efdc94e89..b805a925d0fbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -57,7 +57,7 @@ abstract class PartitioningAwareFileIndex(
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
   private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
-  protected val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
+  private val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
 
   protected def matchPathPattern(file: FileStatus): Boolean =
     pathFilters.forall(_.accept(file))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 2f983d5165043..6f43542fd6595 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,7 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
   private def checkDisallowedOptions(options: Map[String, String]): Unit = {
     Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
       if (parameters.contains(param)) {
-        throw new IllegalArgumentException(s"option '$param' is not allowed in file stream source")
+        throw new IllegalArgumentException(s"option '$param' is not allowed in file stream sources")
      }
    }
  }
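
The diff above is formatting and wording cleanup around the `modifiedBefore`/`modifiedAfter` options: batch reads accept the filters, while `FileStreamOptions` rejects them for streaming. A minimal Scala sketch of that documented behavior follows, reusing the example parquet directory and option values from the hunks above; the object name, master setting, and timing of the streaming failure are illustrative assumptions, not part of the patch.

import org.apache.spark.sql.SparkSession

object ModifiedTimeFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("modified-time-filter-sketch")
      .master("local[*]")
      .getOrCreate()

    // Batch read: only files whose modification time falls inside the window are loaded.
    val filtered = spark.read.format("parquet")
      .option("modifiedAfter", "2020-06-01T05:30:00")
      .option("modifiedBefore", "2020-07-01T05:30:00")
      .load("examples/src/main/resources/dir1")
    filtered.show()

    // Streaming reads reject the same options: FileStreamOptions.checkDisallowedOptions
    // throws IllegalArgumentException("option '...' is not allowed in file stream sources")
    // once the file stream source is instantiated.

    spark.stop()
  }
}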