Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge upstream #64

Merged
merged 201 commits into from
Nov 21, 2016
Merged

merge upstream #64

merged 201 commits into from
Nov 21, 2016

Conversation

robert3005
Copy link

merge upstream

fixes #63

zhengruifeng and others added 30 commits November 1, 2016 10:46
## What changes were proposed in this pull request?

1, move cast to `Predictor`
2, and then, remove unnecessary cast
## How was this patch tested?

existing tests

Author: Zheng RuiFeng <[email protected]>

Closes apache#15414 from zhengruifeng/move_cast.
…to `MetadataLogFileIndex`

## What changes were proposed in this pull request?

This is a follow-up to apache#15634.

## How was this patch tested?

N/A

Author: Liwei Lin <[email protected]>

Closes apache#15712 from lw-lin/18103.
## What changes were proposed in this pull request?
Likewise [DataSet.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156) KeyValueGroupedDataset should mark the queryExecution as transient.

As mentioned in the Jira ticket, without transient we saw serialization issues like

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value 1 }
)
```

Author: Ergin Seyfe <[email protected]>

Closes apache#15706 from seyfe/keyvaluegrouped_serialization.
…indow/GroupBy

## What changes were proposed in this pull request?

Aggregation Without Window/GroupBy expressions will fail in `checkAnalysis`, the error message is a bit misleading, we should generate a more specific error message for this case.

For example,

```
spark.read.load("/some-data")
  .withColumn("date_dt", to_date($"date"))
  .withColumn("year", year($"date_dt"))
  .withColumn("week", weekofyear($"date_dt"))
  .withColumn("user_count", count($"userId"))
  .withColumn("daily_max_in_week", max($"user_count").over(weeklyWindow))
)
```

creates the following output:

```
org.apache.spark.sql.AnalysisException: expression '`randomColumn`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
```

In the error message above, `randomColumn` doesn't appear in the query(acturally it's added by function `withColumn`), so the message is not enough for the user to address the problem.
## How was this patch tested?

Manually test

Before:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: expression 'tbl.`col`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
```

After:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and 'tbl.`col`' is not an aggregate function. Wrap '(count(col#231L) AS count(col)#239L)' in windowing function(s) or wrap 'tbl.`col`' in first() (or first_value) if you don't care which value you get.;;
```

Also add new test sqls in `group-by.sql`.

Author: jiangxingbo <[email protected]>

Closes apache#15672 from jiangxb1987/groupBy-empty.
We now know it's a persistent environmental issue that is causing this test to sometimes fail. One hypothesis is that some configuration is leaked from another suite, and depending on suite ordering this can cause this test to fail.

I am planning on mining the jenkins logs to try to narrow down which suite could be causing this. For now, disable the test.

Author: Eric Liang <[email protected]>

Closes apache#15720 from ericl/disable-flaky-test.
…column to JSON string

## What changes were proposed in this pull request?

This PR proposes to add `to_json` function in contrast with `from_json` in Scala, Java and Python.

It'd be useful if we can convert a same column from/to json. Also, some datasources do not support nested types. If we are forced to save a dataframe into those data sources, we might be able to work around by this function.

The usage is as below:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```
## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon <[email protected]>

Closes apache#15354 from HyukjinKwon/SPARK-17764.
…rver

In SPARK-4761 / apache#3621 (December 2014) we enabled Kryo serialization by default in the Spark Thrift Server. However, I don't think that the original rationale for doing this still holds now that most Spark SQL serialization is now performed via encoders and our UnsafeRow format.

In addition, the use of Kryo as the default serializer can introduce performance problems because the creation of new KryoSerializer instances is expensive and we haven't performed instance-reuse optimizations in several code paths (including DirectTaskResult deserialization).

Given all of this, I propose to revert back to using JavaSerializer as the default serializer in the Thrift Server.

/cc liancheng

Author: Josh Rosen <[email protected]>

Closes apache#14906 from JoshRosen/disable-kryo-in-thriftserver.
…ring iterator

The `ReplayListenerBus.read()` method is used when implementing a custom `ApplicationHistoryProvider`. The current interface only exposes a `read()` method which takes an `InputStream` and performs stream-to-lines conversion itself, but it would also be useful to expose an overloaded method which accepts an iterator of strings, thereby enabling events to be provided from non-`InputStream` sources.

Author: Josh Rosen <[email protected]>

Closes apache#15698 from JoshRosen/replay-listener-bus-interface.
## What changes were proposed in this pull request?
- Renamed kbest to numTopFeatures
- Renamed alpha to fpr
- Added missing Since annotations
- Doc cleanups
## How was this patch tested?

Added new standardized unit tests for spark.ml.
Improved existing unit test coverage a bit.

Author: Joseph K. Bradley <[email protected]>

Closes apache#15647 from jkbradley/chisqselector-follow-ups.
## What changes were proposed in this pull request?
This patch adds a new commit protocol implementation ManifestFileCommitProtocol that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.

This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <[email protected]>

Closes apache#15710 from rxin/SPARK-18025.
## What changes were proposed in this pull request?
Column.expr is private[sql], but it's an actually really useful field to have for debugging. We should open it up, similar to how we use QueryExecution.

## How was this patch tested?
N/A - this is a simple visibility change.

Author: Reynold Xin <[email protected]>

Closes apache#15724 from rxin/SPARK-18216.
…tted R friendly message from JVM exception message

## What changes were proposed in this pull request?

This PR proposes to
- improve the R-friendly error messages rather than raw JVM exception one.

  As `read.json`, `read.text`, `read.orc`, `read.parquet` and `read.jdbc` are executed in the same  path with `read.df`, and `write.json`, `write.text`, `write.orc`, `write.parquet` and `write.jdbc` shares the same path with `write.df`, it seems it is safe to call `handledCallJMethod` to handle
  JVM messages.
-  prevent `zero-length variable name` and prints the ignored options as an warning message.

**Before**

``` r
> read.json("path", a = 1, 2, 3, "a")
Error in env[[name]] <- value :
  zero-length variable name
```

``` r
> read.json("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.orc("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.text("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.parquet("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...
```

``` r
> write.json(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.orc(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.text(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.parquet(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)
```

**After**

``` r
read.json("arbitrary_path", a = 1, 2, 3, "a")
Unnamed arguments ignored: 2, 3, a.
```

``` r
> read.json("arbitrary_path")
Error in json : analysis error - Path does not exist: file:/...

> read.orc("arbitrary_path")
Error in orc : analysis error - Path does not exist: file:/...

> read.text("arbitrary_path")
Error in text : analysis error - Path does not exist: file:/...

> read.parquet("arbitrary_path")
Error in parquet : analysis error - Path does not exist: file:/...
```

``` r
> write.json(df, "existing_path")
Error in json : analysis error - path file:/... already exists.;

> write.orc(df, "existing_path")
Error in orc : analysis error - path file:/... already exists.;

> write.text(df, "existing_path")
Error in text : analysis error - path file:/... already exists.;

> write.parquet(df, "existing_path")
Error in parquet : analysis error - path file:/... already exists.;
```
## How was this patch tested?

Unit tests in `test_utils.R` and `test_sparkSQL.R`.

Author: hyukjinkwon <[email protected]>

Closes apache#15608 from HyukjinKwon/SPARK-17838.
…ws a metastore exception when attempting to fetch partitions by filter

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)
## What changes were proposed in this pull request?

We recently added table partition pruning for partitioned Hive tables converted to using `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column will throw a `org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a Postgresql metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column will fail.

Rather than throw an exception in query planning, this patch catches this exception, logs a warning and returns all table partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.
## How was this patch tested?

A unit test was added.

Author: Michael Allman <[email protected]>

Closes apache#15673 from mallman/spark-17992-catch_hive_partition_filter_exception.
…se checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves hidden HDFS-style checksum (CRC) files in the log directory, one file per batch. This PR modifies HDFSMetadataLog so that it detects the use of a filesystem that doesn't use CRC files and removes the CRC files.
## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem.  Ran the entire regression suite.

Author: frreiss <[email protected]>

Closes apache#15027 from frreiss/fred-17475.
…ITION for Datasource tables

## What changes were proposed in this pull request?

There are a couple issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged.

There is one other issue in that INSERT OVERWRITE with dynamic partitions will overwrite the entire table instead of just the updated partitions, but this behavior is pretty complicated to implement for Datasource tables. We should address that in a future release.

## How was this patch tested?

Unit tests.

Author: Eric Liang <[email protected]>

Closes apache#15705 from ericl/sc-4942.
## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. This is actually a very small change thanks to all the previous refactoring done using the new internal commit protocol API.

## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin <[email protected]>

Closes apache#15711 from rxin/SPARK-18192.
## What changes were proposed in this pull request?

The PR fixes the bug that the QueryStartedEvent is not logged

the postToAll() in the original code is actually calling StreamingQueryListenerBus.postToAll() which has no listener at all....we shall post by sparkListenerBus.postToAll(s) and this.postToAll() to trigger local listeners as well as the listeners registered in LiveListenerBus

zsxwing
## How was this patch tested?

The following snapshot shows that QueryStartedEvent has been logged correctly

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat <[email protected]>

Closes apache#15675 from CodingCat/SPARK-18144.
## What changes were proposed in this pull request?

This adds information to the web UI thread dump page about the JVM locks
held by threads and the locks that threads are blocked waiting to
acquire. This should help find cases where lock contention is causing
Spark applications to run slowly.
## How was this patch tested?

Tested by applying this patch and viewing the change in the web UI.

![thread-lock-info](https://cloud.githubusercontent.com/assets/87915/18493057/6e5da870-79c3-11e6-8c20-f54c18a37544.png)

Additions:
- A "Thread Locking" column with the locks held by the thread or that are blocking the thread
- Links from the a blocked thread to the thread holding the lock
- Stack frames show where threads are inside `synchronized` blocks, "holding Monitor(...)"

Author: Ryan Blue <[email protected]>

Closes apache#15088 from rdblue/SPARK-17532-add-thread-lock-info.
## What changes were proposed in this pull request?

If my understanding is correct we should be rather looking at closed disk than the opened one.
## How was this patch tested?

Run simple comparison, of the mean squared error of approaches with closed and opened disk.
https://gist.github.com/mrydzy/1cf0e5c316ef9d6fbd91426b91f1969f
The closed one performed slightly better, but the tested sample wasn't too big, so I rely mostly on the algorithm  understanding.

Author: Maria Rydzy <[email protected]>

Closes apache#15687 from mrydzy/master.
## What changes were proposed in this pull request?

This patch uses `{% highlight lang %}...{% endhighlight %}` to highlight code snippets in the `Structured Streaming Kafka010 integration doc` and the `Spark Streaming Kafka010 integration doc`.

This patch consists of two commits:
- the first commit fixes only the leading spaces -- this is large
- the second commit adds the highlight instructions -- this is much simpler and easier to review

## How was this patch tested?

SKIP_API=1 jekyll build

## Screenshots

**Before**

![snip20161101_3](https://cloud.githubusercontent.com/assets/15843379/19894258/47746524-a087-11e6-9a2a-7bff2d428d44.png)

**After**

![snip20161101_1](https://cloud.githubusercontent.com/assets/15843379/19894324/8bebcd1e-a087-11e6-835b-88c4d2979cfa.png)

Author: Liwei Lin <[email protected]>

Closes apache#15715 from lw-lin/doc-highlight-code-snippet.
## What changes were proposed in this pull request?

Removing `appUIAddress` attribute since it is no longer in use.
## How was this patch tested?

Local build

Author: Jacek Laskowski <[email protected]>

Closes apache#15603 from jaceklaskowski/sparkui-fixes.
…rFormat to Locale.US

## What changes were proposed in this pull request?

Fix `Locale.US` for all usages of `DateFormat`, `NumberFormat`
## How was this patch tested?

Existing tests.

Author: Sean Owen <[email protected]>

Closes apache#15610 from srowen/SPARK-18076.
## What changes were proposed in this pull request?

Simplify struct creation, especially the aspect of `CleanupAliases` which missed some aliases when handling trees created by `CreateStruct`.

This PR includes:

1. A failing test (create struct with nested aliases, some of the aliases survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with `CreateStruct`'s top level columns.

## How was this patch tested?
Running all tests-suits in package org.apache.spark.sql, especially including the analysis suite, making sure added test initially fails, after applying suggested fix rerun the entire analysis package successfully.

Modified few tests that expected `CreateStruct` which is now transformed into `CreateNamedStruct`.

Author: eyal farago <eyal farago>
Author: Herman van Hovell <[email protected]>
Author: eyal farago <[email protected]>
Author: Eyal Farago <[email protected]>
Author: Hyukjin Kwon <[email protected]>
Author: eyalfa <[email protected]>

Closes apache#15718 from hvanhovell/SPARK-16839-2.
## What changes were proposed in this pull request?

This pr is to add pattern-matching entries for array data in `Literal.apply`.
## How was this patch tested?

Added tests in `LiteralExpressionSuite`.

Author: Takeshi YAMAMURO <[email protected]>

Closes apache#15257 from maropu/SPARK-17683.
## What changes were proposed in this pull request?

Copied description for row and range based frame boundary from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala#L56

Added examples to show different behavior of rangeBetween and rowsBetween when involving duplicate values.

Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request.

Author: buzhihuojie <[email protected]>

Closes apache#15727 from david-weiluo-ren/improveDocForRangeAndRowsBetween.
…ouldn't change after coalesce or union

## What changes were proposed in this pull request?

When a user appended a column using a "nondeterministic" function to a DataFrame, e.g., `rand`, `randn`, and `monotonically_increasing_id`, the expected semantic is the following:
- The value in each row should remain unchanged, as if we materialize the column immediately, regardless of later DataFrame operations.

However, since we use `TaskContext.getPartitionId` to get the partition index from the current thread, the values from nondeterministic columns might change if we call `union` or `coalesce` after. `TaskContext.getPartitionId` returns the partition index of the current Spark task, which might not be the corresponding partition index of the DataFrame where we defined the column.

See the unit tests below or JIRA for examples.

This PR uses the partition index from `RDD.mapPartitionWithIndex` instead of `TaskContext` and fixes the partition initialization logic in whole-stage codegen, normal codegen, and codegen fallback. `initializeStatesForPartition(partitionIndex: Int)` was added to `Projection`, `Nondeterministic`, and `Predicate` (codegen) and initialized right after object creation in `mapPartitionWithIndex`. `newPredicate` now returns a `Predicate` instance rather than a function for proper initialization.
## How was this patch tested?

Unit tests. (Actually I'm not very confident that this PR fixed all issues without introducing new ones ...)

cc: rxin davies

Author: Xiangrui Meng <[email protected]>

Closes apache#15567 from mengxr/SPARK-14393.
…ed to driver in yarn mode

## What changes were proposed in this pull request?

spark.files is still passed to driver in yarn mode, so SparkContext will still handle it which cause the error in the jira desc.

## How was this patch tested?

Tested manually in a 5 node cluster. As this issue only happens in multiple node cluster, so I didn't write test for it.

Author: Jeff Zhang <[email protected]>

Closes apache#15669 from zjffdu/SPARK-18160.
…/test against staging artifacts

## What changes were proposed in this pull request?

Adds a `snapshots-and-staging profile` so that  RCs of projects like Hadoop and HBase can be used in developer-only build and test runs. There's a comment above the profile telling people not to use this in production.

There's no attempt to do the same for SBT, as Ivy is different.
## How was this patch tested?

Tested by building against the Hadoop 2.7.3 RC 1 JARs

without the profile (and without any local copy of the 2.7.3 artifacts), the build failed

```
mvn install -DskipTests -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.7.3

...

[INFO] ------------------------------------------------------------------------
[INFO] Building Spark Project Launcher 2.1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.pom
[WARNING] The POM for org.apache.hadoop:hadoop-client:jar:2.7.3 is missing, no dependency information available
Downloading: https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/2.7.3/hadoop-client-2.7.3.jar
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  4.482 s]
[INFO] Spark Project Tags ................................. SUCCESS [ 17.402 s]
[INFO] Spark Project Sketch ............................... SUCCESS [ 11.252 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 13.458 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  9.043 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 16.027 s]
[INFO] Spark Project Launcher ............................. FAILURE [  1.653 s]
[INFO] Spark Project Core ................................. SKIPPED
...
```

With the profile, the build completed

```
mvn install -DskipTests -Pyarn,hadoop-2.7,hive,snapshots-and-staging -Dhadoop.version=2.7.3
```

Author: Steve Loughran <[email protected]>

Closes apache#14646 from steveloughran/stevel/SPARK-17058-support-asf-snapshots.
## What changes were proposed in this pull request?
RuntimeReplaceable is used to create aliases for expressions, but the way it deals with type coercion is pretty weird (each expression is responsible for how to handle type coercion, which does not obey the normal implicit type cast rules).

This patch simplifies its handling by allowing the analyzer to traverse into the actual expression of a RuntimeReplaceable.

## How was this patch tested?
- Correctness should be guaranteed by existing unit tests already
- Removed SQLCompatibilityFunctionSuite and moved it sql-compatibility-functions.sql
- Added a new test case in sql-compatibility-functions.sql for verifying explain behavior.

Author: Reynold Xin <[email protected]>

Closes apache#15723 from rxin/SPARK-18214.
…or hive serde table

## What changes were proposed in this pull request?

Due to a limitation of hive metastore(table location must be directory path, not file path), we always store `path` for data source table in storage properties, instead of the `locationUri` field. However, we should not expose this difference to `CatalogTable` level, but just treat it as a hack in `HiveExternalCatalog`, like we store table schema of data source table in table properties.

This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`, both data source table and hive serde table should use the `locationUri` field.

This PR also unifies the way we handle default table location for managed table. Previously, the default table location of hive serde managed table is set by external catalog, but the one of data source table is set by command. After this PR, we follow the hive way and the default table location is always set by external catalog.

For managed non-file-based tables, we will assign a default table location and create an empty directory for it, the table location will be removed when the table is dropped. This is reasonable as metastore doesn't care about whether a table is file-based or not, and an empty table directory has no harm.
For external non-file-based tables, ideally we can omit the table location, but due to a hive metastore issue, we will assign a random location to it, and remove it right after the table is created. See SPARK-15269 for more details. This is fine as it's well isolated in `HiveExternalCatalog`.

To keep the existing behaviour of the `path` option, in this PR we always add the `locationUri` to storage properties using key `path`, before passing storage properties to `DataSource` as data source options.
## How was this patch tested?

existing tests.

Author: Wenchen Fan <[email protected]>

Closes apache#15024 from cloud-fan/path.
HyukjinKwon and others added 23 commits November 18, 2016 21:45
…aAPISuite

## What changes were proposed in this pull request?

This PR fixes the test `wholeTextFiles` in `JavaAPISuite.java`. This is failed due to the different path format on Windows.

For example, the path in `container` was

```
C:\projects\spark\target\tmp\1478967560189-0/part-00000
```

whereas `new URI(res._1()).getPath()` was as below:

```
/C:/projects/spark/target/tmp/1478967560189-0/part-00000
```

## How was this patch tested?

Tests in `JavaAPISuite.java`.

Tested via AppVeyor.

**Before**
Build: https://ci.appveyor.com/project/spark-test/spark/build/63-JavaAPISuite-1
Diff: apache/spark@master...spark-test:JavaAPISuite-1

```
[info] Test org.apache.spark.JavaAPISuite.wholeTextFiles started
[error] Test org.apache.spark.JavaAPISuite.wholeTextFiles failed: java.lang.AssertionError: expected:<spark is easy to use.
[error] > but was:<null>, took 0.578 sec
[error]     at org.apache.spark.JavaAPISuite.wholeTextFiles(JavaAPISuite.java:1089)
...
```

**After**
Build started: [CORE] `org.apache.spark.JavaAPISuite` [![PR-15866](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=198DDA52-F201-4D2B-BE2F-244E0C1725B2&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/198DDA52-F201-4D2B-BE2F-244E0C1725B2)
Diff: apache/spark@master...spark-test:198DDA52-F201-4D2B-BE2F-244E0C1725B2

```
[info] Test org.apache.spark.JavaAPISuite.wholeTextFiles started
...
```

Author: hyukjinkwon <[email protected]>

Closes apache#15866 from HyukjinKwon/SPARK-18422.
## What changes were proposed in this pull request?

HDFS `write` may just hang until timeout if some network error happens. It's better to enable interrupts to allow stopping the query fast on HDFS.

This PR just changes the logic to only disable interrupts for local file system, as HADOOP-10622 only happens for local file system.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes apache#15911 from zsxwing/interrupt-on-dfs.
## What changes were proposed in this pull request?
I'm spending more time at the design & code level for cost-based optimizer now, and have found a number of issues related to maintainability and compatibility that I will like to address.

This is a small pull request to clean up AnalyzeColumnCommand:

1. Removed warning on duplicated columns. Warnings in log messages are useless since most users that run SQL don't see them.
2. Removed the nested updateStats function, by just inlining the function.
3. Renamed a few functions to better reflect what they do.
4. Removed the factory apply method for ColumnStatStruct. It is a bad pattern to use a apply method that returns an instantiation of a class that is not of the same type (ColumnStatStruct.apply used to return CreateNamedStruct).
5. Renamed ColumnStatStruct to just AnalyzeColumnCommand.
6. Added more documentation explaining some of the non-obvious return types and code blocks.

In follow-up pull requests, I'd like to address the following:

1. Get rid of the Map[String, ColumnStat] map, since internally we should be using Attribute to reference columns, rather than strings.
2. Decouple the fields exposed by ColumnStat and internals of Spark SQL's execution path. Currently the two are coupled because ColumnStat takes in an InternalRow.
3. Correctness: Remove code path that stores statistics in the catalog using the base64 encoding of the UnsafeRow format, which is not stable across Spark versions.
4. Clearly document the data representation stored in the catalog for statistics.

## How was this patch tested?
Affected test cases have been updated.

Author: Reynold Xin <[email protected]>

Closes apache#15933 from rxin/SPARK-18505.
## What changes were proposed in this pull request?

The issue in ForeachSink is the new created DataSet still uses the old QueryExecution. When `foreachPartition` is called, `QueryExecution.toString` will be called and then fail because it doesn't know how to plan EventTimeWatermark.

This PR just replaces the QueryExecution with IncrementalExecution to fix the issue.

## How was this patch tested?

`test("foreach with watermark")`.

Author: Shixiong Zhu <[email protected]>

Closes apache#15934 from zsxwing/SPARK-18497.
…able like JavaSparkContext

## What changes were proposed in this pull request?

Just adds `close()` + `Closeable` as a synonym for `stop()`. This makes it usable in Java in try-with-resources, as suggested by ash211  (`Closeable` extends `AutoCloseable` BTW)

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#15932 from srowen/SPARK-18448.
… that`/`'''Note:'''` across Scala/Java API documentation

## What changes were proposed in this pull request?

It seems in Scala/Java,

- `Note:`
- `NOTE:`
- `Note that`
- `'''Note:'''`
- `note`

This PR proposes to fix those to `note` to be consistent.

**Before**

- Scala
  ![2016-11-17 6 16 39](https://cloud.githubusercontent.com/assets/6477701/20383180/1a7aed8c-acf2-11e6-9611-5eaf6d52c2e0.png)

- Java
  ![2016-11-17 6 14 41](https://cloud.githubusercontent.com/assets/6477701/20383096/c8ffc680-acf1-11e6-914a-33460bf1401d.png)

**After**

- Scala
  ![2016-11-17 6 16 44](https://cloud.githubusercontent.com/assets/6477701/20383167/09940490-acf2-11e6-937a-0d5e1dc2cadf.png)

- Java
  ![2016-11-17 6 13 39](https://cloud.githubusercontent.com/assets/6477701/20383132/e7c2a57e-acf1-11e6-9c47-b849674d4d88.png)

## How was this patch tested?

The notes were found via

```bash
grep -r "NOTE: " . | \ # Note:|NOTE:|Note that|'''Note:'''
grep -v "// NOTE: " | \  # starting with // does not appear in API documentation.
grep -E '.scala|.java' | \ # java/scala files
grep -v Suite | \ # exclude tests
grep -v Test | \ # exclude tests
grep -e 'org.apache.spark.api.java' \ # packages appear in API documenation
-e 'org.apache.spark.api.java.function' \ # note that this is a regular expression. So actual matches were mostly `org/apache/spark/api/java/functions ...`
-e 'org.apache.spark.api.r' \
...
```

```bash
grep -r "Note that " . | \ # Note:|NOTE:|Note that|'''Note:'''
grep -v "// Note that " | \  # starting with // does not appear in API documentation.
grep -E '.scala|.java' | \ # java/scala files
grep -v Suite | \ # exclude tests
grep -v Test | \ # exclude tests
grep -e 'org.apache.spark.api.java' \ # packages appear in API documenation
-e 'org.apache.spark.api.java.function' \
-e 'org.apache.spark.api.r' \
...
```

```bash
grep -r "Note: " . | \ # Note:|NOTE:|Note that|'''Note:'''
grep -v "// Note: " | \  # starting with // does not appear in API documentation.
grep -E '.scala|.java' | \ # java/scala files
grep -v Suite | \ # exclude tests
grep -v Test | \ # exclude tests
grep -e 'org.apache.spark.api.java' \ # packages appear in API documenation
-e 'org.apache.spark.api.java.function' \
-e 'org.apache.spark.api.r' \
...
```

```bash
grep -r "'''Note:'''" . | \ # Note:|NOTE:|Note that|'''Note:'''
grep -v "// '''Note:''' " | \  # starting with // does not appear in API documentation.
grep -E '.scala|.java' | \ # java/scala files
grep -v Suite | \ # exclude tests
grep -v Test | \ # exclude tests
grep -e 'org.apache.spark.api.java' \ # packages appear in API documenation
-e 'org.apache.spark.api.java.function' \
-e 'org.apache.spark.api.r' \
...
```

And then fixed one by one comparing with API documentation/access modifiers.

After that, manually tested via `jekyll build`.

Author: hyukjinkwon <[email protected]>

Closes apache#15889 from HyukjinKwon/SPARK-18437.
## What changes were proposed in this pull request?

Avoid hard-coding spark.rpc.askTimeout to non-default in Client; fix doc about spark.rpc.askTimeout default

## How was this patch tested?

Existing tests

Author: Sean Owen <[email protected]>

Closes apache#15833 from srowen/SPARK-18353.
## What changes were proposed in this pull request?

Fix since 2.1.0 on new SparkSession.close() method. I goofed in apache#15932 because it was back-ported to 2.1 instead of just master as originally planned.

Author: Sean Owen <[email protected]>

Closes apache#15938 from srowen/SPARK-18448.2.
Adds --conf option to set spark configuration properties in mesos dispacther.
Properties provided with --conf take precedence over properties within the properties file.
The reason for this PR is that for simple configuration or testing purposes we need to provide a property file (ideally a shared one for a cluster) even if we just provide a single property.

Manually tested.

Author: Stavros Kontopoulos <[email protected]>
Author: Stavros Kontopoulos <[email protected]>

Closes apache#14650 from skonto/dipatcher_conf.
…n LogisticRegression training

## What changes were proposed in this pull request?

This is a follow up to some of the discussion [here](apache#15593). During LogisticRegression training, we store the coefficients combined with intercepts as a flat vector, but a more natural abstraction is a matrix. Here, we refactor the code to use matrix where possible, which makes the code more readable and greatly simplifies the indexing.

Note: We do not use a Breeze matrix for the cost function as was mentioned in the linked PR. This is because LBFGS/OWLQN require an implicit `MutableInnerProductModule[DenseMatrix[Double], Double]` which is not natively defined in Breeze. We would need to extend Breeze in Spark to define it ourselves. Also, we do not modify the `regParamL1Fun` because OWLQN in Breeze requires a `MutableEnumeratedCoordinateField[(Int, Int), DenseVector[Double]]` (since we still use a dense vector for coefficients). Here again we would have to extend Breeze inside Spark.

## How was this patch tested?

This is internal code refactoring - the current unit tests passing show us that the change did not break anything. No added functionality in this patch.

Author: sethah <[email protected]>

Closes apache#15893 from sethah/logreg_refactor.
…ion in RadixSort.java

## What changes were proposed in this pull request?

This PR avoids that a result of an expression is negative due to signed integer overflow (e.g. 0x10?????? * 8 < 0). This PR casts each operand to `long` before executing a calculation. Since the result is interpreted as long, the result of the expression is positive.

## How was this patch tested?

Manually executed query82 of TPC-DS with 100TB

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#15907 from kiszk/SPARK-18458.
## What changes were proposed in this pull request?
The previous documentation and example for DateDiff was wrong.

## How was this patch tested?
Doc only change.

Author: Reynold Xin <[email protected]>

Closes apache#15937 from rxin/datediff-doc.
## What changes were proposed in this pull request?

This PR adds code generation to `Generate`. It supports two code paths:
- General `TraversableOnce` based iteration. This used for regular `Generator` (code generation supporting) expressions. This code path expects the expression to return a `TraversableOnce[InternalRow]` and it will iterate over the returned collection. This PR adds code generation for the `stack` generator.
- Specialized `ArrayData/MapData` based iteration. This is used for the `explode`, `posexplode` & `inline` functions and operates directly on the `ArrayData`/`MapData` result that the child of the generator returns.

### Benchmarks
I have added some benchmarks and it seems we can create a nice speedup for explode:
#### Environment
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
Intel(R) Core(TM) i7-4980HQ CPU  2.80GHz
```
#### Explode Array
##### Before
```
generate explode array:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate explode array wholestage off         7377 / 7607          2.3         439.7       1.0X
generate explode array wholestage on          6055 / 6086          2.8         360.9       1.2X
```
##### After
```
generate explode array:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate explode array wholestage off         7432 / 7696          2.3         443.0       1.0X
generate explode array wholestage on           631 /  646         26.6          37.6      11.8X
```
#### Explode Map
##### Before
```
generate explode map:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate explode map wholestage off         12792 / 12848          1.3         762.5       1.0X
generate explode map wholestage on          11181 / 11237          1.5         666.5       1.1X
```
##### After
```
generate explode map:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate explode map wholestage off         10949 / 10972          1.5         652.6       1.0X
generate explode map wholestage on             870 /  913         19.3          51.9      12.6X
```
#### Posexplode
##### Before
```
generate posexplode array:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate posexplode array wholestage off      7547 / 7580          2.2         449.8       1.0X
generate posexplode array wholestage on       5786 / 5838          2.9         344.9       1.3X
```
##### After
```
generate posexplode array:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate posexplode array wholestage off      7535 / 7548          2.2         449.1       1.0X
generate posexplode array wholestage on        620 /  624         27.1          37.0      12.1X
```
#### Inline
##### Before
```
generate inline array:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate inline array wholestage off          6935 / 6978          2.4         413.3       1.0X
generate inline array wholestage on           6360 / 6400          2.6         379.1       1.1X
```
##### After
```
generate inline array:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate inline array wholestage off          6940 / 6966          2.4         413.6       1.0X
generate inline array wholestage on           1002 / 1012         16.7          59.7       6.9X
```
#### Stack
##### Before
```
generate stack:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate stack wholestage off               12980 / 13104          1.3         773.7       1.0X
generate stack wholestage on                11566 / 11580          1.5         689.4       1.1X
```
##### After
```
generate stack:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
generate stack wholestage off               12875 / 12949          1.3         767.4       1.0X
generate stack wholestage on                   840 /  845         20.0          50.0      15.3X
```
## How was this patch tested?

Existing tests.

Author: Herman van Hovell <[email protected]>
Author: Herman van Hovell <[email protected]>

Closes apache#13065 from hvanhovell/SPARK-15214.
… tags in javadoc

## What changes were proposed in this pull request?

This PR proposes/fixes two things.

- Remove many errors to generate javadoc with Java8 from unrecognisable tags, `tparam` and `group`.

  ```
  [error] .../spark/mllib/target/java/org/apache/spark/ml/classification/Classifier.java:18: error: unknown tag: group
  [error]   /** group setParam */
  [error]       ^
  [error] .../spark/mllib/target/java/org/apache/spark/ml/classification/Classifier.java:8: error: unknown tag: tparam
  [error]  * tparam FeaturesType  Type of input features.  E.g., <code>Vector</code>
  [error]    ^
  ...
  ```

  It does not fully resolve the problem but remove many errors. It seems both `group` and `tparam` are unrecognisable in javadoc. It seems we can't print them pretty in javadoc in a way of `example` here because they appear differently (both examples can be found in http://spark.apache.org/docs/2.0.2/api/scala/index.html#org.apache.spark.ml.classification.Classifier).

- Print `example` in javadoc.
  Currently, there are few `example` tag in several places.

  ```
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example This operation might be used to evaluate a graph
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example We might use this operation to change the vertex values
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example This function might be used to initialize edge
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example This function might be used to initialize edge
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example This function might be used to initialize edge
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example We can use this function to compute the in-degree of each
  ./graphx/src/main/scala/org/apache/spark/graphx/Graph.scala:   * example This function is used to update the vertices with new values based on external data.
  ./graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala:   * example Loads a file in the following format:
  ./graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala:   * example This function is used to update the vertices with new
  ./graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala:   * example This function can be used to filter the graph based on some property, without
  ./graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala: * example We can use the Pregel abstraction to implement PageRank:
  ./graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala: * example Construct a `VertexRDD` from a plain RDD:
  ./repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkCommandLine.scala: * example new SparkCommandLine(Nil).settings
  ./repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkIMain.scala:   * example addImports("org.apache.spark.SparkContext")
  ./sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralGenerator.scala: * example {{{
  ```

**Before**

  <img width="505" alt="2016-11-20 2 43 23" src="https://cloud.githubusercontent.com/assets/6477701/20457285/26f07e1c-aecb-11e6-9ae9-d9dee66845f4.png">

**After**
  <img width="499" alt="2016-11-20 1 27 17" src="https://cloud.githubusercontent.com/assets/6477701/20457240/409124e4-aeca-11e6-9a91-0ba514148b52.png">

## How was this patch tested?

Maunally tested by `jekyll build` with Java 7 and 8

```
java version "1.7.0_80"
Java(TM) SE Runtime Environment (build 1.7.0_80-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.80-b11, mixed mode)
```

```
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)
```

Note: this does not make sbt unidoc suceed with Java 8 yet but it reduces the number of errors with Java 8.

Author: hyukjinkwon <[email protected]>

Closes apache#15939 from HyukjinKwon/SPARK-3359-javadoc.
…cInvoke, Invoke and NewInstance and modify to short circuit if arguments have null when `needNullCheck == true`.

## What changes were proposed in this pull request?

This pr extracts method for preparing arguments from `StaticInvoke`, `Invoke` and `NewInstance` and modify to short circuit if arguments have `null` when `propageteNull == true`.

The steps are as follows:

1. Introduce `InvokeLike` to extract common logic from `StaticInvoke`, `Invoke` and `NewInstance` to prepare arguments.
`StaticInvoke` and `Invoke` had a risk to exceed 64kb JVM limit to prepare arguments but after this patch they can handle them because they share the preparing code of NewInstance, which handles the limit well.

2. Remove unneeded null checking and fix nullability of `NewInstance`.
Avoid some of nullabilty checking which are not needed because the expression is not nullable.

3. Modify to short circuit if arguments have `null` when `needNullCheck == true`.
If `needNullCheck == true`, preparing arguments can be skipped if we found one of them is `null`, so modified to short circuit in the case.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes apache#15901 from ueshin/issues/SPARK-18467.
…d BKM

## What changes were proposed in this pull request?

Add model summary APIs for `GaussianMixtureModel` and `BisectingKMeansModel` in pyspark.

## How was this patch tested?

Unit tests.

Author: sethah <[email protected]>

Closes apache#15777 from sethah/pyspark_cluster_summaries.
…atalyst.

## What changes were proposed in this pull request?

The nullabilities of `MapObject` can be made more strict by relying on `inputObject.nullable` and `lambdaFunction.nullable`.

Also `ExternalMapToCatalyst.dataType` can be made more strict by relying on `valueConverter.nullable`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes apache#15840 from ueshin/issues/SPARK-18398.
## What changes were proposed in this pull request?

This PR adds a new JDBCOption `maxConnections` which means the maximum number of simultaneous JDBC connections allowed. This option applies only to writing with coalesce operation if needed. It defaults to the number of partitions of RDD. Previously, SQL users cannot cannot control this while Scala/Java/Python users can use `coalesce` (or `repartition`) API.

**Reported Scenario**

For the following cases, the number of connections becomes 200 and database cannot handle all of them.

```sql
CREATE OR REPLACE TEMPORARY VIEW resultview
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:oracle:thin:10.129.10.111:1521:BKDB",
  dbtable "result",
  user "HIVE",
  password "HIVE"
);
-- set spark.sql.shuffle.partitions=200
INSERT OVERWRITE TABLE resultview SELECT g, count(1) AS COUNT FROM tnet.DT_LIVE_INFO GROUP BY g
```

## How was this patch tested?

Manual. Do the followings and see Spark UI.

**Step 1 (MySQL)**
```
CREATE TABLE t1 (a INT);
CREATE TABLE data (a INT);
INSERT INTO data VALUES (1);
INSERT INTO data VALUES (2);
INSERT INTO data VALUES (3);
```

**Step 2 (Spark)**
```scala
SPARK_HOME=$PWD bin/spark-shell --driver-memory 4G --driver-class-path mysql-connector-java-5.1.40-bin.jar
scala> sql("SET spark.sql.shuffle.partitions=3")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW data USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 'data', user 'root', password '')")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '1')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '2')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '3')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
scala> sql("CREATE OR REPLACE TEMPORARY VIEW t1 USING org.apache.spark.sql.jdbc OPTIONS (url 'jdbc:mysql://localhost:3306/t', dbtable 't1', user 'root', password '', maxConnections '4')")
scala> sql("INSERT OVERWRITE TABLE t1 SELECT a FROM data GROUP BY a")
```

![maxconnections](https://cloud.githubusercontent.com/assets/9700541/20287987/ed8409c2-aa84-11e6-8aab-ae28e63fe54d.png)

Author: Dongjoon Hyun <[email protected]>

Closes apache#15868 from dongjoon-hyun/SPARK-18413.
@robert3005 robert3005 merged commit 813a33f into master Nov 21, 2016
@robert3005 robert3005 deleted the rk/merge branch November 21, 2016 15:10
robert3005 pushed a commit that referenced this pull request Mar 6, 2017
## What changes were proposed in this pull request?

These error below seems caused by unidoc that does not understand double commented block.

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: error: class, interface, or enum expected
[error]  * MapGroupsWithStateFunction&lt;String, Integer, Integer, String&gt; mappingFunction =
[error]                                  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: error: class, interface, or enum expected
[error]  * MapGroupsWithStateFunction&lt;String, Integer, Integer, String&gt; mappingFunction =
[error]                                                                       ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: error: class, interface, or enum expected
[error]  *    new MapGroupsWithStateFunction&lt;String, Integer, Integer, String&gt;() {
[error]                                         ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: error: class, interface, or enum expected
[error]  *    new MapGroupsWithStateFunction&lt;String, Integer, Integer, String&gt;() {
[error]                                                                             ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: error: illegal character: '#'
[error]  *      &#64;Override
[error]          ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: error: class, interface, or enum expected
[error]  *      &#64;Override
[error]              ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected
[error]  *      public String call(String key, Iterator&lt;Integer&gt; value, KeyedState&lt;Integer&gt; state) {
[error]                ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected
[error]  *      public String call(String key, Iterator&lt;Integer&gt; value, KeyedState&lt;Integer&gt; state) {
[error]                                                    ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected
[error]  *      public String call(String key, Iterator&lt;Integer&gt; value, KeyedState&lt;Integer&gt; state) {
[error]                                                                ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected
[error]  *      public String call(String key, Iterator&lt;Integer&gt; value, KeyedState&lt;Integer&gt; state) {
[error]                                                                                     ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected
[error]  *      public String call(String key, Iterator&lt;Integer&gt; value, KeyedState&lt;Integer&gt; state) {
[error]                                                                                                 ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:76: error: class, interface, or enum expected
[error]  *          boolean shouldRemove = ...; // Decide whether to remove the state
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:77: error: class, interface, or enum expected
[error]  *          if (shouldRemove) {
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:79: error: class, interface, or enum expected
[error]  *          } else {
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:81: error: class, interface, or enum expected
[error]  *            state.update(newState); // Set the new state
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:82: error: class, interface, or enum expected
[error]  *          }
[error]  ^
[error] .../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:85: error: class, interface, or enum expected
[error]  *          state.update(initialState);
[error]  ^
[error] .../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:86: error: class, interface, or enum expected
[error]  *        }
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:90: error: class, interface, or enum expected
[error]  * </code></pre>
[error]  ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:92: error: class, interface, or enum expected
[error]  * tparam S User-defined type of the state to be stored for each key. Must be encodable into
[error]            ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:93: error: class, interface, or enum expected
[error]  *           Spark SQL types (see {link Encoder} for more details).
[error]                                          ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:94: error: class, interface, or enum expected
[error]  * since 2.1.1
[error]           ^
```

And another link seems unrecognisable.

```
.../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:16: error: reference not found
[error]  * That is, in every batch of the {link streaming.StreamingQuery StreamingQuery},
[error]
```

Note that this PR does not fix the two breaks as below:

```
[error] .../spark/sql/core/target/java/org/apache/spark/sql/DataFrameStatFunctions.java:43: error: unexpected content
[error]    * see {link DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile} for
[error]      ^
[error] .../spark/sql/core/target/java/org/apache/spark/sql/DataFrameStatFunctions.java:52: error: bad use of '>'
[error]    * param relativeError The relative target precision to achieve (>= 0).
[error]                                                                     ^
[error]
```

because these seem probably fixed soon in apache#16776 and I intended to avoid potential conflicts.

## How was this patch tested?

Manually via `jekyll build`

Author: hyukjinkwon <[email protected]>

Closes apache#16926 from HyukjinKwon/javadoc-break.
mattsills pushed a commit to mattsills/spark that referenced this pull request Jul 17, 2020
… and aggregates

### What changes were proposed in this pull request?

Currently, in the following scenario, bucket join is not utilized:
```scala
val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
df.write.format("parquet").bucketBy(8, "i").saveAsTable("t")
sql("CREATE VIEW v AS SELECT * FROM t")
sql("SELECT * FROM t a JOIN v b ON a.i = b.i").explain
```
```
== Physical Plan ==
*(4) SortMergeJoin [i#13], [i#15], Inner
:- *(1) Sort [i#13 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#13, j#14]
:     +- *(1) Filter isnotnull(i#13)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
+- *(3) Sort [i#15 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#15, 8), true, [id=palantir#64] <----- Exchange node introduced
      +- *(2) Project [i#13 AS i#15, j#14 AS j#16]
         +- *(2) Filter isnotnull(i#13)
            +- *(2) ColumnarToRow
               +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:..., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
```
Notice that `Exchange` is present. This is because `Project` introduces aliases and `outputPartitioning` and `requiredChildDistribution` do not consider aliases while considering bucket join in `EnsureRequirements`. This PR addresses to allow this scenario.

### Why are the changes needed?

This allows bucket join to be utilized in the above example.

### Does this PR introduce any user-facing change?

Yes, now with the fix, the `explain` out is as follows:
```
== Physical Plan ==
*(3) SortMergeJoin [i#13], [i#15], Inner
:- *(1) Sort [i#13 ASC NULLS FIRST], false, 0
:  +- *(1) Project [i#13, j#14]
:     +- *(1) Filter isnotnull(i#13)
:        +- *(1) ColumnarToRow
:           +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:.., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
+- *(2) Sort [i#15 ASC NULLS FIRST], false, 0
   +- *(2) Project [i#13 AS i#15, j#14 AS j#16]
      +- *(2) Filter isnotnull(i#13)
         +- *(2) ColumnarToRow
            +- FileScan parquet default.t[i#13,j#14] Batched: true, DataFilters: [isnotnull(i#13)], Format: Parquet, Location: InMemoryFileIndex[file:.., PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct<i:int,j:int>, SelectedBucketsCount: 8 out of 8
```
Note that the `Exchange` is no longer present.

### How was this patch tested?

Closes apache#26943 from imback82/bucket_alias.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
mattsills pushed a commit to mattsills/spark that referenced this pull request Jul 17, 2020
…in optimizations

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
  2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
  4. Be sure to keep the PR description updated to reflect all changes.
  5. Please write your PR title to summarize what this PR proposes.
  6. If possible, provide a concise example to reproduce the issue for a faster review.
-->

### What changes were proposed in this pull request?
<!--
Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
  1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
  2. If you fix some SQL features, you can provide some references of other DBMSes.
  3. If there is design documentation, please add the link.
  4. If there is a discussion in the mailing list, please add the link.
-->
This is a followup of apache#26434

This PR use one special shuffle reader for skew join, so that we only have one join after optimization. In order to do that, this PR
1. add a very general `CustomShuffledRowRDD` which support all kind of partition arrangement.
2. move the logic of coalescing shuffle partitions to a util function, and call it during skew join optimization, to totally decouple with the `ReduceNumShufflePartitions` rule. It's too complicated to interfere skew join with `ReduceNumShufflePartitions`, as you need to consider the size of split partitions which don't respect target size already.

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, you can clarify why it is a bug.
-->
The current skew join optimization has a serious performance issue: the size of the query plan depends on the number and size of skewed partitions.

### Does this PR introduce any user-facing change?
<!--
If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
If no, write 'No'.
-->
no

### How was this patch tested?
<!--
If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
If tests were not added, please describe why they were not added and/or why it was difficult to add.
-->
existing tests

test UI manually:
![image](https://user-images.githubusercontent.com/3182036/74357390-cfb30480-4dfa-11ea-83f6-825d1b9379ca.png)

explain output
```
AdaptiveSparkPlan(isFinalPlan=true)
+- OverwriteByExpression org.apache.spark.sql.execution.datasources.noop.NoopTable$403a2ed5, [AlwaysTrue()], org.apache.spark.sql.util.CaseInsensitiveStringMap1f
   +- *(5) SortMergeJoin(skew=true) [key1#2L], [key2#6L], Inner
      :- *(3) Sort [key1#2L ASC NULLS FIRST], false, 0
      :  +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
      :     +- ShuffleQueryStage 0
      :        +- Exchange hashpartitioning(key1#2L, 200), true, [id=palantir#53]
      :           +- *(1) Project [(id#0L % 2) AS key1#2L]
      :              +- *(1) Filter isnotnull((id#0L % 2))
      :                 +- *(1) Range (0, 100000, step=1, splits=6)
      +- *(4) Sort [key2#6L ASC NULLS FIRST], false, 0
         +- SkewJoinShuffleReader 2 skewed partitions with size(max=5 KB, min=5 KB, avg=5 KB)
            +- ShuffleQueryStage 1
               +- Exchange hashpartitioning(key2#6L, 200), true, [id=palantir#64]
                  +- *(2) Project [((id#4L % 2) + 1) AS key2#6L]
                     +- *(2) Filter isnotnull(((id#4L % 2) + 1))
                        +- *(2) Range (0, 100000, step=1, splits=6)
```

Closes apache#27493 from cloud-fan/aqe.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: herman <[email protected]>
(cherry picked from commit a4ceea6)
Signed-off-by: herman <[email protected]>
rahij pushed a commit that referenced this pull request Oct 21, 2020
… more scenarios such as PartitioningCollection

### What changes were proposed in this pull request?

This PR proposes to improve  `EnsureRquirement.reorderJoinKeys` to handle the following scenarios:
1. If the keys cannot be reordered to match the left-side `HashPartitioning`, consider the right-side `HashPartitioning`.
2. Handle `PartitioningCollection`, which may contain `HashPartitioning`

### Why are the changes needed?

1. For the scenario 1), the current behavior matches either the left-side `HashPartitioning` or the right-side `HashPartitioning`. This means that if both sides are `HashPartitioning`, it will try to match only the left side.
The following will not consider the right-side `HashPartitioning`:
```
val df1 = (0 until 10).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 10).map(i => (i % 7, i % 11)).toDF("i2", "j2")
df1.write.format("parquet").bucketBy(4, "i1", "j1").saveAsTable("t1")df2.write.format("parquet").bucketBy(4, "i2", "j2").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val join = t1.join(t2, t1("i1") === t2("j2") && t1("i1") === t2("i2"))
 join.explain

== Physical Plan ==
*(5) SortMergeJoin [i1#26, i1#26], [j2#31, i2#30], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#69]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(4) Sort [j2#31 ASC NULLS FIRST, i2#30 ASC NULLS FIRST], false, 0.
   +- Exchange hashpartitioning(j2#31, i2#30, 4), true, [id=#79].       <===== This can be removed
      +- *(3) Project [i2#30, j2#31]
         +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4

```

2.  For the scenario 2), the current behavior does not handle `PartitioningCollection`:
```
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
val join = df1.join(df2, df1("i1") === df2("i2") && df1("j1") === df2("j2")) // PartitioningCollection
val join2 = join.join(df3, join("j1") === df3("j3") && join("i1") === df3("i3"))
join2.explain

== Physical Plan ==
*(9) SortMergeJoin [j1#8, i1#7], [j3#30, i3#29], Inner
:- *(6) Sort [j1#8 ASC NULLS FIRST, i1#7 ASC NULLS FIRST], false, 0.       <===== This can be removed
:  +- Exchange hashpartitioning(j1#8, i1#7, 5), true, [id=#58]             <===== This can be removed
:     +- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:        :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:        :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#45]
:        :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:        :        +- *(1) LocalTableScan [_1#2, _2#3]
:        +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:           +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#51]
:              +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:                 +- *(3) LocalTableScan [_1#13, _2#14]
+- *(8) Sort [j3#30 ASC NULLS FIRST, i3#29 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(j3#30, i3#29, 5), true, [id=#64]
      +- *(7) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(7) LocalTableScan [_1#24, _2#25]
```
### Does this PR introduce _any_ user-facing change?

Yes, now from the above examples, the shuffle/sort nodes pointed by `This can be removed` are now removed:
1. Senario 1):
```
== Physical Plan ==
*(4) SortMergeJoin [i1#26, i1#26], [i2#30, j2#31], Inner
:- *(2) Sort [i1#26 ASC NULLS FIRST, i1#26 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i1#26, i1#26, 4), true, [id=#67]
:     +- *(1) Project [i1#26, j1#27]
:        +- *(1) Filter isnotnull(i1#26)
:           +- *(1) ColumnarToRow
:              +- FileScan parquet default.t1[i1#26,j1#27] Batched: true, DataFilters: [isnotnull(i1#26)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 4 out of 4
+- *(3) Sort [i2#30 ASC NULLS FIRST, j2#31 ASC NULLS FIRST], false, 0
   +- *(3) Project [i2#30, j2#31]
      +- *(3) Filter (((j2#31 = i2#30) AND isnotnull(j2#31)) AND isnotnull(i2#30))
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i2#30,j2#31] Batched: true, DataFilters: [(j2#31 = i2#30), isnotnull(j2#31), isnotnull(i2#30)], Format: Parquet, Location: InMemoryFileIndex[..., PartitionFilters: [], PushedFilters: [IsNotNull(j2), IsNotNull(i2)], ReadSchema: struct<i2:int,j2:int>, SelectedBucketsCount: 4 out of 4
```
2. Scenario 2):
```
== Physical Plan ==
*(8) SortMergeJoin [i1#7, j1#8], [i3#29, j3#30], Inner
:- *(5) SortMergeJoin [i1#7, j1#8], [i2#18, j2#19], Inner
:  :- *(2) Sort [i1#7 ASC NULLS FIRST, j1#8 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(i1#7, j1#8, 5), true, [id=#43]
:  :     +- *(1) Project [_1#2 AS i1#7, _2#3 AS j1#8]
:  :        +- *(1) LocalTableScan [_1#2, _2#3]
:  +- *(4) Sort [i2#18 ASC NULLS FIRST, j2#19 ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(i2#18, j2#19, 5), true, [id=#49]
:        +- *(3) Project [_1#13 AS i2#18, _2#14 AS j2#19]
:           +- *(3) LocalTableScan [_1#13, _2#14]
+- *(7) Sort [i3#29 ASC NULLS FIRST, j3#30 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#29, j3#30, 5), true, [id=#58]
      +- *(6) Project [_1#24 AS i3#29, _2#25 AS j3#30]
         +- *(6) LocalTableScan [_1#24, _2#25]
```

### How was this patch tested?

Added tests.

Closes apache#29074 from imback82/reorder_keys.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
jdcasale pushed a commit that referenced this pull request Jun 22, 2021
* Add failing tests

* Limit alias substituion in ScanOperation

* Update FORK.md

* Consistently use 'cancelAfter'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Resync with upstream master