
Commit

Merge remote-tracking branch 'origin/master' into interval-literals-as-ansi
MaxGekk committed Apr 15, 2021
2 parents a408ab9 + 71133e1 commit e4a4776
Showing 65 changed files with 7,773 additions and 494 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/build_and_test.yml
@@ -90,7 +90,8 @@ jobs:
run: |
apache_spark_ref=`git rev-parse HEAD`
git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
git merge --progress --ff-only FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT and Maven
@@ -186,7 +187,8 @@ jobs:
run: |
apache_spark_ref=`git rev-parse HEAD`
git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
git merge --progress --ff-only FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT and Maven
@@ -261,7 +263,8 @@ jobs:
run: |
apache_spark_ref=`git rev-parse HEAD`
git fetch https://github.com/$GITHUB_REPOSITORY.git ${GITHUB_REF##*/}
git merge --progress --ff-only FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' merge --no-commit --progress --squash FETCH_HEAD
git -c user.name='Apache Spark Test Account' -c user.email='[email protected]' commit -m "Merged commit"
echo "::set-output name=APACHE_SPARK_REF::$apache_spark_ref"
# Cache local repositories. Note that GitHub Actions cache has a 2G limit.
- name: Cache Scala, SBT and Maven
16 changes: 2 additions & 14 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1583,15 +1583,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def addFile(
path: String, recursive: Boolean, addedOnSubmit: Boolean, isArchive: Boolean = false
): Unit = {
val uri = if (!isArchive) {
if (Utils.isAbsoluteURI(path) && path.contains("%")) {
new URI(path)
} else {
new Path(path).toUri
}
} else {
Utils.resolveURI(path)
}
val uri = Utils.resolveURI(path)
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
case "local" =>
@@ -1979,11 +1971,7 @@ class SparkContext(config: SparkConf) extends Logging {
// For local paths with backslashes on Windows, URI throws an exception
(addLocalJarFile(new File(path)), "local")
} else {
val uri = if (Utils.isAbsoluteURI(path) && path.contains("%")) {
new URI(path)
} else {
new Path(path).toUri
}
val uri = Utils.resolveURI(path)
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Utils.validateURL(uri)
val uriScheme = uri.getScheme
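For reference, the two removed branches special-cased absolute URIs containing `%` and otherwise went through `new Path(path).toUri`; both call sites (addFile and addJar) now rely on `Utils.resolveURI` alone. Below is a minimal sketch of that kind of resolution; it is a simplified, hypothetical stand-in, not Spark's actual `Utils.resolveURI`, which also handles URI fragments and malformed input.

```scala
import java.io.File
import java.net.{URI, URISyntaxException}

// Simplified sketch: keep a path that already carries a URI scheme,
// otherwise treat it as a local file and return an absolute file: URI.
def resolveUriSketch(path: String): URI = {
  val withScheme =
    try Some(new URI(path)).filter(_.getScheme != null)
    catch { case _: URISyntaxException => None }
  withScheme.getOrElse(new File(path).getAbsoluteFile.toURI)
}

// resolveUriSketch("hdfs://nn:8020/libs/a.jar")  // scheme preserved as-is
// resolveUriSketch("jars/b.jar")                 // becomes file:/<cwd>/jars/b.jar
```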
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{HttpURLConnection, URI, URL}
import java.net.{HttpURLConnection, InetSocketAddress, URI, URL}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files => JavaFiles, Paths}
import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
@@ -41,6 +41,11 @@ import scala.util.Try
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.lang3.StringUtils
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.server.Handler
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods.{compact, render}

@@ -364,6 +369,22 @@ private[spark] object TestUtils {
}
}

def withHttpServer(resBaseDir: String = ".")(body: URL => Unit): Unit = {
// 0 as port means choosing randomly from the available ports
val server = new Server(new InetSocketAddress(Utils.localCanonicalHostName, 0))
val resHandler = new ResourceHandler()
resHandler.setResourceBase(resBaseDir)
val handlers = new HandlerList()
handlers.setHandlers(Array[Handler](resHandler, new DefaultHandler()))
server.setHandler(handlers)
server.start()
try {
body(server.getURI.toURL)
} finally {
server.stop()
}
}

/**
* Wait until at least `numExecutors` executors are up, or throw `TimeoutException` if the waiting
* time elapsed before `numExecutors` executors up. Exposed for testing.
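A hedged usage sketch for the new `withHttpServer` helper follows. The test code is hypothetical and would have to live in the `org.apache.spark` package, since `TestUtils` is `private[spark]`; the exact base-URL shape depends on Jetty's `Server.getURI`.

```scala
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.TestUtils

// Serve a temporary directory over HTTP on a random port and fetch a file from it.
val dir = Files.createTempDirectory("with-http-server-test")
Files.write(dir.resolve("hello.txt"), "hello".getBytes(StandardCharsets.UTF_8))

TestUtils.withHttpServer(dir.toString) { baseUrl =>
  // baseUrl is the server root, e.g. http://<canonical-host>:<random-port>/
  val content = scala.io.Source.fromURL(new URL(baseUrl, "hello.txt")).mkString
  assert(content == "hello")
}
```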
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -852,6 +852,9 @@ private[spark] class SparkSubmit extends Logging {
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)

if (args.verbose) {
childArgs ++= Seq("--verbose")
}
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}

12 changes: 12 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -630,6 +630,18 @@ def __hash__(self):
"pyspark.pandas.tests.test_series_conversion",
"pyspark.pandas.tests.test_series_datetime",
"pyspark.pandas.tests.test_series_string",
"pyspark.pandas.tests.test_categorical",
"pyspark.pandas.tests.test_csv",
"pyspark.pandas.tests.test_groupby",
"pyspark.pandas.tests.test_expanding",
"pyspark.pandas.tests.test_indexing",
"pyspark.pandas.tests.test_namespace",
"pyspark.pandas.tests.test_repr",
"pyspark.pandas.tests.test_reshape",
"pyspark.pandas.tests.test_rolling",
"pyspark.pandas.tests.test_sql",
"pyspark.pandas.tests.test_stats",
"pyspark.pandas.tests.test_window",
"pyspark.pandas.tests.plot.test_frame_plot",
"pyspark.pandas.tests.plot.test_frame_plot_matplotlib",
"pyspark.pandas.tests.plot.test_frame_plot_plotly",
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
@@ -77,6 +77,8 @@ license: |

- In Spark 3.2, `CREATE TABLE .. LIKE ..` command can not use reserved properties. You need their specific clauses to specify them, for example, `CREATE TABLE test1 LIKE test LOCATION 'some path'`. You can set `spark.sql.legacy.notReserveProperties` to `true` to ignore the `ParseException`, in this case, these properties will be silently removed, for example: `TBLPROPERTIES('owner'='yao')` will have no effect. In Spark version 3.1 and below, the reserved properties can be used in `CREATE TABLE .. LIKE ..` command but have no side effects, for example, `TBLPROPERTIES('location'='/tmp')` does not change the location of the table but only create a headless property just like `'a'='b'`.

- In Spark 3.2, `TRANSFORM` operator can't support alias in inputs. In Spark 3.1 and earlier, we can write script transform like `SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL`.

## Upgrading from Spark SQL 3.0 to 3.1

- In Spark 3.1, statistical aggregation function includes `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, `corr` will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.
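To illustrate the new `TRANSFORM` note above, here is a hedged sketch of one possible rewrite for Spark 3.2, as run in `spark-shell` where `spark` is predefined. The table `tbl`, columns `a` and `b`, and the `cat` script are hypothetical, and script transformation still requires a suitable serde/Hive setup and a usable `cat` binary.

```scala
// Spark 3.1 and earlier accepted aliases directly in the TRANSFORM input list:
//   SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM tbl

// One possible Spark 3.2 rewrite: rename in a subquery, then pass plain columns.
val transformed = spark.sql(
  """
    |SELECT TRANSFORM(c1, c2)
    |USING 'cat'
    |FROM (SELECT a AS c1, b AS c2 FROM tbl)
    |""".stripMargin)
transformed.show()
```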
3 changes: 2 additions & 1 deletion python/pyspark/__init__.py
@@ -60,7 +60,7 @@
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
from pyspark.profiler import Profiler, BasicProfiler
from pyspark.version import __version__ # noqa: F401
from pyspark.version import __version__
from pyspark._globals import _NoValue # noqa: F401


@@ -125,4 +125,5 @@ def wrapper(self, *args, **kwargs):
"Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
"StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext",
"RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo", "InheritableThread",
"__version__",
]
3 changes: 1 addition & 2 deletions python/pyspark/__init__.pyi
@@ -52,6 +52,7 @@ from pyspark.taskcontext import ( # noqa: F401
TaskContext as TaskContext,
)
from pyspark.util import InheritableThread as InheritableThread # noqa: F401
from pyspark.version import __version__ as __version__

# Compatibility imports
from pyspark.sql import ( # noqa: F401
@@ -71,5 +72,3 @@ def copy_func(
doc: Optional[str] = ...,
) -> F: ...
def keyword_only(func: F) -> F: ...

__version__: str
13 changes: 7 additions & 6 deletions python/pyspark/pandas/__init__.py
@@ -42,11 +42,12 @@ def assert_python_version():

if sys.version_info[:2] <= deprecated_version:
warnings.warn(
"Koalas support for Python {dep_ver} is deprecated and will be dropped in "
"pandas-on-Spark support for Python {dep_ver} is deprecated and will be dropped in "
"the future release. At that point, existing Python {dep_ver} workflows "
"that use Koalas will continue to work without modification, but Python {dep_ver} "
"users will no longer get access to the latest Koalas features and bugfixes. "
"We recommend that you upgrade to Python {min_ver} or newer.".format(
"that use pandas-on-Spark will continue to work without modification, but "
"Python {dep_ver} users will no longer get access to the latest pandas-on-Spark "
"features and bugfixes. We recommend that you upgrade to Python {min_ver} or "
"newer.".format(
dep_ver=".".join(map(str, deprecated_version)),
min_ver=".".join(map(str, min_supported_version)),
),
@@ -68,8 +69,8 @@ def assert_python_version():
"'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to "
"set this environment variable to '1' in both driver and executor sides if you use "
"pyarrow>=2.0.0. "
"Koalas will set it for you but it does not work if there is a Spark context already "
"launched."
"pandas-on-Spark will set it for you but it does not work if there is a Spark context "
"already launched."
)
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

40 changes: 20 additions & 20 deletions python/pyspark/pandas/accessors.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
"""
Koalas specific features.
pandas-on-Spark specific features.
"""
import inspect
from typing import Any, Optional, Tuple, Union, TYPE_CHECKING, cast
@@ -47,8 +47,8 @@
from pyspark.pandas.series import Series # noqa: F401 (SPARK-34943)


class KoalasFrameMethods(object):
""" Koalas specific features for DataFrame. """
class PandasOnSparkFrameMethods(object):
""" pandas-on-Spark specific features for DataFrame. """

def __init__(self, frame: "DataFrame"):
self._kdf = frame
@@ -194,10 +194,10 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
See also `Transform and apply a function
<https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
.. note:: the `func` is unable to access to the whole input frame. Koalas internally
splits the input series into multiple batches and calls `func` with each batch multiple
times. Therefore, operations such as global aggregations are impossible. See the example
below.
.. note:: the `func` is unable to access to the whole input frame. pandas-on-Spark
internally splits the input series into multiple batches and calls `func` with each
batch multiple times. Therefore, operations such as global aggregations are impossible.
See the example below.
>>> # This case does not return the length of whole frame but of the batch internally
... # used.
Expand Down Expand Up @@ -286,7 +286,7 @@ def apply_batch(self, func, args=(), **kwds) -> "DataFrame":
A B
0 1 2
You can also omit the type hints so Koalas infers the return schema as below:
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.koalas.apply_batch(lambda pdf: pdf.query('A == 1'))
A B
@@ -400,10 +400,10 @@ def transform_batch(self, func, *args, **kwargs) -> Union["DataFrame", "Series"]
See also `Transform and apply a function
<https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
.. note:: the `func` is unable to access to the whole input frame. Koalas internally
splits the input series into multiple batches and calls `func` with each batch multiple
times. Therefore, operations such as global aggregations are impossible. See the example
below.
.. note:: the `func` is unable to access to the whole input frame. pandas-on-Spark
internally splits the input series into multiple batches and calls `func` with each
batch multiple times. Therefore, operations such as global aggregations are impossible.
See the example below.
>>> # This case does not return the length of whole frame but of the batch internally
... # used.
Expand Down Expand Up @@ -497,7 +497,7 @@ def transform_batch(self, func, *args, **kwargs) -> Union["DataFrame", "Series"]
2 7
dtype: int64
You can also omit the type hints so Koalas infers the return schema as below:
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.koalas.transform_batch(lambda pdf: pdf + 1)
A B
@@ -699,8 +699,8 @@ def pandas_frame_func(f, field_name):
return DataFrame(internal)


class KoalasSeriesMethods(object):
""" Koalas specific features for Series. """
class PandasOnSparkSeriesMethods(object):
""" pandas-on-Spark specific features for Series. """

def __init__(self, series: "Series"):
self._kser = series
@@ -713,10 +713,10 @@ def transform_batch(self, func, *args, **kwargs) -> "Series":
See also `Transform and apply a function
<https://koalas.readthedocs.io/en/latest/user_guide/transform_apply.html>`_.
.. note:: the `func` is unable to access to the whole input series. Koalas internally
splits the input series into multiple batches and calls `func` with each batch multiple
times. Therefore, operations such as global aggregations are impossible. See the example
below.
.. note:: the `func` is unable to access to the whole input series. pandas-on-Spark
internally splits the input series into multiple batches and calls `func` with each
batch multiple times. Therefore, operations such as global aggregations are impossible.
See the example below.
>>> # This case does not return the length of whole frame but of the batch internally
... # used.
Expand Down Expand Up @@ -774,7 +774,7 @@ def transform_batch(self, func, *args, **kwargs) -> "Series":
2 6
Name: A, dtype: int64
You can also omit the type hints so Koalas infers the return schema as below:
You can also omit the type hints so pandas-on-Spark infers the return schema as below:
>>> df.A.koalas.transform_batch(lambda pser: pser + 1)
0 2