[Datasets] [Arrow 7.0.0+ Support] [Mono-PR] Add support for Arrow 7+. #29161

Closed
55 changes: 53 additions & 2 deletions .buildkite/pipeline.ml.yml
@@ -269,14 +269,65 @@
# Dask tests and examples.
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/...

- label: "Dataset tests"
- label: "Dataset tests (Arrow nightly)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ./ci/env/install-dependencies.sh
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=nightly ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow 10)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=10.* ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow 9)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=9.* ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow 8)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=8.* ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow 7)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=7.* ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Dataset tests (Arrow 6)"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"]
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- DATA_PROCESSING_TESTING=1 ARROW_VERSION=6.* ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/...
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/...

- label: "Workflow tests"
conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_WORKFLOW_AFFECTED"]
7 changes: 7 additions & 0 deletions ci/env/install-dependencies.sh
@@ -399,6 +399,13 @@ install_pip_packages() {
fi
if [ "${DATA_PROCESSING_TESTING-}" = 1 ]; then
pip install -U -c "${WORKSPACE_DIR}"/python/requirements.txt -r "${WORKSPACE_DIR}"/python/requirements/data_processing/requirements_dataset.txt
if [ -n "${ARROW_VERSION-}" ]; then
if [ "${ARROW_VERSION-}" = nightly ]; then
pip install --extra-index-url https://pypi.fury.io/arrow-nightlies/ --prefer-binary --pre pyarrow
else
pip install -U pyarrow=="${ARROW_VERSION}"
fi
fi
fi

# Remove this entire section once Serve dependencies are fixed.
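The ARROW_VERSION branch added above can be sketched as a standalone dry-run helper (a hypothetical `install_pyarrow` function, not part of this PR) that echoes the pip command it would run instead of executing it:

```shell
# Hypothetical dry-run sketch of the ARROW_VERSION branch in
# install-dependencies.sh; echoes the pip command instead of running it.
install_pyarrow() {
  if [ -z "${ARROW_VERSION-}" ]; then
    # No pin requested; keep the default pyarrow from the requirements file.
    return 0
  elif [ "${ARROW_VERSION}" = nightly ]; then
    echo "pip install --extra-index-url https://pypi.fury.io/arrow-nightlies/ --prefer-binary --pre pyarrow"
  else
    echo "pip install -U pyarrow==${ARROW_VERSION}"
  fi
}

ARROW_VERSION="9.*" install_pyarrow
```

This mirrors how the CI matrix above drives a single install script with `ARROW_VERSION=nightly`, `10.*`, `9.*`, and so on.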
4 changes: 4 additions & 0 deletions python/ray/_private/storage.py
@@ -6,6 +6,7 @@
from typing import TYPE_CHECKING, List, Optional

from ray._private.client_mode_hook import client_mode_hook
from ray._private.utils import _add_creatable_buckets_param_if_s3_uri

if TYPE_CHECKING:
import pyarrow.fs
@@ -368,6 +369,9 @@ def _init_filesystem(create_valid_file: bool = False, check_valid_file: bool = T
fs_creator = _load_class(parsed_uri.netloc)
_filesystem, _storage_prefix = fs_creator(parsed_uri.path)
else:
# Arrow's S3FileSystem doesn't allow creating buckets by default, so we add a
# query arg enabling bucket creation if an S3 URI is provided.
_storage_uri = _add_creatable_buckets_param_if_s3_uri(_storage_uri)
_filesystem, _storage_prefix = pyarrow.fs.FileSystem.from_uri(_storage_uri)

if os.name == "nt":
101 changes: 101 additions & 0 deletions python/ray/_private/utils.py
@@ -4,6 +4,7 @@
import hashlib
import importlib
import inspect
import json
import logging
import multiprocessing
import os
@@ -14,6 +15,7 @@
import tempfile
import threading
import time
from urllib.parse import urlencode, unquote, urlparse, parse_qsl, ParseResult
import uuid
import warnings
from inspect import signature
@@ -61,6 +63,7 @@


ENV_DISABLE_DOCKER_CPU_WARNING = "RAY_DISABLE_DOCKER_CPU_WARNING" in os.environ
_PYARROW_VERSION = None


def get_user_temp_dir():
@@ -1578,3 +1581,101 @@ def split_address(address: str) -> Tuple[str, str]:

module_string, inner_address = address.split("://", maxsplit=1)
return (module_string, inner_address)


def _add_url_query_params(
url: str, params: Dict[str, str], override: bool = False
) -> str:
"""Add params to the provided url as query parameters.

Args:
url: The URL to add query parameters to.
params: The query parameters to add.
override: Whether the provided params should override existing query parameters
in url: if True, the existing query parameters will be overwritten; if
False, the existing query parameters will take precedence. Default is False.

Returns:
URL with params added as query parameters.
"""
# Unquoting URL first so we don't lose existing args.
url = unquote(url)
# Parse URL.
parsed_url = urlparse(url)
# Converting URL query string arguments to dict.
base_params = dict(parse_qsl(parsed_url.query))
if not override:
# If not overriding, treat params as base parameters and override with existing
# query string parameters.
base_params, params = params, base_params
# Merging URL query string arguments dict with new params.
base_params.update(params)
# bool and dict values should be converted to json-friendly values.
base_params.update(
{
k: json.dumps(v)
for k, v in base_params.items()
if isinstance(v, (bool, dict))
}
)

# Converting URL arguments to proper query string.
encoded_params = urlencode(base_params, doseq=True)
# Creating new parsed result object based on provided with new
# URL arguments. Same thing happens inside of urlparse.
new_url = ParseResult(
parsed_url.scheme,
parsed_url.netloc,
parsed_url.path,
parsed_url.params,
encoded_params,
parsed_url.fragment,
).geturl()

return new_url
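The merge/override semantics above lean entirely on `urllib.parse`. A minimal standalone sketch (a simplified re-statement for illustration, not the PR's exact code) shows the default behavior, where existing query parameters win over the new ones:

```python
from urllib.parse import ParseResult, parse_qsl, unquote, urlencode, urlparse


def add_url_query_params(url: str, params: dict, override: bool = False) -> str:
    # Parse the existing query string into a dict.
    parsed = urlparse(unquote(url))
    base = dict(parse_qsl(parsed.query))
    if not override:
        # Treat the new params as the base so existing query params override them.
        base, params = params, base
    base.update(params)
    # Rebuild the URL with the merged query string.
    return ParseResult(
        parsed.scheme, parsed.netloc, parsed.path,
        parsed.params, urlencode(base, doseq=True), parsed.fragment,
    ).geturl()


print(add_url_query_params("s3://bucket/key?region=us-west-2",
                            {"region": "eu-west-1", "fresh": "1"}))
# -> s3://bucket/key?region=us-west-2&fresh=1  (existing region kept, fresh added)
```

With `override=True` the dict swap is skipped, so the new params clobber any existing keys.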


def _add_creatable_buckets_param_if_s3_uri(uri: str) -> str:
"""If the provided URI is an S3 URL, add allow_bucket_creation=true as a query
parameter. For pyarrow >= 9.0.0, this is required in order to allow
``S3FileSystem.create_dir()`` to create S3 buckets.

If the provided URI is not an S3 URL or if pyarrow < 9.0.0 is installed, we return
the URI unchanged.

Args:
uri: The URI that we'll add the query parameter to, if it's an S3 URL.

Returns:
A URI with the added allow_bucket_creation=true query parameter, if the provided
URI is an S3 URL; uri will be returned unchanged otherwise.
"""
from pkg_resources._vendor.packaging.version import parse as parse_version

pyarrow_version = _get_pyarrow_version()
if pyarrow_version is not None:
pyarrow_version = parse_version(pyarrow_version)
if pyarrow_version is not None and pyarrow_version < parse_version("9.0.0"):
# This bucket creation query parameter is not required for pyarrow < 9.0.0.
return uri
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "s3":
uri = _add_url_query_params(uri, {"allow_bucket_creation": True})
Reviewer (Contributor): Should this override an existing allow_bucket_creation=False? If not, make these semantics clear in the docstring.

Author (Contributor): By default, this won't override the query parameter if it already exists; the optional override argument controls this and is documented in the _add_url_query_params docstring.

return uri
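Setting aside the pyarrow version gate, the S3-specific behavior can be sketched with only the standard library (note the PR passes the bool `True`, which `_add_url_query_params` JSON-encodes to `"true"`; this sketch uses the string directly, and `setdefault` stands in for the non-override default):

```python
from urllib.parse import ParseResult, parse_qsl, urlencode, urlparse


def add_creatable_buckets_param_if_s3_uri(uri: str) -> str:
    # Only S3 URIs need allow_bucket_creation; everything else passes through.
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        return uri
    query = dict(parse_qsl(parsed.query))
    # An existing value (e.g. allow_bucket_creation=false) takes precedence,
    # matching the non-override default of _add_url_query_params.
    query.setdefault("allow_bucket_creation", "true")
    return ParseResult(
        parsed.scheme, parsed.netloc, parsed.path,
        parsed.params, urlencode(query), parsed.fragment,
    ).geturl()
```

For example, `s3://bucket/key` becomes `s3://bucket/key?allow_bucket_creation=true`, while `file:///tmp/data` is returned unchanged.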


def _get_pyarrow_version() -> Optional[str]:
"""Get the version of the installed pyarrow package, returned as a tuple of ints.
Returns None if the package is not found.
"""
global _PYARROW_VERSION
if _PYARROW_VERSION is None:
try:
import pyarrow
except ModuleNotFoundError:
# pyarrow not installed, short-circuit.
pass
else:
if hasattr(pyarrow, "__version__"):
_PYARROW_VERSION = pyarrow.__version__
return _PYARROW_VERSION
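The version gate used by `_add_creatable_buckets_param_if_s3_uri` boils down to "add the parameter for pyarrow >= 9.0.0, and also when the version can't be determined". A hypothetical predicate (illustrative only; the PR compares full parsed versions, while this sketch compares only the major component) captures that decision:

```python
from typing import Optional


def needs_creatable_buckets_param(pyarrow_version: Optional[str]) -> bool:
    """Return True if allow_bucket_creation should be added to S3 URIs."""
    if pyarrow_version is None:
        # Version unknown (e.g. pyarrow missing __version__): add the
        # parameter to be safe, mirroring the PR's fall-through behavior.
        return True
    # pyarrow < 9.0.0 lets S3FileSystem.create_dir() create buckets without
    # the flag, so only 9.0.0+ needs it. Nightly versions like "10.0.0.dev123"
    # still parse a leading integer major component.
    major = int(pyarrow_version.split(".")[0])
    return major >= 9
```

So `"8.0.0"` skips the parameter, while `"9.0.0"`, `"10.0.1.dev5"`, and `None` all add it.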