[SPARK-28089][SQL] File source v2: support reading output of file streaming Sink #24900
Conversation
This PR also resolves several TODO testing items in #24830.
Test build #106609 has finished for PR 24900 at commit
Test build #106610 has finished for PR 24900 at commit
retest this please.
} finally {
  q.stop()
}
withTempDir { output =>
This is just removing withSQLConf
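For context, `withSQLConf` is the test helper being dropped here. A minimal sketch of the pattern it implements (simplified from the shape of Spark's test helpers, not the exact code): set SQL confs for the duration of a block, then restore the previous values.

```scala
// Sketch only: assumes a `spark: SparkSession` in scope, as in a Spark test suite.
def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
  val conf = spark.sessionState.conf
  // Remember the old values (null marks "was not set").
  val olds = pairs.map { case (k, _) => k -> conf.getConfString(k, null) }
  pairs.foreach { case (k, v) => conf.setConfString(k, v) }
  try f finally {
    // Restore, unsetting keys that had no previous value.
    olds.foreach {
      case (k, null) => conf.unsetConf(k)
      case (k, v)    => conf.setConfString(k, v)
    }
  }
}
```

The PR removes this wrapper from the tests below because the suites now set the V1/V2 source lists at the suite level instead of per test.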
spark.read.load(outputDir.getCanonicalPath).as[Int]
}
assertMigrationError(e.getMessage, sparkMetadataDir, legacySparkMetadataDir)
val e = intercept[SparkException] {
This is just removing withSQLConf and one TODO comment.
Test build #106622 has finished for PR 24900 at commit
.set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
.set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
test("partitioned writing and batch reading") { |
can you highlight the difference between this test case and the one in FileStreamSinkV2Suite?
For the V1 suite, it uses Parquet V1, and it matches HadoopFsRelation / FileScanRDD to check if the plan is as expected. Also, partition pruning is tested.
For the V2 suite, it uses Parquet V2, and it matches FileTable / BatchScanExec to check if the plan is as expected. As partition pruning is not supported in V2 yet, we can't test it.
I can abstract the code if the two cases look duplicated to you.
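A rough sketch of the two plan assertions being discussed (hypothetical and simplified; the suite scaffolding, `spark`, and `outputDir` are assumed from the surrounding tests, and the exact suite code may differ):

```scala
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

val df = spark.read.load(outputDir.getCanonicalPath)
val plan = df.queryExecution.executedPlan

// V1 suite: the scan node should be a FileSourceScanExec over a
// HadoopFsRelation, which executes as a FileScanRDD.
val v1Scans = plan.collect { case scan: FileSourceScanExec => scan }
assert(v1Scans.nonEmpty, "expected a V1 file scan (HadoopFsRelation / FileScanRDD)")

// V2 suite: the scan node should be a BatchScanExec whose underlying scan
// comes from a FileTable implementation (e.g. the Parquet V2 table).
val v2Scans = plan.collect { case scan: BatchScanExec => scan }
assert(v2Scans.nonEmpty, "expected a V2 batch scan (FileTable / BatchScanExec)")
```

Only one of the two assertions would hold for a given run, depending on whether the source is configured to use the V1 or V2 code path.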
Or, do you mean changing the test case name?
let's abstract the code, this test case is too long to duplicate.
On Tue 18 Jun 2019, Gengliang Wang commented on this pull request, in sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:

- try {
-   inputData.addData("a")
-   q.processAllAvailable()
-   checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-   inputData.addData("a") // Dropped
-   q.processAllAvailable()
-   checkDataset(spark.read.parquet(outputPath).as[String], "a")
-
-   inputData.addData("b")
-   q.processAllAvailable()
-   checkDataset(spark.read.parquet(outputPath).as[String], "a", "b")
- } finally {
-   q.stop()
- }
+ withTempDir { output =>

remove withSQLConf
Test build #106658 has finished for PR 24900 at commit
retest this please.
Test build #106678 has finished for PR 24900 at commit
retest this please.
Test build #106683 has finished for PR 24900 at commit
thanks, merging to master!
Merged commit message: [SPARK-28089][SQL] File source v2: support reading output of file streaming Sink. File source V1 supports reading output of FileStreamSink as batch. apache#11897 We should support this in file source V2 as well. When reading with paths, we first check if there is a metadata log of FileStreamSink. If yes, we use `MetadataLogFileIndex` for listing files; otherwise, we use `InMemoryFileIndex`. Tested with unit tests. Closes apache#24900 from gengliangwang/FileStreamV2. Authored-by: Gengliang Wang <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
File source V1 supports reading output of FileStreamSink as batch. #11897
We should support this in file source V2 as well. When reading with paths, we first check if there is a metadata log of FileStreamSink. If yes, we use MetadataLogFileIndex for listing files; otherwise, we use InMemoryFileIndex.
How was this patch tested?
Unit test
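The file-listing decision described above can be sketched roughly as follows. This is a hedged sketch, not the PR's actual code: the helper name `createFileIndex` is hypothetical, and the `MetadataLogFileIndex` / `InMemoryFileIndex` constructor signatures are approximated from their Spark internals and vary across versions.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileIndex, InMemoryFileIndex}
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}

// Hypothetical helper illustrating the described behavior: when reading a path,
// prefer the FileStreamSink metadata log if one exists, otherwise fall back to
// a plain directory listing.
def createFileIndex(spark: SparkSession, path: Path): FileIndex = {
  val hadoopConf = spark.sessionState.newHadoopConf()
  if (FileStreamSink.hasMetadata(Seq(path.toString), hadoopConf, spark.sessionState.conf)) {
    // Output of a streaming file sink: list only the files that were
    // committed in the sink's metadata log, so uncommitted or partial
    // files from failed batches are never exposed to the batch reader.
    new MetadataLogFileIndex(spark, path, Map.empty, userSpecifiedSchema = None)
  } else {
    // Ordinary directory: list the files directly.
    new InMemoryFileIndex(spark, Seq(path), Map.empty, userSpecifiedSchema = None)
  }
}
```

Using the metadata log rather than a raw listing is what makes reading a streaming sink's output exactly-once safe: the log is the source of truth for which files belong to the dataset.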