From 11e4ae8ef2f70bc36e6d7203d28515ff9d406220 Mon Sep 17 00:00:00 2001 From: Bowen Song Date: Tue, 16 Aug 2022 17:42:37 +0800 Subject: [PATCH] [SPARK-32268][SQL] Row-level Runtime Filtering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [SPARK-32268][SQL] Row-level Runtime Filtering This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join and aggregate, and hence improve performance. We propose two mechanisms to do this: semi-join filters or bloom filters, and both mechanisms are proposed to co-exist side-by-side behind feature configs. [Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details. With Semi-Join, we see 9 queries improve for the TPC DS 3TB benchmark, and no regressions. With Bloom Filter, we see 10 queries improve for the TPC DS 3TB benchmark, and no regressions. No Added tests Closes #35789 from somani/rf. Lead-authored-by: Abhishek Somani Co-authored-by: Abhishek Somani Co-authored-by: Yuming Wang Signed-off-by: Wenchen Fan (cherry picked from commit 1f4e4c812a9dc6d7e35631c1663c1ba6f6d9b721) Signed-off-by: Wenchen Fan * [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode `Test that might_contain errors out non-constant Bloom filter` in `BloomFilterAggregateQuerySuite ` failed in ansi mode due to `Numeric <=> Binary` is [not allowed in ansi mode](https://github.com/apache/spark/pull/30260),  so the content of  `exception.getMessage` is different from that of non-ans mode. This pr change the case to ensure that the error messages of `ansi` mode and `non-ansi` are consistent. Bug fix. No - Pass GA - Local Test **Before** ``` export SPARK_ANSI_SQL_MODE=false mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite ``` ``` Run completed in 23 seconds, 537 milliseconds. Total number of tests run: 8 Suites: completed 2, aborted 0 Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` ``` export SPARK_ANSI_SQL_MODE=true mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite ``` ``` - Test that might_contain errors out non-constant Bloom filter *** FAILED ***   "cannot resolve 'CAST(t.a AS BINARY)' due to data type mismatch:    cannot cast bigint to binary with ANSI mode on.    If you have to cast bigint to binary, you can set spark.sql.ansi.enabled as false.   ; line 2 pos 21;   'Project [unresolvedalias('might_contain(cast(a#2424L as binary), cast(5 as bigint)), None)]   +- SubqueryAlias t      +- LocalRelation [a#2424L]   " did not contain "The Bloom filter binary input to might_contain should be either a constant value or a scalar subquery expression" (BloomFilterAggregateQuerySuite.scala:171) ``` **After** ``` export SPARK_ANSI_SQL_MODE=false mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite ``` ``` Run completed in 26 seconds, 544 milliseconds. Total number of tests run: 8 Suites: completed 2, aborted 0 Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` ``` export SPARK_ANSI_SQL_MODE=true mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite ``` ``` Run completed in 25 seconds, 289 milliseconds. Total number of tests run: 8 Suites: completed 2, aborted 0 Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0 All tests passed. ``` Closes #35953 from LuciferYang/SPARK-32268-FOLLOWUP. Authored-by: yangjie01 Signed-off-by: Yuming Wang (cherry picked from commit 716512364468cef3c12a85403661de2837cc6fe5) Signed-off-by: Yuming Wang * [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`. It seems if the runtime use in-subquery to do the filter, it won't be converted to semi-join as the design said. This pr fixes the issue. No, not released Improve the test by adding: ensure the semi-join exists if the runtime filter use in-subquery code path. Closes #35998 from ulysses-you/SPARK-32268-FOllOWUP. Authored-by: ulysses-you Signed-off-by: Wenchen Fan (cherry picked from commit c0c52dd2eb06e9cd315bc5b9ff95763c4f61ca89) Signed-off-by: Wenchen Fan * [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BoomFilter creation query. It seems BloomFilter subqueries injected by `InjectRuntimeFilter` will read as many columns as filterCreationSidePlan. This does not match "Only scan the required columns" as the design said. We can check this by a simple case in `InjectRuntimeFilterSuite`: ```scala withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",   SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",   SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {   val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62"   sql(query).explain() } ``` The reason is subqueries have not been optimized by `ColumnPruning`, and this pr will fix it. No, not released Improve the test by adding `columnPruningTakesEffect` to check the optimizedPlan of bloom filter join. Closes #36047 from Flyangz/SPARK-32268-FOllOWUP. Authored-by: Yang Liu Signed-off-by: Yuming Wang (cherry picked from commit c98725a2b9574ba3c9a10567af740db7467df59d) Signed-off-by: Yuming Wang * [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession This PR proposes: 1. Use the function registry in the Spark Session being used 2. Move function registration into `beforeAll` Registration of the function without `beforeAll` at `builtin` can affect other tests. See also https://lists.apache.org/thread/jp0ccqv10ht716g9xldm2ohdv3mpmmz1. No, test-only. Unittests fixed. Closes #36576 from HyukjinKwon/SPARK-32268-followup. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon (cherry picked from commit c5351f85dec628a5c806893aa66777cbd77a4d65) Signed-off-by: Hyukjin Kwon