Skip to content

Commit

Permalink
Simply check the count via providing schema instead of checking excep…
Browse files Browse the repository at this point in the history
…tion message on schema inference
  • Loading branch information
HeartSaVioR committed Nov 18, 2020
1 parent 71456f2 commit d216fcb
Showing 1 changed file with 38 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import java.time.format.DateTimeFormatter

import scala.util.Random

import org.apache.spark.sql.{AnalysisException, DataFrameReader, QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.{stringToFile, DateTimeUtils}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class PathFilterSuite extends QueryTest with SharedSparkSession {
import testImplicits._
Expand All @@ -34,17 +35,15 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
" and sharing same timestamp with file last modified time.") {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formatTime(curTime)),
expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formatTime(curTime)))
}
}

// Verifies that `modifiedAfter` is a strict ("after") filter: a file whose
// last-modified time equals the threshold must be excluded, yielding 0 rows.
test("SPARK-31962: modifiedAfter specified" +
  " and sharing same timestamp with file last modified time.") {
  withTempDir { dir =>
    val curTime = LocalDateTime.now(ZoneOffset.UTC)
    // Threshold == file mtime -> file filtered out, expect an empty result.
    executeTest(dir, Seq(curTime), 0, modifiedAfter = Some(formatTime(curTime)))
  }
}

Expand All @@ -53,8 +52,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -64,8 +63,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val fileTime = curTime.minusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(fileTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -74,8 +73,8 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
withTempDir { dir =>
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime),
modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -84,8 +83,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), modifiedAfter = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(curTime), 1, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -94,7 +92,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val futureTime = curTime.plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime), expectedCount = Some(1))
executeTest(dir, Seq(curTime), 1, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -103,7 +101,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime), modifiedBefore = Some(formattedTime), expectedCount = None)
executeTest(dir, Seq(curTime), 0, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -113,8 +111,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime2 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = fileTime1.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime1, fileTime2), modifiedAfter = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -123,8 +120,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val curTime = LocalDateTime.now(ZoneOffset.UTC)
val pastTime = curTime.minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(curTime, curTime), modifiedAfter = Some(formattedTime),
expectedCount = Some(2))
executeTest(dir, Seq(curTime, curTime), 2, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -133,8 +129,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val pastTime = LocalDateTime.now(ZoneOffset.UTC).minusYears(1)
val formattedTime = formatTime(pastTime)
executeTest(dir, Seq(fileTime, fileTime), modifiedAfter = Some(formattedTime),
expectedCount = None)
executeTest(dir, Seq(fileTime, fileTime), 0, modifiedAfter = Some(formattedTime))
}
}

Expand All @@ -143,8 +138,7 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val futureTime = LocalDateTime.now(ZoneOffset.UTC).plusYears(1)
val formattedTime = formatTime(futureTime)
executeTest(dir, Seq(fileTime, fileTime), modifiedBefore = Some(formattedTime),
expectedCount = Some(2))
executeTest(dir, Seq(fileTime, fileTime), 2, modifiedBefore = Some(formattedTime))
}
}

Expand All @@ -154,17 +148,15 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
val fileTime1 = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)
val fileTime2 = curTime.plusDays(3)
val formattedTime = formatTime(curTime)
executeTest(dir, Seq(fileTime1, fileTime2), modifiedBefore = Some(formattedTime),
expectedCount = Some(1))
executeTest(dir, Seq(fileTime1, fileTime2), 1, modifiedBefore = Some(formattedTime))
}
}

// NOTE(review): the test name mentions "a future date" but the threshold is
// set to the files' own (past) mtime — confirm the name matches the intent.
// `modifiedBefore` is a strict ("before") filter, so files whose mtime equals
// the threshold are excluded and the result must be empty.
test("SPARK-31962: modifiedBefore specified with a future date, multiple files, none valid") {
  withTempDir { dir =>
    val fileTime = LocalDateTime.now(ZoneOffset.UTC).minusDays(1)
    val formattedTime = formatTime(fileTime)
    // Both files share the boundary timestamp -> neither matches, expect 0.
    executeTest(dir, Seq(fileTime, fileTime), 0, modifiedBefore = Some(formattedTime))
  }
}

Expand Down Expand Up @@ -233,41 +225,35 @@ class PathFilterSuite extends QueryTest with SharedSparkSession {
/**
 * Creates one file per entry in `fileDates` under `dir`, stamps each file with
 * the corresponding last-modified time, then reads the directory as CSV with
 * the given `modifiedBefore`/`modifiedAfter` options and asserts the row count.
 *
 * An explicit schema is supplied to the reader so that an empty (fully
 * filtered) directory does not fail CSV schema inference; "no file matched"
 * is asserted as `count() === 0` rather than by intercepting an
 * `AnalysisException` and inspecting its message.
 *
 * @param dir            target directory for the generated files
 * @param fileDates      one last-modified timestamp per file to create
 * @param expectedCount  number of rows expected after path filtering
 * @param modifiedBefore optional value for the reader's "modifiedBefore" option
 * @param modifiedAfter  optional value for the reader's "modifiedAfter" option
 */
private def executeTest(
    dir: File,
    fileDates: Seq[LocalDateTime],
    expectedCount: Long,
    modifiedBefore: Option[String] = None,
    modifiedAfter: Option[String] = None): Unit = {
  fileDates.foreach { fileDate =>
    val file = createSingleFile(dir)
    setFileTime(fileDate, file)
  }

  // Fixed single-column schema: avoids schema inference on an empty result.
  val schema = StructType(Seq(StructField("a", StringType)))

  var dfReader = spark.read.format("csv").option("timeZone", "UTC").schema(schema)
  modifiedBefore.foreach { opt => dfReader = dfReader.option("modifiedBefore", opt) }
  modifiedAfter.foreach { opt => dfReader = dfReader.option("modifiedAfter", opt) }

  if (expectedCount > 0) {
    // without pathGlobFilter
    val df1 = dfReader.load(dir.getCanonicalPath)
    assert(df1.count() === expectedCount)

    // pathGlobFilter matched
    val df2 = dfReader.option("pathGlobFilter", "*.csv").load(dir.getCanonicalPath)
    assert(df2.count() === expectedCount)

    // pathGlobFilter mismatched: the glob excludes every generated file.
    val df3 = dfReader.option("pathGlobFilter", "*.txt").load(dir.getCanonicalPath)
    assert(df3.count() === 0)
  } else {
    // All files filtered out by the modified-time options -> empty result.
    val df = dfReader.load(dir.getCanonicalPath)
    assert(df.count() === 0)
  }
}

Expand Down

0 comments on commit d216fcb

Please sign in to comment.