Commit

Address review comments
HeartSaVioR committed Nov 20, 2020
1 parent d216fcb commit ce00b6d
Showing 5 changed files with 17 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/sql-data-sources-generic-options.md
@@ -125,7 +125,7 @@ To load all files recursively, you can use:
`modifiedBefore` and `modifiedAfter` are options that can be
applied together or separately in order to achieve greater
granularity over which files may load during a Spark batch query.
-(Structured Streaming file source doesn't support these options.)
+(Note that Structured Streaming file sources don't support these options.)

* `modifiedBefore`: an optional timestamp to only include files with
modification times occurring before the specified time. The provided timestamp
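The doc text above describes the batch-only `modifiedBefore`/`modifiedAfter` reader options. A minimal PySpark sketch of the same idea (the path and timestamps are illustrative, not taken from this commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("modified-time-filters").getOrCreate()

# Only pick up files whose modification time falls inside the window.
df = (spark.read.format("parquet")
      .option("modifiedAfter", "2020-06-01T05:30:00")   # include files modified after this time
      .option("modifiedBefore", "2020-07-01T05:30:00")  # ...and before this time
      .load("examples/src/main/resources/dir1"))
df.show()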
@@ -83,19 +83,19 @@ object SQLDataSourceExample {
// $example off:load_with_path_glob_filter$
// $example on:load_with_modified_time_filter$
val beforeFilterDF = spark.read.format("parquet")
-// Files modified before 07/01/2020 at 05:30 are allowed
-.option("modifiedBefore", "2020-07-01T05:30:00")
-.load("examples/src/main/resources/dir1");
+// Files modified before 07/01/2020 at 05:30 are allowed
+.option("modifiedBefore", "2020-07-01T05:30:00")
+.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
-// Files modified after 06/01/2020 at 05:30 are allowed
-.option("modifiedAfter", "2020-06-01T05:30:00")
-.load("examples/src/main/resources/dir1");
+// Files modified after 06/01/2020 at 05:30 are allowed
+.option("modifiedAfter", "2020-06-01T05:30:00")
+.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
25 changes: 8 additions & 17 deletions python/pyspark/sql/readwriter.py
@@ -832,8 +832,7 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
"""
if properties is None:
properties = dict()
-jprop = JavaClass("java.util.Properties",
-self._spark._sc._gateway._gateway_client)()
+jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
if column is not None:
@@ -845,8 +844,7 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
int(numPartitions), jprop))
if predicates is not None:
gateway = self._spark._sc._gateway
-jpredicates = utils.toJArray(
-gateway, gateway.jvm.java.lang.String, predicates)
+jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
return self._df(self._jreader.jdbc(url, table, jprop))
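The read-side jdbc() path above converts the Python properties dict into java.util.Properties and the predicates list into a Java String array. A hedged usage sketch, reusing the SparkSession from the earlier sketch; the URL, table, and credentials are placeholders:

props = {"user": "sa", "password": "secret", "driver": "org.postgresql.Driver"}
# Each predicate becomes its own partition of the JDBC scan.
orders = spark.read.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="public.orders",
    predicates=["region = 'EU'", "region = 'US'"],
    properties=props)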

@@ -859,7 +857,6 @@ class DataFrameWriter(OptionUtils):
.. versionadded:: 1.4
"""

def __init__(self, df):
self._df = df
self._spark = df.sql_ctx
@@ -1001,21 +998,18 @@ def bucketBy(self, numBuckets, col, *cols):
... .saveAsTable('bucketed_table'))
"""
if not isinstance(numBuckets, int):
-raise TypeError(
-"numBuckets should be an int, got {0}.".format(type(numBuckets)))
+raise TypeError("numBuckets should be an int, got {0}.".format(type(numBuckets)))

if isinstance(col, (list, tuple)):
if cols:
-raise ValueError(
-"col is a {0} but cols are not empty".format(type(col)))
+raise ValueError("col is a {0} but cols are not empty".format(type(col)))

col, cols = col[0], col[1:]

if not all(isinstance(c, str) for c in cols) or not(isinstance(col, str)):
raise TypeError("all names should be `str`")

-self._jwrite = self._jwrite.bucketBy(
-numBuckets, col, _to_seq(self._spark._sc, cols))
+self._jwrite = self._jwrite.bucketBy(numBuckets, col, _to_seq(self._spark._sc, cols))
return self

def sortBy(self, col, *cols):
@@ -1040,8 +1034,7 @@ def sortBy(self, col, *cols):
"""
if isinstance(col, (list, tuple)):
if cols:
-raise ValueError(
-"col is a {0} but cols are not empty".format(type(col)))
+raise ValueError("col is a {0} but cols are not empty".format(type(col)))

col, cols = col[0], col[1:]

@@ -1423,8 +1416,7 @@ def jdbc(self, url, table, mode=None, properties=None):
"""
if properties is None:
properties = dict()
-jprop = JavaClass("java.util.Properties",
-self._spark._sc._gateway._gateway_client)()
+jprop = JavaClass("java.util.Properties", self._spark._sc._gateway._gateway_client)()
for k in properties:
jprop.setProperty(k, properties[k])
self.mode(mode)._jwrite.jdbc(url, table, jprop)
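The write-side jdbc() above mirrors the reader: a save mode plus a properties dict converted to java.util.Properties. A sketch with placeholder connection details:

df.write.jdbc(
    url="jdbc:postgresql://localhost:5432/mydb",
    table="public.orders_copy",
    mode="append",
    properties={"user": "sa", "password": "secret"})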
@@ -1590,8 +1582,7 @@ def _test():
globs['os'] = os
globs['sc'] = sc
globs['spark'] = spark
-globs['df'] = spark.read.parquet(
-'python/test_support/sql/parquet_partitioned')
+globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')
(failure_count, test_count) = doctest.testmod(
pyspark.sql.readwriter, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
@@ -57,7 +57,7 @@ abstract class PartitioningAwareFileIndex(
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

private val caseInsensitiveMap = CaseInsensitiveMap(parameters)
-protected val pathFilters = PathFilterFactory.create(caseInsensitiveMap)
+private val pathFilters = PathFilterFactory.create(caseInsensitiveMap)

protected def matchPathPattern(file: FileStatus): Boolean =
pathFilters.forall(_.accept(file))
@@ -38,7 +38,7 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
private def checkDisallowedOptions(options: Map[String, String]): Unit = {
Seq(ModifiedBeforeFilter.PARAM_NAME, ModifiedAfterFilter.PARAM_NAME).foreach { param =>
if (parameters.contains(param)) {
-throw new IllegalArgumentException(s"option '$param' is not allowed in file stream source")
+throw new IllegalArgumentException(s"option '$param' is not allowed in file stream sources")
}
}
}
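The guard above is what backs the note added to the docs: passing either option to a file stream source is rejected. A hedged PySpark illustration, reusing the SparkSession from the earlier sketch; the schema and path are placeholders, and exactly where the IllegalArgumentException surfaces can depend on the Spark version:

from pyspark.sql.types import StringType, StructField, StructType

schema = StructType([StructField("file", StringType())])

# Expected to fail: "option ... is not allowed in file stream sources"
stream_df = (spark.readStream.format("parquet")
             .schema(schema)
             .option("modifiedBefore", "2020-07-01T05:30:00")  # disallowed for streams
             .load("examples/src/main/resources/dir1"))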
