[SPARK-28089][SQL] File source v2: support reading output of file streaming Sink

## What changes were proposed in this pull request?

File source V1 supports reading the output of FileStreamSink as a batch source (apache#11897).
We should support this in file source V2 as well. When reading with paths, we first check whether there is a metadata log of FileStreamSink. If so, we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`.
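
For illustration, a minimal sketch of the end-to-end behavior this enables, using the internal `MemoryStream` test helper and hypothetical `/tmp` paths (a sketch of the expected behavior, not code from this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("file-stream-sink-batch-read") // hypothetical app name
  .getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

// Write a few rows through FileStreamSink; the sink maintains a _spark_metadata
// log under the output directory.
val input = MemoryStream[Int]
val query = input.toDS().toDF("id").writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/demo-checkpoint") // hypothetical path
  .start("/tmp/demo-output")                            // hypothetical path

input.addData(1, 2, 3)
query.processAllAvailable()
query.stop()

// Batch read of the sink output. With this patch, the file source V2 path also
// detects the metadata log and lists files via MetadataLogFileIndex instead of
// InMemoryFileIndex, so only files committed by the streaming query are read.
spark.read.parquet("/tmp/demo-output").show()
```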

## How was this patch tested?

Unit test
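
The patch shares the assertions through a `checkQueryExecution` method that is implemented by a V1 suite and a V2 suite overriding `sparkConf`. Below is a condensed sketch of the equivalent toggle using `withSQLConf` (it assumes a suite that mixes in `SQLTestUtils`, and elides the actual assertions):

```scala
import org.apache.spark.sql.internal.SQLConf

// Run the same streaming-write / batch-read checks against both code paths.
// An empty useV1SourceList means the file source V2 implementations are used.
Seq("csv,json,orc,text,parquet", "").foreach { v1SourceList =>
  withSQLConf(
    SQLConf.USE_V1_SOURCE_READER_LIST.key -> v1SourceList,
    SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> v1SourceList) {
    // write with writeStream to a FileStreamSink, read the output back with
    // spark.read, and assert on the schema, the data and the FileIndex type
  }
}
```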

Closes apache#24900 from gengliangwang/FileStreamV2.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
gengliangwang authored and kiku-jw committed Jun 26, 2019
1 parent e75330b commit 4b7f437
Showing 5 changed files with 223 additions and 161 deletions.
@@ -20,11 +20,12 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.sources.v2.TableCapability._
import org.apache.spark.sql.types.{DataType, StructType}
@@ -44,23 +45,37 @@ abstract class FileTable(
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
// We are reading from the results of a streaming query. We will load files from
// the metadata log instead of listing them using HDFS APIs.
new MetadataLogFileIndex(sparkSession, new Path(paths.head),
options.asScala.toMap, userSpecifiedSchema)
} else {
// This is a non-streaming file based datasource.
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = true)
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, rootPathsSpecified, caseSensitiveMap, userSpecifiedSchema, fileStatusCache)
}
}

lazy val dataSchema: StructType = userSpecifiedSchema.map { schema =>
val partitionSchema = fileIndex.partitionSchema
val resolver = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
}.orElse {
inferSchema(fileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $formatName. It must be specified manually.")
}.asNullable
lazy val dataSchema: StructType = {
val schema = userSpecifiedSchema.map { schema =>
val partitionSchema = fileIndex.partitionSchema
val resolver = sparkSession.sessionState.conf.resolver
StructType(schema.filterNot(f => partitionSchema.exists(p => resolver(p.name, f.name))))
}.orElse {
inferSchema(fileIndex.allFiles())
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $formatName. It must be specified manually.")
}
fileIndex match {
case _: MetadataLogFileIndex => schema
case _ => schema.asNullable
}
}

override lazy val schema: StructType = {
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -25,17 +25,19 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils

class FileStreamSinkSuite extends StreamTest {
abstract class FileStreamSinkSuite extends StreamTest {
import testImplicits._

override def beforeAll(): Unit = {
@@ -51,6 +53,8 @@ class FileStreamSinkSuite extends StreamTest {
}
}

protected def checkQueryExecution(df: DataFrame): Unit

test("unpartitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
@@ -121,78 +125,36 @@

var query: StreamingQuery = null

// TODO: test file source V2 as well.
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
try {
query =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
.writeStream
.partitionBy("id")
.option("checkpointLocation", checkpointDir)
.format("parquet")
.start(outputDir)

inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}
try {
query =
ds.map(i => (i, i * 1000))
.toDF("id", "value")
.writeStream
.partitionBy("id")
.option("checkpointLocation", checkpointDir)
.format("parquet")
.start(outputDir)

val outputDf = spark.read.parquet(outputDir)
val expectedSchema = new StructType()
.add(StructField("value", IntegerType, nullable = false))
.add(StructField("id", IntegerType))
assert(outputDf.schema === expectedSchema)

// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
}
assert(hadoopdFsRelations.size === 1)
assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))

// Verify the data is correctly read
checkDatasetUnorderly(
outputDf.as[(Int, Int)],
(1000, 1), (2000, 2), (3000, 3))

/** Check some condition on the partitions of the FileScanRDD generated by a DF */
def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val getFileScanRDD = df.queryExecution.executedPlan.collect {
case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
scan.inputRDDs().head.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
func(getFileScanRDD.filePartitions)
}
inputData.addData(1, 2, 3)
failAfter(streamingTimeout) {
query.processAllAvailable()
}

// Read without pruning
checkFileScanPartitions(outputDf) { partitions =>
// There should be as many distinct partition values as there are distinct ids
assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
}
val outputDf = spark.read.parquet(outputDir)
val expectedSchema = new StructType()
.add(StructField("value", IntegerType, nullable = false))
.add(StructField("id", IntegerType))
assert(outputDf.schema === expectedSchema)

// Read with pruning, should read only files in partition dir id=1
checkFileScanPartitions(outputDf.filter("id = 1")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
}
// Verify the data is correctly read
checkDatasetUnorderly(
outputDf.as[(Int, Int)],
(1000, 1), (2000, 2), (3000, 3))

// Read with pruning, should read only files in partition dir id=1 and id=2
checkFileScanPartitions(outputDf.filter("id in (1,2)")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
}
} finally {
if (query != null) {
query.stop()
}
checkQueryExecution(outputDf)
} finally {
if (query != null) {
query.stop()
}
}
}
@@ -512,3 +474,92 @@ class FileStreamSinkSuite extends StreamTest {
}
}
}

class FileStreamSinkV1Suite extends FileStreamSinkSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
.set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")

override def checkQueryExecution(df: DataFrame): Unit = {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val hadoopdFsRelations = df.queryExecution.analyzed.collect {
case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation
}
assert(hadoopdFsRelations.size === 1)
assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))

/** Check some condition on the partitions of the FileScanRDD generated by a DF */
def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val getFileScanRDD = df.queryExecution.executedPlan.collect {
case scan: DataSourceScanExec if scan.inputRDDs().head.isInstanceOf[FileScanRDD] =>
scan.inputRDDs().head.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
func(getFileScanRDD.filePartitions)
}

// Read without pruning
checkFileScanPartitions(df) { partitions =>
// There should be as many distinct partition values as there are distinct ids
assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
}

// Read with pruning, should read only files in partition dir id=1
checkFileScanPartitions(df.filter("id = 1")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
}

// Read with pruning, should read only files in partition dir id=1 and id=2
checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
val filesToBeRead = partitions.flatMap(_.files)
assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
}
}
}

class FileStreamSinkV2Suite extends FileStreamSinkSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
.set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")

override def checkQueryExecution(df: DataFrame): Unit = {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val table = df.queryExecution.analyzed.collect {
case DataSourceV2Relation(table: FileTable, _, _) => table
}
assert(table.size === 1)
assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
assert(table.head.fileIndex.partitionSchema.exists(_.name == "id"))
assert(table.head.dataSchema.exists(_.name == "value"))

/** Check some condition on the partitions of the FileScanRDD generated by a DF */
def checkFileScanPartitions(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val fileScan = df.queryExecution.executedPlan.collect {
case batch: BatchScanExec if batch.scan.isInstanceOf[FileScan] =>
batch.scan.asInstanceOf[FileScan]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
func(fileScan.planInputPartitions().map(_.asInstanceOf[FilePartition]))
}

// Read without pruning
checkFileScanPartitions(df) { partitions =>
// There should be as many distinct partition values as there are distinct ids
assert(partitions.flatMap(_.files.map(_.partitionValues)).distinct.size === 3)
}
// TODO: test partition pruning when file source V2 supports it.
}
}
@@ -218,11 +218,12 @@ class StreamSuite extends StreamTest {
}
}

// TODO: fix file source V2 as well.
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
assertDF(df)
assertDF(df)
val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
Seq("", "parquet").foreach { useV1SourceReader =>
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
assertDF(df)
assertDF(df)
}
}
}

Expand Down
@@ -197,32 +197,30 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
}

test("deduplicate with file sink") {
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
withTempDir { output =>
withTempDir { checkpointDir =>
val outputPath = output.getAbsolutePath
val inputData = MemoryStream[String]
val result = inputData.toDS().dropDuplicates()
val q = result.writeStream
.format("parquet")
.outputMode(Append)
.option("checkpointLocation", checkpointDir.getPath)
.start(outputPath)
try {
inputData.addData("a")
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a")

inputData.addData("a") // Dropped
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a")

inputData.addData("b")
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
} finally {
q.stop()
}
withTempDir { output =>
withTempDir { checkpointDir =>
val outputPath = output.getAbsolutePath
val inputData = MemoryStream[String]
val result = inputData.toDS().dropDuplicates()
val q = result.writeStream
.format("parquet")
.outputMode(Append)
.option("checkpointLocation", checkpointDir.getPath)
.start(outputPath)
try {
inputData.addData("a")
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a")

inputData.addData("a") // Dropped
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a")

inputData.addData("b")
q.processAllAvailable()
checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
} finally {
q.stop()
}
}
}
