
SPARK-1612: Fix potential resource leaks #535

Closed · zsxwing wants to merge 2 commits from zsxwing/SPARK-1612

Conversation

@zsxwing (Member) commented Apr 24, 2014

JIRA: https://issues.apache.org/jira/browse/SPARK-1612

Move the "close" statements into a "finally" block.

@AmplabJenkins

Can one of the admins verify this patch?

@srowen (Member) commented Apr 24, 2014

That's a good change. As an aside, the guts of the first method could be a call to Guava's ByteStreams.copy(). And looking at invocations of this method, they're mostly just copying a stream to a byte[], which Guava also does directly. The other usage also has a direct analog in Guava.

In fact a number of methods in the Utils class are covered in Guava, which does manage to get a lot of the weird details completely right.

Probably a different topic but also a way to avoid this particular issue.
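
For reference, a sketch of the Guava calls being suggested (assumes Guava on the classpath; note that ByteStreams.copy does not close either stream, so the finally block is still needed):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.google.common.io.ByteStreams

val in = new ByteArrayInputStream("hello".getBytes("UTF-8"))
val out = new ByteArrayOutputStream()
try {
  ByteStreams.copy(in, out)  // replaces the hand-written copy loop
} finally {
  in.close()
  out.close()
}
// Copying a whole stream to a byte[] is also a one-liner:
val bytes = ByteStreams.toByteArray(new ByteArrayInputStream("world".getBytes("UTF-8")))
```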

@zsxwing (Member, Author) commented Apr 25, 2014

Vote for using Guava directly.

@ash211 (Contributor) commented Apr 27, 2014

+1 to using Guava

pwendell pushed a commit to pwendell/spark that referenced this pull request May 12, 2014
Fixed typo in scaladoc

Author: Stevo Slavić <[email protected]>

== Merge branch commits ==

commit 0a77f789e281930f4168543cc0d3b3ffbf5b3764
Author: Stevo Slavić <[email protected]>
Date:   Tue Feb 4 15:30:27 2014 +0100

    Fixed typo in scaladoc
@mateiz (Contributor) commented Aug 1, 2014

Jenkins, test this please

@mateiz (Contributor) commented Aug 1, 2014

I'm going to merge this as-is because it's a useful fix for 1.1

@SparkQA commented Aug 1, 2014

QA tests have started for PR 535. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17691/consoleFull

@SparkQA commented Aug 1, 2014

QA results for PR 535:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17691/consoleFull

@mateiz (Contributor) commented Aug 1, 2014

Thanks, merged this in.

@asfgit asfgit closed this in f5d9bea Aug 1, 2014
@zsxwing zsxwing deleted the SPARK-1612 branch August 2, 2014 14:52
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
JIRA: https://issues.apache.org/jira/browse/SPARK-1612

Move the "close" statements into a "finally" block.

Author: zsxwing <[email protected]>

Closes apache#535 from zsxwing/SPARK-1612 and squashes the following commits:

ae52f50 [zsxwing] Update to follow the code style
549ba13 [zsxwing] SPARK-1612: Fix potential resource leaks
andrewor14 pushed a commit to andrewor14/spark that referenced this pull request Jan 8, 2015
Fixed typo in scaladoc

Author: Stevo Slavić <[email protected]>

== Merge branch commits ==

commit 0a77f789e281930f4168543cc0d3b3ffbf5b3764
Author: Stevo Slavić <[email protected]>
Date:   Tue Feb 4 15:30:27 2014 +0100

    Fixed typo in scaladoc

(cherry picked from commit 0c05cd3)
Signed-off-by: Reynold Xin <[email protected]>
whatlulumomo pushed a commit to whatlulumomo/spark_src that referenced this pull request Jul 9, 2019
…pache#535)

* Propose a new NIO transfer API for partition writing.

This solves the consistency and resource-leakage concerns with the first iteration of this API, where it
would not be obvious that the streamable resources created by ShufflePartitionWriter needed to be closed by
ShufflePartitionWriter#close as opposed to closing the resources directly.

This introduces the following adjustments:

- Channel-based writes are separated out to their own module, SupportsTransferTo. This allows the transfer-to
  APIs to be modified independently, and users that only provide output streams can ignore the NIO APIs entirely.
  This also allows us to mark the base ShufflePartitionWriter as a stable API eventually while keeping the NIO
  APIs marked as experimental or developer-api.

- We add APIs that explicitly encode the notion of transferring bytes from one source to another. The partition
  writer returns an instance of TransferrableWritableByteChannel, which has APIs for accepting a
  TransferrableReadableByteChannel and can tell the readable byte channel to transfer its bytes out to some
  destination sink.

- The resources returned by ShufflePartitionWriter are always closed. Internally, DefaultMapOutputWriter keeps
  resources open until commitAllPartitions() is called.

* Migrate unsafe shuffle writer to use new byte channel API.

* More sane implementation for unsafe

* Fix style

* Address comments

* Fix imports

* Fix build

* Fix more build problems

* Address comments.
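
Going only by the names in the message above, the proposed shape might look roughly like the following sketch; every trait and signature here is a guess at the fork's API, not the actual code:

```scala
import java.io.OutputStream

// Stream-based writer that all implementations provide.
trait ShufflePartitionWriter extends AutoCloseable {
  def openStream(): OutputStream
}

// Opt-in NIO module, so the transfer-to APIs can evolve independently of
// the (eventually stable) base writer.
trait SupportsTransferTo extends ShufflePartitionWriter {
  def openTransferrableChannel(): TransferrableWritableByteChannel
}

// A readable source that knows how to push its bytes to a destination sink.
trait TransferrableReadableByteChannel extends AutoCloseable {
  def transferTo(sink: TransferrableWritableByteChannel, numBytes: Long): Unit
}

// A writable destination that accepts bytes from such a source.
trait TransferrableWritableByteChannel extends AutoCloseable {
  def acceptTransfer(source: TransferrableReadableByteChannel, numBytes: Long): Unit
}
```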
squito pushed a commit to squito/spark that referenced this pull request Jul 18, 2019
…pache#535)

arjunshroff pushed a commit to arjunshroff/spark that referenced this pull request Nov 24, 2020
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Dec 16, 2022
[SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)

### What changes were proposed in this pull request?
When building the V2 `In` Predicate in `V2ExpressionBuilder`, `InSet.dataType` (which is `BooleanType`) is used to build the `LiteralValue`; `InSet.child.dataType` should be used instead.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
new test

Closes apache#37271 from huaxingao/inset.

Authored-by: huaxingao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: huaxingao <[email protected]>
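
For illustration (a constructed example, not code from the patch): `LiteralValue(value, dataType)` is Spark's V2 literal, and for an IN-list over an int column the literals must carry the column's type rather than the predicate's `BooleanType` result:

```scala
import org.apache.spark.sql.connector.expressions.LiteralValue
import org.apache.spark.sql.types.{BooleanType, IntegerType}

// Literal for `intCol IN (1, ...)`:
val buggy = LiteralValue(1, BooleanType)   // typed from InSet.dataType
val fixed = LiteralValue(1, IntegerType)   // typed from InSet.child.dataType
```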
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Dec 16, 2022
[SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Apr 7, 2023
[SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Apr 7, 2023
* [SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)

### What changes were proposed in this pull request?
When building the V2 `In` Predicate in `V2ExpressionBuilder`, `InSet.dataType` (which is `BooleanType`) is used to build the `LiteralValue`; `InSet.child.dataType` should be used instead.

### Why are the changes needed?
bug fix

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
new test

Closes apache#37271 from huaxingao/inset.

Authored-by: huaxingao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: huaxingao <[email protected]>

* [SPARK-32268][SQL] Row-level Runtime Filtering

* [SPARK-32268][SQL] Row-level Runtime Filtering

This PR proposes row-level runtime filters in Spark to reduce intermediate data volume for operators like shuffle, join and aggregate, and hence improve performance. We propose two mechanisms to do this: semi-join filters or bloom filters, and both mechanisms are proposed to co-exist side-by-side behind feature configs.
[Design Doc](https://docs.google.com/document/d/16IEuyLeQlubQkH8YuVuXWKo2-grVIoDJqQpHZrE7q04/edit?usp=sharing) with more details.

With Semi-Join, we see 9 queries improve for the TPC DS 3TB benchmark, and no regressions.
With Bloom Filter, we see 10 queries improve for the TPC DS 3TB benchmark, and no regressions.

No

Added tests

Closes apache#35789 from somani/rf.

Lead-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Abhishek Somani <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 1f4e4c8)
Signed-off-by: Wenchen Fan <[email protected]>
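
As context for the feature configs mentioned above, here is a minimal sketch of enabling the two mechanisms from a `spark` session; the bloom-filter key matches `SQLConf.RUNTIME_BLOOM_FILTER_ENABLED` referenced in the tests below, while the semi-join key is an assumption to verify against your Spark version:

```scala
// Assumes an active SparkSession named `spark`.
// Bloom-filter based runtime filtering (SQLConf.RUNTIME_BLOOM_FILTER_ENABLED).
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
// Semi-join based reduction (key name is an assumption).
spark.conf.set("spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled", "true")
```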

* [SPARK-32268][TESTS][FOLLOWUP] Fix `BloomFilterAggregateQuerySuite` failed in ansi mode

`Test that might_contain errors out non-constant Bloom filter` in `BloomFilterAggregateQuerySuite` failed in ansi mode because a `Numeric <=> Binary` cast is [not allowed in ansi mode](apache#30260), so the content of `exception.getMessage` differs from that of non-ansi mode.

This PR changes the test case to ensure that the error-message assertions are consistent between `ansi` and `non-ansi` modes.

Bug fix.

No

- Pass GA
- Local Test

**Before**

```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 23 seconds, 537 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
- Test that might_contain errors out non-constant Bloom filter *** FAILED ***
  "cannot resolve 'CAST(t.a AS BINARY)' due to data type mismatch:
   cannot cast bigint to binary with ANSI mode on.
   If you have to cast bigint to binary, you can set spark.sql.ansi.enabled as false.
  ; line 2 pos 21;
  'Project [unresolvedalias('might_contain(cast(a#2424L as binary), cast(5 as bigint)), None)]
  +- SubqueryAlias t
     +- LocalRelation [a#2424L]
  " did not contain "The Bloom filter binary input to might_contain should be either a constant value or a scalar subquery expression" (BloomFilterAggregateQuerySuite.scala:171)
```

**After**
```
export SPARK_ANSI_SQL_MODE=false
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 26 seconds, 544 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

```

```
export SPARK_ANSI_SQL_MODE=true
mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.sql.BloomFilterAggregateQuerySuite
```

```
Run completed in 25 seconds, 289 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes apache#35953 from LuciferYang/SPARK-32268-FOLLOWUP.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit 7165123)
Signed-off-by: Yuming Wang <[email protected]>

* [SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the InjectRuntimeFilter

Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

It seems that if the runtime filter uses an in-subquery to do the filtering, it won't be converted to a semi-join as the design said.

This PR fixes the issue.

No, not released

Improve the test by adding a check that the semi-join exists when the runtime filter takes the in-subquery code path.

Closes apache#35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c0c52dd)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-32268][SQL][FOLLOWUP] Add ColumnPruning in injectBloomFilter

Add `ColumnPruning` in `InjectRuntimeFilter.injectBloomFilter` to optimize the BloomFilter creation query.

It seems the BloomFilter subqueries injected by `InjectRuntimeFilter` read as many columns as `filterCreationSidePlan`. This does not match the design's "Only scan the required columns". We can check this with a simple case in `InjectRuntimeFilterSuite`:
```scala
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_ENABLED.key -> "true",
  SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000",
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "2000") {
  val query = "select * from bf1 join bf2 on bf1.c1 = bf2.c2 where bf2.a2 = 62"
  sql(query).explain()
}
```
The reason is that the subqueries have not been optimized by `ColumnPruning`; this PR fixes it.

No, not released

Improve the test by adding `columnPruningTakesEffect` to check the optimizedPlan of bloom filter join.

Closes apache#36047 from Flyangz/SPARK-32268-FOllOWUP.

Authored-by: Yang Liu <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit c98725a)
Signed-off-by: Yuming Wang <[email protected]>

* [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

This PR proposes:
1. Use the function registry in the Spark Session being used
2. Move function registration into `beforeAll`

Registering the function into the `builtin` registry outside `beforeAll` can affect other tests. See also https://lists.apache.org/thread/jp0ccqv10ht716g9xldm2ohdv3mpmmz1.

No, test-only.

Unittests fixed.

Closes apache#36576 from HyukjinKwon/SPARK-32268-followup.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit c5351f8)
Signed-off-by: Hyukjin Kwon <[email protected]>

* KE-29673 add segment prune function for bloom runtime filter

fix min/max for UTF8String collection

validate the runtime filter if needed when broadcast join is valid

* AL-6084: in Cast's canCast method, add logic to allow DecimalType to cast to DoubleType (apache#542)

* AL-6084: in Cast's canCast method, add suitable logic for casting DecimalType to DoubleType

Signed-off-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Zhixiong Chen <[email protected]>
Co-authored-by: huaxingao <[email protected]>
Co-authored-by: Bowen Song <[email protected]>
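
A speculative sketch of the canCast change the AL-6084 bullets describe; the helper name is invented, and the real change would live inside Cast.canCast:

```scala
import org.apache.spark.sql.types._

// Hypothetical: report whether the newly-allowed Decimal -> Double cast applies.
def allowsDecimalToDouble(from: DataType, to: DataType): Boolean =
  (from, to) match {
    case (_: DecimalType, DoubleType) => true // the added case
    case _ => false                           // everything else: existing rules
  }
```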
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Dec 8, 2023
[SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)
RolatZhang pushed a commit to RolatZhang/spark that referenced this pull request Dec 8, 2023
[SPARK-39857][SQL] V2ExpressionBuilder uses the wrong LiteralValue data type for In predicate (apache#535)