Commit

abstract test cases
gengliangwang committed Jun 19, 2019
1 parent e666203 · commit 1ed79cf
Showing 1 changed file with 106 additions and 143 deletions.
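The commit removes the two copies of the "partitioned writing and batch reading" test from the V1 and V2 suites and keeps a single copy in the abstract FileStreamSinkSuite, which delegates the source-specific plan checks to a new abstract hook, checkQueryExecution, overridden by each concrete suite. A minimal sketch of that pattern, assuming ScalaTest 3.1+; the suite names, data, and assertions below are illustrative placeholders, not code from this patch:

// Illustrative sketch only: BaseQuerySuite, V1QuerySuite and V2QuerySuite are
// placeholder names, not classes from this commit.
import org.scalatest.funsuite.AnyFunSuite

abstract class BaseQuerySuite extends AnyFunSuite {
  // Hook for source-specific checks; each concrete suite supplies its own.
  protected def checkQueryExecution(result: Seq[Int]): Unit

  // The shared test body is written once here and registered in every subclass.
  test("shared behaviour") {
    val result = Seq(1, 2, 3).map(_ * 1000)
    assert(result.sum == 6000)
    checkQueryExecution(result)
  }
}

class V1QuerySuite extends BaseQuerySuite {
  // Stand-in for the V1-specific plan assertions.
  override protected def checkQueryExecution(result: Seq[Int]): Unit =
    assert(result.forall(_ % 1000 == 0))
}

class V2QuerySuite extends BaseQuerySuite {
  // Stand-in for the V2-specific plan assertions.
  override protected def checkQueryExecution(result: Seq[Int]): Unit =
    assert(result.distinct.size == 3)
}

Because the shared test is defined in the abstract class, ScalaTest registers and runs it once per concrete subclass, so only the overridden hook differs between the V1 and V2 suites.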
@@ -53,6 +53,8 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
   }

+  protected def checkQueryExecution(df: DataFrame): Unit
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -114,6 +116,49 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
   }

+  test("partitioned writing and batch reading") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .writeStream
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
+
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+
+      val outputDf = spark.read.parquet(outputDir)
+      val expectedSchema = new StructType()
+        .add(StructField("value", IntegerType, nullable = false))
+        .add(StructField("id", IntegerType))
+      assert(outputDf.schema === expectedSchema)
+
+      // Verify the data is correctly read
+      checkDatasetUnorderly(
+        outputDf.as[(Int, Int)],
+        (1000, 1), (2000, 2), (3000, 3))
+
+      checkQueryExecution(outputDf)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
   test("partitioned writing and batch reading with 'basePath'") {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
@@ -431,172 +476,90 @@ abstract class FileStreamSinkSuite extends StreamTest {
 }

 class FileStreamSinkV1Suite extends FileStreamSinkSuite {
-  import testImplicits._
-
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
       .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")

-  test("partitioned writing and batch reading") {
-    val inputData = MemoryStream[Int]
-    val ds = inputData.toDS()
-
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
-
-    var query: StreamingQuery = null
-
-    try {
-      query =
-        ds.map(i => (i, i * 1000))
-          .toDF("id", "value")
-          .writeStream
-          .partitionBy("id")
-          .option("checkpointLocation", checkpointDir)
-          .format("parquet")
-          .start(outputDir)
-
-      inputData.addData(1, 2, 3)
-      failAfter(streamingTimeout) {
-        query.processAllAvailable()
-      }
-
-      val outputDf = spark.read.parquet(outputDir)
-      val expectedSchema = new StructType()
-        .add(StructField("value", IntegerType, nullable = false))
-        .add(StructField("id", IntegerType))
-      assert(outputDf.schema === expectedSchema)
-
-      // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-      // been inferred
-      val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
-        case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
-      }
-      assert(hadoopdFsRelations.size === 1)
-      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
-      assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
-      assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
-
-      // Verify the data is correctly read
-      checkDatasetUnorderly(
-        outputDf.as[(Int, Int)],
-        (1000, 1), (2000, 2), (3000, 3))
-
-      /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-      def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-        val getFileScanRDD = df.queryExecution.executedPlan.collect {
-          case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
-            scan.inputRDDs().head.asInstanceOf[FileScanRDD]
-        }.headOption.getOrElse {
-          fail(s"No FileScan in query\n${df.queryExecution}")
-        }
-        func(getFileScanRDD.filePartitions)
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val hadoopdFsRelations = df.queryExecution.analyzed.collect {
+      case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
+    }
+    assert(hadoopdFsRelations.size === 1)
+    assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
+    assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
+    assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val getFileScanRDD = df.queryExecution.executedPlan.collect {
+        case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
+          scan.inputRDDs().head.asInstanceOf[FileScanRDD]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
       }
+      func(getFileScanRDD.filePartitions)
+    }

-      // Read without pruning
-      checkFileScanPartitions(outputDf) { partitions =>
-        // There should be as many distinct partition values as there are distinct ids
-        assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-      }
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
+    }

-      // Read with pruning, should read only files in partition dir id=1
-      checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
-        val filesToBeRead = partitions.flatMap(_.files)
-        assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
-        assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
-      }
+    // Read with pruning, should read only files in partition dir id=1
+    checkFileScanPartitions(df.filter("id = 1")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
+    }

-      // Read with pruning, should read only files in partition dir id=1 and id=2
-      checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
-        val filesToBeRead = partitions.flatMap(_.files)
-        assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
-        assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
-      }
-    } finally {
-      if (query != null) {
-        query.stop()
-      }
+    // Read with pruning, should read only files in partition dir id=1 and id=2
+    checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
+      val filesToBeRead = partitions.flatMap(_.files)
+      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
     }
   }
 }

 class FileStreamSinkV2Suite extends FileStreamSinkSuite {
-  import testImplicits._
-
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
       .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")

-  test("partitioned writing and batch reading") {
-    val inputData = MemoryStream[Int]
-    val ds = inputData.toDS()
-
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
-
-    var query: StreamingQuery = null
-
-    try {
-      query =
-        ds.map(i => (i, i * 1000))
-          .toDF("id", "value")
-          .writeStream
-          .partitionBy("id")
-          .option("checkpointLocation", checkpointDir)
-          .format("parquet")
-          .start(outputDir)
-
-      inputData.addData(1, 2, 3)
-      failAfter(streamingTimeout) {
-        query.processAllAvailable()
-      }
-
-      val outputDf = spark.read.parquet(outputDir)
-      val expectedSchema = new StructType()
-        .add(StructField("value", IntegerType, nullable = false))
-        .add(StructField("id", IntegerType))
-      assert(outputDf.schema === expectedSchema)
-
-      // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
-      // been inferred
-      val table = outputDf.queryExecution.analyzed.collect {
-        case DataSourceV2Relation(table: FileTable, _, _) => table
-      }
-      assert(table.size === 1)
-      assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
-      assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
-      assert(table.head.dataSchema.exists(_.name == "value"))
-
-      // Verify the data is correctly read
-      checkDatasetUnorderly(
-        outputDf.as[(Int, Int)],
-        (1000, 1), (2000, 2), (3000, 3))
-
-      /** Check some condition on the partitions of the FileScanRDD generated by a DF */
-      def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
-        val fileScan = df.queryExecution.executedPlan.collect {
-          case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
-            batch.scan.asInstanceOf[FileScan]
-        }.headOption.getOrElse {
-          fail(s"No FileScan in query\n${df.queryExecution}")
-        }
-        func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+  override def checkQueryExecution(df: DataFrame): Unit = {
+    // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
+    // been inferred
+    val table = df.queryExecution.analyzed.collect {
+      case DataSourceV2Relation(table: FileTable, _, _) => table
+    }
+    assert(table.size === 1)
+    assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
+    assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
+    assert(table.head.dataSchema.exists(_.name == "value"))
+
+    /** Check some condition on the partitions of the FileScanRDD generated by a DF */
+    def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
+      val fileScan = df.queryExecution.executedPlan.collect {
+        case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
+          batch.scan.asInstanceOf[FileScan]
+      }.headOption.getOrElse {
+        fail(s"No FileScan in query\n${df.queryExecution}")
       }
+      func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
+    }

-      // Read without pruning
-      checkFileScanPartitions(outputDf) { partitions =>
-        // There should be as many distinct partition values as there are distinct ids
-        assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
-      }
-      // TODO: test partition pruning when file source V2 supports it.
-    } finally {
-      if (query != null) {
-        query.stop()
-      }
+    // Read without pruning
+    checkFileScanPartitions(df) { partitions =>
+      // There should be as many distinct partition values as there are distinct ids
+      assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
     }
+    // TODO: test partition pruning when file source V2 supports it.
   }
 }
