[SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down #32123

Closed · wants to merge 2 commits (showing changes from 1 commit)
44 changes: 43 additions & 1 deletion docs/sql-data-sources-load-save-functions.md
@@ -105,10 +105,12 @@ To load a CSV file you can use:
The extra options are also used during the write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create a bloom filter and use dictionary encoding only for `favorite_color`.
For Parquet, there exists `parquet.enable.dictionary`, too.
For Parquet, there exist `parquet.bloom.filter.enabled` and `parquet.enable.dictionary`, too.
To find more detailed information about the extra ORC/Parquet options,
visit the official Apache ORC/Parquet websites.

ORC data source:

<div class="codetabs">

<div data-lang="scala" markdown="1">
@@ -146,6 +148,46 @@ OPTIONS (

</div>

Parquet data source:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example manual_save_options_parquet scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example manual_save_options_parquet java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example manual_save_options_parquet python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example manual_save_options_parquet r/RSparkSQLExample.R %}
</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  `parquet.page.write-checksum.enabled` true
)
{% endhighlight %}
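Option keys containing characters such as `#` or `-`, which are not valid in bare SQL identifiers, are quoted with backticks above; dot-separated keys like `parquet.enable.dictionary` need no quoting.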

</div>

</div>
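
The tabs above cover only the write side. On the read side, row groups are skipped via the bloom filter when `parquet.filter.bloom.enabled` (`ParquetInputFormat.BLOOM_FILTERING_ENABLED`, on by default) is set, as the test in this PR demonstrates. Below is a minimal read-side sketch, not part of this PR's diff; the filter value is illustrative, and it relies on Spark forwarding data source options into the Hadoop configuration for file sources:

{% highlight scala %}
// Read back with bloom filtering explicitly enabled (it is the default) so that
// row groups whose bloom filter rules out 'red' can be skipped entirely.
val readBack = spark.read
  .option("parquet.filter.bloom.enabled", "true")
  .parquet("users_with_options.parquet")
  .filter("favorite_color = 'red'")
readBack.show()
{% endhighlight %}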

### Run SQL on files directly

Instead of using the read API to load a file into a DataFrame and query it, you can also query that
8 changes: 8 additions & 0 deletions examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -195,6 +195,14 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
// $example off:manual_save_options_orc$
// $example on:manual_save_options_parquet$
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
// $example off:manual_save_options_parquet$
// $example on:direct_sql$
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
10 changes: 10 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -126,6 +126,16 @@ def basic_datasource_example(spark):
.save("users_with_options.orc"))
# $example off:manual_save_options_orc$

# $example on:manual_save_options_parquet$
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
# $example off:manual_save_options_parquet$

# $example on:write_sorting_and_bucketing$
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
# $example off:write_sorting_and_bucketing$
5 changes: 5 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -157,6 +157,11 @@ df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
# $example off:manual_save_options_orc$

# $example on:manual_save_options_parquet$
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", `parquet.bloom.filter.enabled#favorite_color` = "true", `parquet.bloom.filter.expected.ndv#favorite_color` = 1000000, parquet.enable.dictionary = "true", `parquet.page.write-checksum.enabled` = "false")
# $example off:manual_save_options_parquet$

# $example on:direct_sql$
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
# $example off:direct_sql$
8 changes: 8 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -129,6 +129,14 @@ object SQLDataSourceExample {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
// $example off:manual_save_options_orc$
// $example on:manual_save_options_parquet$
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
// $example off:manual_save_options_parquet$

// $example on:direct_sql$
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
28 changes: 28 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1634,6 +1634,34 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
}

test("SPARK-34562: Bloom filter push down") {
withTempPath { dir =>
val path = dir.toURI.toString
wangyum marked this conversation as resolved.
Show resolved Hide resolved
spark.range(100).selectExpr("id * 2 AS id").write
.option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", "true")
wangyum marked this conversation as resolved.
Show resolved Hide resolved
.parquet(path)

Seq(true, false).foreach { bloomFilterEnabled =>
withSQLConf(
ParquetInputFormat.DICTIONARY_FILTERING_ENABLED -> "false",
ParquetInputFormat.BLOOM_FILTERING_ENABLED -> bloomFilterEnabled.toString) {
val accu = new NumRowGroupsAcc
sparkContext.register(accu)

val df = spark.read.parquet(path).filter("id = 19")
df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
if (bloomFilterEnabled) {
assert(accu.value === 0)
} else {
assert(accu.value > 0)
}

AccumulatorContext.remove(accu.id)
}
}
}
}
}

@ExtendedSQLTest
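
As a complementary check, not part of this PR's diff, the footer of a written file can be inspected to confirm that the writer actually persisted a bloom filter for the column. A sketch against the parquet-hadoop 1.12+ API; the part-file path is a placeholder:

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open one output file's footer and assert that the "id" column chunk of the
// first row group carries a bloom filter (readBloomFilter returns null if not).
val inputFile = HadoopInputFile.fromPath(
  new Path("/tmp/part-00000.parquet"), // placeholder: a part file written by the test
  new Configuration())
val reader = ParquetFileReader.open(inputFile)
try {
  val block = reader.getFooter.getBlocks.get(0)
  val idChunk = block.getColumns.asScala.find(_.getPath.toDotString == "id").get
  assert(reader.readBloomFilter(idChunk) != null, "expected a bloom filter for column `id`")
} finally {
  reader.close()
}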