[SPARK-34562][SQL] Add test and doc for Parquet Bloom filter push down
### What changes were proposed in this pull request?

This PR adds a test and documentation for Parquet Bloom filter push down.

### Why are the changes needed?

Improve the documentation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Generating docs:
![image](https://user-images.githubusercontent.com/5399861/114327472-c131bb80-9b6b-11eb-87a0-6f9a74eb1097.png)

Closes #32123 from wangyum/SPARK-34562.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
wangyum authored and MaxGekk committed Apr 12, 2021
1 parent 1be1012 commit e40fce9
Showing 6 changed files with 104 additions and 2 deletions.
46 changes: 44 additions & 2 deletions docs/sql-data-sources-load-save-functions.md
@@ -105,9 +105,11 @@ To load a CSV file you can use:
The extra options are also used during write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`.
-For Parquet, there exists `parquet.enable.dictionary`, too.
+For Parquet, there exists `parquet.bloom.filter.enabled` and `parquet.enable.dictionary`, too.
 To find more detailed information about the extra ORC/Parquet options,
-visit the official Apache ORC/Parquet websites.
+visit the official Apache [ORC](https://orc.apache.org/docs/spark-config.html) / [Parquet](https://github.com/apache/parquet-mr/tree/master/parquet-hadoop) websites.

ORC data source:

<div class="codetabs">

@@ -146,6 +148,46 @@ OPTIONS (

</div>

Parquet data source:

<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example manual_save_options_parquet scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example manual_save_options_parquet java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example manual_save_options_parquet python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example manual_save_options_parquet r/RSparkSQLExample.R %}
</div>

<div data-lang="SQL" markdown="1">

{% highlight sql %}
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)
{% endhighlight %}

</div>

</div>
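
In these examples, the `#favorite_color` suffix scopes an option to a single column, and `parquet.bloom.filter.expected.ndv` tells parquet-mr how many distinct values to size the filter for; in the SQL form, keys containing `#` must be backquoted. As a quick end-to-end illustration, here is a minimal Scala sketch (not part of this commit; it assumes the reader honors `parquet.filter.bloom.enabled`, the key behind `ParquetInputFormat.BLOOM_FILTERING_ENABLED`, when passed as a read option):

{% highlight scala %}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ParquetBloomFilterSketch").getOrCreate()
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")

// Write with a Bloom filter on favorite_color, sized for ~1M distinct values.
usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .save("users_with_options.parquet")

// Read back with an equality predicate; row groups whose Bloom filter proves
// the value is absent can be skipped without being decoded.
spark.read
  .option("parquet.filter.bloom.enabled", "true") // assumed read option; true by default in parquet-mr
  .parquet("users_with_options.parquet")
  .filter("favorite_color = 'red'")
  .show()
{% endhighlight %}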

### Run SQL on files directly

Instead of using read API to load a file into DataFrame and query it, you can also query that
8 changes: 8 additions & 0 deletions examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -195,6 +195,14 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
// $example off:manual_save_options_orc$
// $example on:manual_save_options_parquet$
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
// $example off:manual_save_options_parquet$
// $example on:direct_sql$
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
10 changes: 10 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -126,6 +126,16 @@ def basic_datasource_example(spark):
.save("users_with_options.orc"))
# $example off:manual_save_options_orc$

# $example on:manual_save_options_parquet$
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
# $example off:manual_save_options_parquet$

# $example on:write_sorting_and_bucketing$
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
# $example off:write_sorting_and_bucketing$
5 changes: 5 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -157,6 +157,11 @@ df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
# $example off:manual_save_options_orc$

# $example on:manual_save_options_parquet$
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet",
              "parquet.bloom.filter.enabled#favorite_color" = TRUE,
              "parquet.bloom.filter.expected.ndv#favorite_color" = 1000000,
              "parquet.enable.dictionary" = TRUE,
              "parquet.page.write-checksum.enabled" = FALSE)
# $example off:manual_save_options_parquet$

# $example on:direct_sql$
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
# $example off:direct_sql$
8 changes: 8 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -129,6 +129,14 @@ object SQLDataSourceExample {
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
// $example off:manual_save_options_orc$
// $example on:manual_save_options_parquet$
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
// $example off:manual_save_options_parquet$

// $example on:direct_sql$
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
29 changes: 29 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -1634,6 +1634,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
      }
    }
  }

  test("SPARK-34562: Bloom filter push down") {
    withTempPath { dir =>
      val path = dir.getCanonicalPath
      spark.range(100).selectExpr("id * 2 AS id")
        .write
        .option(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#id", true)
        // Disable dictionary encoding: the number of distinct values is below 40000,
        // so the column would be fully dictionary-encoded and parquet-mr would not
        // write a Bloom filter for it.
        .option(ParquetOutputFormat.ENABLE_DICTIONARY, false)
        .parquet(path)

      Seq(true, false).foreach { bloomFilterEnabled =>
        withSQLConf(ParquetInputFormat.BLOOM_FILTERING_ENABLED -> bloomFilterEnabled.toString) {
          val accu = new NumRowGroupsAcc
          sparkContext.register(accu)

          // Only even ids were written, so `id = 19` matches nothing; a working
          // Bloom filter lets the reader skip every row group.
          val df = spark.read.parquet(path).filter("id = 19")
          df.foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
          if (bloomFilterEnabled) {
            assert(accu.value === 0)
          } else {
            assert(accu.value > 0)
          }

          AccumulatorContext.remove(accu.id)
        }
      }
    }
  }

@ExtendedSQLTest
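The test relies on `NumRowGroupsAcc`, which is defined elsewhere in the Parquet test sources and not shown in this diff. For context, a hedged sketch of such an accumulator (the real implementation may differ in detail): Spark's vectorized Parquet reader adds the number of row groups it actually reads to a registered accumulator of this type, so a final value of 0 means every row group was skipped.

// A sketch, not the committed implementation.
import org.apache.spark.util.AccumulatorV2

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var _sum = 0

  override def isZero: Boolean = _sum == 0

  override def copy(): AccumulatorV2[Integer, Integer] = {
    val acc = new NumRowGroupsAcc
    acc._sum = _sum
    acc
  }

  override def reset(): Unit = _sum = 0

  // The Parquet reader calls add() with the row-group count of each file it opens;
  // the test's per-row add(0) calls only force the scan and leave the sum unchanged.
  override def add(v: Integer): Unit = _sum += v

  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = _sum += other.value

  override def value: Integer = _sum
}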
