diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index 21afada68e84..8daef4de9c98 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -269,6 +269,46 @@ # Dask tests and examples. - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=-client python/ray/util/dask/... +- label: "Dataset tests (Arrow nightly)" + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] + instance_size: medium + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - DATA_PROCESSING_TESTING=1 ARROW_VERSION=nightly ./ci/env/install-dependencies.sh + - ./ci/env/env_info.sh + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/... + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/... + +- label: "Dataset tests (Arrow 10)" + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] + instance_size: medium + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - DATA_PROCESSING_TESTING=1 ARROW_VERSION=10.* ./ci/env/install-dependencies.sh + - ./ci/env/env_info.sh + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/... + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/... + +- label: "Dataset tests (Arrow 9)" + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] + instance_size: medium + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - DATA_PROCESSING_TESTING=1 ARROW_VERSION=9.* ./ci/env/install-dependencies.sh + - ./ci/env/env_info.sh + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/... + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/... + +- label: "Dataset tests (Arrow 8)" + conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] + instance_size: medium + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - DATA_PROCESSING_TESTING=1 ARROW_VERSION=8.* ./ci/env/install-dependencies.sh + - ./ci/env/env_info.sh + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only python/ray/data/... + - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=ray_data python/ray/air/... 
+ - label: "Dataset tests (Arrow 7)" conditions: ["NO_WHEELS_REQUIRED", "RAY_CI_PYTHON_AFFECTED", "RAY_CI_DATA_AFFECTED"] instance_size: medium diff --git a/python/ray/_private/storage.py b/python/ray/_private/storage.py index d45ee9f50364..cbe75c0b189b 100644 --- a/python/ray/_private/storage.py +++ b/python/ray/_private/storage.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, List, Optional from ray._private.client_mode_hook import client_mode_hook +from ray._private.utils import _add_creatable_buckets_param_if_s3_uri if TYPE_CHECKING: import pyarrow.fs @@ -368,6 +369,9 @@ def _init_filesystem(create_valid_file: bool = False, check_valid_file: bool = T fs_creator = _load_class(parsed_uri.netloc) _filesystem, _storage_prefix = fs_creator(parsed_uri.path) else: + # Arrow's S3FileSystem doesn't allow creating buckets by default, so we add a + # query arg enabling bucket creation if an S3 URI is provided. + _storage_uri = _add_creatable_buckets_param_if_s3_uri(_storage_uri) _filesystem, _storage_prefix = pyarrow.fs.FileSystem.from_uri(_storage_uri) if os.name == "nt": diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index d1cb74633ed5..4c74fd29292a 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -15,6 +15,7 @@ import tempfile import threading import time +from urllib.parse import urlencode, unquote, urlparse, parse_qsl, urlunparse import uuid import warnings from inspect import signature @@ -1599,6 +1600,73 @@ def split_address(address: str) -> Tuple[str, str]: return (module_string, inner_address) +def _add_url_query_params(url: str, params: Dict[str, str]) -> str: + """Add params to the provided url as query parameters. + + If url already contains query parameters, they will be merged with params, with the + existing query parameters overriding any in params with the same parameter name. + + Args: + url: The URL to add query parameters to. + params: The query parameters to add. + + Returns: + URL with params added as query parameters. + """ + # Unquote URL first so we don't lose existing args. + url = unquote(url) + # Parse URL. + parsed_url = urlparse(url) + # Merge URL query string arguments dict with new params. + base_params = params + params = dict(parse_qsl(parsed_url.query)) + base_params.update(params) + # bool and dict values should be converted to json-friendly values. + base_params.update( + { + k: json.dumps(v) + for k, v in base_params.items() + if isinstance(v, (bool, dict)) + } + ) + + # Convert URL arguments to proper query string. + encoded_params = urlencode(base_params, doseq=True) + # Replace query string in parsed URL with updated query string. + parsed_url = parsed_url._replace(query=encoded_params) + # Convert back to URL. + return urlunparse(parsed_url) + + +def _add_creatable_buckets_param_if_s3_uri(uri: str) -> str: + """If the provided URI is an S3 URL, add allow_bucket_creation=true as a query + parameter. For pyarrow >= 9.0.0, this is required in order to allow + ``S3FileSystem.create_dir()`` to create S3 buckets. + + If the provided URI is not an S3 URL or if pyarrow < 9.0.0 is installed, we return + the URI unchanged. + + Args: + uri: The URI that we'll add the query parameter to, if it's an S3 URL. + + Returns: + A URI with the added allow_bucket_creation=true query parameter, if the provided + URI is an S3 URL; uri will be returned unchanged otherwise. 
+ """ + from pkg_resources._vendor.packaging.version import parse as parse_version + + pyarrow_version = _get_pyarrow_version() + if pyarrow_version is not None: + pyarrow_version = parse_version(pyarrow_version) + if pyarrow_version is not None and pyarrow_version < parse_version("9.0.0"): + # This bucket creation query parameter is not required for pyarrow < 9.0.0. + return uri + parsed_uri = urlparse(uri) + if parsed_uri.scheme == "s3": + uri = _add_url_query_params(uri, {"allow_bucket_creation": True}) + return uri + + def _get_pyarrow_version() -> Optional[str]: """Get the version of the installed pyarrow package, returned as a tuple of ints. Returns None if the package is not found. diff --git a/python/ray/air/tests/test_tensor_extension.py b/python/ray/air/tests/test_tensor_extension.py index 0a3d9848f595..a04c1ac3359e 100644 --- a/python/ray/air/tests/test_tensor_extension.py +++ b/python/ray/air/tests/test_tensor_extension.py @@ -2,6 +2,7 @@ import numpy as np import pandas as pd +from pkg_resources._vendor.packaging.version import parse as parse_version import pyarrow as pa import pytest @@ -12,6 +13,7 @@ ArrowVariableShapedTensorType, ) from ray.air.util.tensor_extensions.pandas import TensorArray, TensorDtype +from ray._private.utils import _get_pyarrow_version def test_tensor_array_validation(): @@ -343,8 +345,20 @@ def test_arrow_tensor_array_getitem(chunked): if chunked: t_arr = pa.chunked_array(t_arr) - for idx in range(outer_dim): - np.testing.assert_array_equal(t_arr[idx], arr[idx]) + pyarrow_version = parse_version(_get_pyarrow_version()) + if ( + chunked + and pyarrow_version >= parse_version("8.0.0") + and pyarrow_version < parse_version("9.0.0") + ): + for idx in range(outer_dim): + item = t_arr[idx] + assert isinstance(item, pa.ExtensionScalar) + item = item.type._extension_scalar_to_ndarray(item) + np.testing.assert_array_equal(item, arr[idx]) + else: + for idx in range(outer_dim): + np.testing.assert_array_equal(t_arr[idx], arr[idx]) # Test __iter__. for t_subarr, subarr in zip(t_arr, arr): @@ -368,8 +382,19 @@ def test_arrow_tensor_array_getitem(chunked): np.testing.assert_array_equal(t_arr2_npy, arr[1:]) - for idx in range(1, outer_dim): - np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx]) + if ( + chunked + and pyarrow_version >= parse_version("8.0.0") + and pyarrow_version < parse_version("9.0.0") + ): + for idx in range(1, outer_dim): + item = t_arr2[idx - 1] + assert isinstance(item, pa.ExtensionScalar) + item = item.type._extension_scalar_to_ndarray(item) + np.testing.assert_array_equal(item, arr[idx]) + else: + for idx in range(1, outer_dim): + np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx]) @pytest.mark.parametrize("chunked", [False, True]) @@ -387,8 +412,20 @@ def test_arrow_variable_shaped_tensor_array_getitem(chunked): if chunked: t_arr = pa.chunked_array(t_arr) - for idx in range(outer_dim): - np.testing.assert_array_equal(t_arr[idx], arr[idx]) + pyarrow_version = parse_version(_get_pyarrow_version()) + if ( + chunked + and pyarrow_version >= parse_version("8.0.0") + and pyarrow_version < parse_version("9.0.0") + ): + for idx in range(outer_dim): + item = t_arr[idx] + assert isinstance(item, pa.ExtensionScalar) + item = item.type._extension_scalar_to_ndarray(item) + np.testing.assert_array_equal(item, arr[idx]) + else: + for idx in range(outer_dim): + np.testing.assert_array_equal(t_arr[idx], arr[idx]) # Test __iter__. 
for t_subarr, subarr in zip(t_arr, arr): @@ -414,8 +451,19 @@ def test_arrow_variable_shaped_tensor_array_getitem(chunked): for t_subarr, subarr in zip(t_arr2_npy, arr[1:]): np.testing.assert_array_equal(t_subarr, subarr) - for idx in range(1, outer_dim): - np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx]) + if ( + chunked + and pyarrow_version >= parse_version("8.0.0") + and pyarrow_version < parse_version("9.0.0") + ): + for idx in range(1, outer_dim): + item = t_arr2[idx - 1] + assert isinstance(item, pa.ExtensionScalar) + item = item.type._extension_scalar_to_ndarray(item) + np.testing.assert_array_equal(item, arr[idx]) + else: + for idx in range(1, outer_dim): + np.testing.assert_array_equal(t_arr2[idx - 1], arr[idx]) @pytest.mark.parametrize( diff --git a/python/ray/air/util/tensor_extensions/arrow.py b/python/ray/air/util/tensor_extensions/arrow.py index e752b3d9ce66..a6f8282b66cb 100644 --- a/python/ray/air/util/tensor_extensions/arrow.py +++ b/python/ray/air/util/tensor_extensions/arrow.py @@ -1,13 +1,51 @@ import itertools from typing import Iterable, Optional, Tuple, List, Sequence, Union +from pkg_resources._vendor.packaging.version import parse as parse_version import numpy as np import pyarrow as pa from ray.air.util.tensor_extensions.utils import _is_ndarray_variable_shaped_tensor +from ray._private.utils import _get_pyarrow_version from ray.util.annotations import PublicAPI +PYARROW_VERSION = _get_pyarrow_version() +if PYARROW_VERSION is not None: + PYARROW_VERSION = parse_version(PYARROW_VERSION) +# Minimum version of Arrow that supports ExtensionScalars. +# TODO(Clark): Remove conditional definition once we only support Arrow 8.0.0+. +MIN_PYARROW_VERSION_SCALAR = parse_version("8.0.0") +# Minimum version of Arrow that supports subclassable ExtensionScalars. +# TODO(Clark): Remove conditional definition once we only support Arrow 9.0.0+. +MIN_PYARROW_VERSION_SCALAR_SUBCLASS = parse_version("9.0.0") + + +def _arrow_supports_extension_scalars(): + """ + Whether Arrow ExtensionScalars are supported in the current pyarrow version. + + This returns True if the pyarrow version is 8.0.0+, or if the pyarrow version is + unknown. + """ + # TODO(Clark): Remove utility once we only support Arrow 8.0.0+. + return PYARROW_VERSION is None or PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR + + +def _arrow_extension_scalars_are_subclassable(): + """ + Whether Arrow ExtensionScalars support subclassing in the current pyarrow version. + + This returns True if the pyarrow version is 9.0.0+, or if the pyarrow version is + unknown. + """ + # TODO(Clark): Remove utility once we only support Arrow 9.0.0+. + return ( + PYARROW_VERSION is None + or PYARROW_VERSION >= MIN_PYARROW_VERSION_SCALAR_SUBCLASS + ) + + @PublicAPI(stability="beta") class ArrowTensorType(pa.PyExtensionType): """ @@ -62,6 +100,27 @@ def __arrow_ext_class__(self): """ return ArrowTensorArray + if _arrow_extension_scalars_are_subclassable(): + # TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+. + def __arrow_ext_scalar_class__(self): + """ + ExtensionScalar subclass with custom logic for this array of tensors type. + """ + return ArrowTensorScalar + + if _arrow_supports_extension_scalars(): + # TODO(Clark): Remove this version guard once we only support Arrow 8.0.0+. + def _extension_scalar_to_ndarray( + self, scalar: pa.ExtensionScalar + ) -> np.ndarray: + """ + Convert an ExtensionScalar to a tensor element. 
+ """ + # TODO(Clark): Construct ndarray view directly on tensor element buffer to + # ensure reliable zero-copy semantics. + flat_ndarray = scalar.value.values.to_numpy(zero_copy_only=False) + return flat_ndarray.reshape(self.shape) + def __str__(self) -> str: return ( f"ArrowTensorType(shape={self.shape}, dtype={self.storage_type.value_type})" @@ -71,8 +130,85 @@ def __repr__(self) -> str: return str(self) +if _arrow_extension_scalars_are_subclassable(): + # TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+. + @PublicAPI(stability="beta") + class ArrowTensorScalar(pa.ExtensionScalar): + def as_py(self) -> np.ndarray: + return self.type._extension_scalar_to_ndarray(self) + + def __array__(self) -> np.ndarray: + return self.as_py() + + +# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+. +class _ArrowTensorScalarIndexingMixin: + """ + A mixin providing support for scalar indexing in tensor extension arrays for + Arrow < 9.0.0, before full ExtensionScalar support was added. This mixin overrides + __getitem__, __iter__, and to_pylist. + """ + + # This mixin will be a no-op (no methods added) for Arrow 9.0.0+. + if not _arrow_extension_scalars_are_subclassable(): + # NOTE: These __iter__ and to_pylist definitions are shared for both + # Arrow < 8.0.0 and Arrow 8.*. + def __iter__(self): + # Override pa.Array.__iter__() in order to return an iterator of + # properly shaped tensors instead of an iterator of flattened tensors. + # See comment in above __getitem__ method. + for i in range(len(self)): + # Use overridden __getitem__ method. + yield self.__getitem__(i) + + def to_pylist(self): + # Override pa.Array.to_pylist() due to a lack of ExtensionScalar + # support (see comment in __getitem__). + return list(self) + + if _arrow_supports_extension_scalars(): + # NOTE(Clark): This __getitem__ override is only needed for Arrow 8.*, + # before ExtensionScalar subclassing support was added. + # TODO(Clark): Remove these methods once we only support Arrow 9.0.0+. + def __getitem__(self, key): + # This __getitem__ hook allows us to support proper indexing when + # accessing a single tensor (a "scalar" item of the array). Without this + # hook for integer keys, the indexing will fail on pyarrow < 9.0.0 due + # to a lack of ExtensionScalar subclassing support. + + # NOTE(Clark): We'd like to override the pa.Array.getitem() helper + # instead, which would obviate the need for overriding __iter__(), but + # unfortunately overriding Cython cdef methods with normal Python + # methods isn't allowed. + item = super().__getitem__(key) + if not isinstance(key, slice): + item = item.type._extension_scalar_to_ndarray(item) + return item + + else: + # NOTE(Clark): This __getitem__ override is only needed for Arrow < 8.0.0, + # before any ExtensionScalar support was added. + # TODO(Clark): Remove these methods once we only support Arrow 8.0.0+. + def __getitem__(self, key): + # This __getitem__ hook allows us to support proper indexing when + # accessing a single tensor (a "scalar" item of the array). Without this + # hook for integer keys, the indexing will fail on pyarrow < 8.0.0 due + # to a lack of ExtensionScalar support. + + # NOTE(Clark): We'd like to override the pa.Array.getitem() helper + # instead, which would obviate the need for overriding __iter__(), but + # unfortunately overriding Cython cdef methods with normal Python + # methods isn't allowed. 
+ if isinstance(key, slice): + return super().__getitem__(key) + return self._to_numpy(key) + + +# NOTE: We need to inherit from the mixin before pa.ExtensionArray to ensure that the +# mixin's overriding methods appear first in the MRO. +# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+. @PublicAPI(stability="beta") -class ArrowTensorArray(pa.ExtensionArray): +class ArrowTensorArray(_ArrowTensorScalarIndexingMixin, pa.ExtensionArray): """ An array of fixed-shape, homogeneous-typed tensors. @@ -84,37 +220,6 @@ class ArrowTensorArray(pa.ExtensionArray): OFFSET_DTYPE = np.int32 - def __getitem__(self, key): - # This __getitem__ hook allows us to support proper - # indexing when accessing a single tensor (a "scalar" item of the - # array). Without this hook for integer keys, the indexing will fail on - # all currently released pyarrow versions due to a lack of proper - # ExtensionScalar support. Support was added in - # https://github.com/apache/arrow/pull/10904, but hasn't been released - # at the time of this comment, and even with this support, the returned - # ndarray is a flat representation of the n-dimensional tensor. - - # NOTE(Clark): We'd like to override the pa.Array.getitem() helper - # instead, which would obviate the need for overriding __iter__() - # below, but unfortunately overriding Cython cdef methods with normal - # Python methods isn't allowed. - if isinstance(key, slice): - return super().__getitem__(key) - return self._to_numpy(key) - - def __iter__(self): - # Override pa.Array.__iter__() in order to return an iterator of - # properly shaped tensors instead of an iterator of flattened tensors. - # See comment in above __getitem__ method. - for i in range(len(self)): - # Use overridden __getitem__ method. - yield self.__getitem__(i) - - def to_pylist(self): - # Override pa.Array.to_pylist() due to a lack of ExtensionScalar - # support (see comment in __getitem__). - return list(self) - @classmethod def from_numpy( cls, arr: Union[np.ndarray, Iterable[np.ndarray]] @@ -437,6 +542,14 @@ def __arrow_ext_class__(self): """ return ArrowVariableShapedTensorArray + if _arrow_extension_scalars_are_subclassable(): + # TODO(Clark): Remove this version guard once we only support Arrow 9.0.0+. + def __arrow_ext_scalar_class__(self): + """ + ExtensionScalar subclass with custom logic for this array of tensors type. + """ + return ArrowTensorScalar + def __str__(self) -> str: dtype = self.storage_type["data"].type.value_type return f"ArrowVariableShapedTensorType(dtype={dtype}, ndim={self.ndim})" @@ -444,9 +557,30 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) + if _arrow_supports_extension_scalars(): + # TODO(Clark): Remove this version guard once we only support Arrow 8.0.0+. + def _extension_scalar_to_ndarray( + self, scalar: pa.ExtensionScalar + ) -> np.ndarray: + """ + Convert an ExtensionScalar to a tensor element. + """ + # TODO(Clark): Construct ndarray view directly on tensor element buffer to + # ensure reliable zero-copy semantics. + flat_ndarray = scalar.value.get("data").values.to_numpy( + zero_copy_only=False + ) + shape = tuple(scalar.value.get("shape").as_py()) + return flat_ndarray.reshape(shape) + +# NOTE: We need to inherit from the mixin before pa.ExtensionArray to ensure that the +# mixin's overriding methods appear first in the MRO. +# TODO(Clark): Remove this mixin once we only support Arrow 9.0.0+. 
@PublicAPI(stability="alpha") -class ArrowVariableShapedTensorArray(pa.ExtensionArray): +class ArrowVariableShapedTensorArray( + _ArrowTensorScalarIndexingMixin, pa.ExtensionArray +): """ An array of heterogeneous-shaped, homogeneous-typed tensors. @@ -462,36 +596,6 @@ class ArrowVariableShapedTensorArray(pa.ExtensionArray): OFFSET_DTYPE = np.int32 - def __getitem__(self, key): - # This __getitem__ hook allows us to support proper indexing when accessing a - # single tensor (a "scalar" item of the array). Without this hook for integer - # keys, the indexing will fail on all currently released pyarrow versions due - # to a lack of proper ExtensionScalar support. Support was added in - # https://github.com/apache/arrow/pull/10904, but hasn't been released at the - # time of this comment, and even with this support, the returned ndarray is a - # flat representation of the n-dimensional tensor. - - # NOTE(Clark): We'd like to override the pa.Array.getitem() helper instead, - # which would obviate the need for overriding __iter__() below, but - # unfortunately overriding Cython cdef methods with normal Python methods isn't - # allowed. - if isinstance(key, slice): - return super().__getitem__(key) - return self._to_numpy(key) - - def __iter__(self): - # Override pa.Array.__iter__() in order to return an iterator of properly - # shaped tensors instead of an iterator of flattened tensors. - # See comment in above __getitem__ method. - for i in range(len(self)): - # Use overridden __getitem__ method. - yield self.__getitem__(i) - - def to_pylist(self): - # Override pa.Array.to_pylist() due to a lack of ExtensionScalar support (see - # comment in __getitem__). - return list(self) - @classmethod def from_numpy( cls, arr: Union[np.ndarray, List[np.ndarray], Tuple[np.ndarray]] diff --git a/python/ray/data/_internal/arrow_block.py b/python/ray/data/_internal/arrow_block.py index 49e39975c3eb..be1865e52d9d 100644 --- a/python/ray/data/_internal/arrow_block.py +++ b/python/ray/data/_internal/arrow_block.py @@ -16,6 +16,7 @@ import numpy as np +from ray._private.utils import _get_pyarrow_version from ray.data._internal.arrow_ops import transform_polars, transform_pyarrow from ray.data._internal.table_block import ( VALUE_COL_NAME, @@ -181,7 +182,26 @@ def numpy_to_block( @staticmethod def _build_tensor_row(row: ArrowRow, col_name: str = VALUE_COL_NAME) -> np.ndarray: + from pkg_resources._vendor.packaging.version import parse as parse_version + element = row[col_name][0] + # TODO(Clark): Reduce this to np.asarray(element) once we only support Arrow + # 9.0.0+. + pyarrow_version = _get_pyarrow_version() + if pyarrow_version is not None: + pyarrow_version = parse_version(pyarrow_version) + if pyarrow_version is None or pyarrow_version >= parse_version("8.0.0"): + assert isinstance(element, pyarrow.ExtensionScalar) + if pyarrow_version is None or pyarrow_version >= parse_version("9.0.0"): + # For Arrow 9.0.0+, accessing an element in a chunked tensor array + # produces an ArrowTensorScalar, which we convert to an ndarray using + # .as_py(). + element = element.as_py() + else: + # For Arrow 8.*, accessing an element in a chunked tensor array produces + # an ExtensionScalar, which we convert to an ndarray using our custom + # method. + element = element.type._extension_scalar_to_ndarray(element) # For Arrow < 8.0.0, accessing an element in a chunked tensor array produces an # ndarray, which we return directly. 
return element diff --git a/python/ray/data/_internal/util.py b/python/ray/data/_internal/util.py index a75232cae1ac..e9a731eabdd6 100644 --- a/python/ray/data/_internal/util.py +++ b/python/ray/data/_internal/util.py @@ -21,8 +21,6 @@ # constraints given in python/setup.py. # Inclusive minimum pyarrow version. MIN_PYARROW_VERSION = "6.0.1" -# Exclusive maximum pyarrow version. -MAX_PYARROW_VERSION = "8.0.0" RAY_DISABLE_PYARROW_VERSION_CHECK = "RAY_DISABLE_PYARROW_VERSION_CHECK" _VERSION_VALIDATED = False _LOCAL_SCHEME = "local" @@ -58,13 +56,11 @@ def _check_pyarrow_version(): if version is not None: from pkg_resources._vendor.packaging.version import parse as parse_version - if (parse_version(version) < parse_version(MIN_PYARROW_VERSION)) or ( - parse_version(version) >= parse_version(MAX_PYARROW_VERSION) - ): + if parse_version(version) < parse_version(MIN_PYARROW_VERSION): raise ImportError( - f"Datasets requires pyarrow >= {MIN_PYARROW_VERSION}, < " - f"{MAX_PYARROW_VERSION}, but {version} is installed. Reinstall " - f'with `pip install -U "pyarrow<{MAX_PYARROW_VERSION}"`. ' + f"Datasets requires pyarrow >= {MIN_PYARROW_VERSION}, but " + f"{version} is installed. Reinstall with " + f'`pip install -U "pyarrow"`. ' "If you want to disable this pyarrow version check, set the " f"environment variable {RAY_DISABLE_PYARROW_VERSION_CHECK}=1." ) @@ -72,8 +68,8 @@ def _check_pyarrow_version(): logger.warning( "You are using the 'pyarrow' module, but the exact version is unknown " "(possibly carried as an internal component by another module). Please " - f"make sure you are using pyarrow >= {MIN_PYARROW_VERSION}, < " - f"{MAX_PYARROW_VERSION} to ensure compatibility with Ray Datasets. " + f"make sure you are using pyarrow >= {MIN_PYARROW_VERSION} to ensure " + "compatibility with Ray Datasets. " "If you want to disable this pyarrow version check, set the " f"environment variable {RAY_DISABLE_PYARROW_VERSION_CHECK}=1." ) diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index bc844a0c8b0c..e4cd15550ba3 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -35,6 +35,7 @@ from ray.types import ObjectRef from ray.util.annotations import DeveloperAPI, PublicAPI +from ray._private.utils import _add_creatable_buckets_param_if_s3_uri if TYPE_CHECKING: import pandas as pd @@ -275,7 +276,10 @@ def do_write( path, filesystem = _resolve_paths_and_filesystem(path, filesystem) path = path[0] if try_create_dir: - filesystem.create_dir(path, recursive=True) + # Arrow's S3FileSystem doesn't allow creating buckets by default, so we add + # a query arg enabling bucket creation if an S3 URI is provided. + tmp = _add_creatable_buckets_param_if_s3_uri(path) + filesystem.create_dir(tmp, recursive=True) filesystem = _wrap_s3_serialization_workaround(filesystem) _write_block_to_file = self._write_block diff --git a/python/ray/data/datasource/file_meta_provider.py b/python/ray/data/datasource/file_meta_provider.py index fbeb173fa04c..057c39522411 100644 --- a/python/ray/data/datasource/file_meta_provider.py +++ b/python/ray/data/datasource/file_meta_provider.py @@ -332,7 +332,15 @@ def prefetch_file_metadata( def _handle_read_os_error(error: OSError, paths: Union[str, List[str]]) -> str: # NOTE: this is not comprehensive yet, and should be extended as more errors arise. 
- aws_error_pattern = r"^(.*)AWS Error \[code \d+\]: No response body\.(.*)$" + # NOTE: The latter patterns are raised in Arrow 10+, while the former is raised in + # Arrow < 10. + aws_error_pattern = ( + r"^(?:(.*)AWS Error \[code \d+\]: No response body\.(.*))|" + r"(?:(.*)AWS Error UNKNOWN \(HTTP status 400\) during HeadObject operation: " + r"No response body\.(.*))|" + r"(?:(.*)AWS Error ACCESS_DENIED during HeadObject operation: No response " + r"body\.(.*))$" + ) if re.match(aws_error_pattern, str(error)): # Specially handle AWS error when reading files, to give a clearer error # message to avoid confusing users. The real issue is most likely that the AWS diff --git a/python/ray/data/tests/conftest.py b/python/ray/data/tests/conftest.py index 627e34ea11ad..a3b2e0bd787e 100644 --- a/python/ray/data/tests/conftest.py +++ b/python/ray/data/tests/conftest.py @@ -13,6 +13,7 @@ from ray.data.datasource.file_based_datasource import BlockWritePathProvider from ray.air.constants import TENSOR_COLUMN_NAME from ray.air.util.tensor_extensions.arrow import ArrowTensorArray +from ray._private.utils import _get_pyarrow_version # Trigger pytest hook to automatically zip test cluster logs to archive dir on failure from ray.tests.conftest import pytest_runtest_makereport # noqa @@ -96,10 +97,19 @@ def s3_fs_with_anonymous_crendential( def _s3_fs(aws_credentials, s3_server, s3_path): + from pkg_resources._vendor.packaging.version import parse as parse_version import urllib.parse + kwargs = aws_credentials.copy() + + if parse_version(_get_pyarrow_version()) >= parse_version("9.0.0"): + kwargs["allow_bucket_creation"] = True + kwargs["allow_bucket_deletion"] = True + fs = pa.fs.S3FileSystem( - region="us-west-2", endpoint_override=s3_server, **aws_credentials + region="us-west-2", + endpoint_override=s3_server, + **kwargs, ) if s3_path.startswith("s3://"): if "@" in s3_path: @@ -343,7 +353,7 @@ def ds_numpy_list_of_ndarray_tensor_format(ray_start_regular_shared): yield ray.data.from_numpy([np.arange(4).reshape((1, 2, 2))] * 4) -@pytest.fixture(params=["5.0.0", "8.0.0"]) +@pytest.fixture(params=["5.0.0"]) def unsupported_pyarrow_version(request): orig_version = pa.__version__ pa.__version__ = request.param @@ -355,18 +365,6 @@ def unsupported_pyarrow_version(request): pa.__version__ = orig_version -@pytest.fixture(params=["5.0.0", "8.0.0"]) -def unsupported_pyarrow_version_that_exists(request): - orig_version = pa.__version__ - pa.__version__ = request.param - # Unset pyarrow version cache. - import ray._private.utils as utils - - utils._PYARROW_VERSION = None - yield request.param - pa.__version__ = orig_version - - @pytest.fixture def disable_pyarrow_version_check(): os.environ["RAY_DISABLE_PYARROW_VERSION_CHECK"] = "1" diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 778660c7ca8a..f5582a2ea796 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -4767,14 +4767,10 @@ def test_random_shuffle_check_random(shutdown_only): prev = x -def test_unsupported_pyarrow_versions_check( - shutdown_only, unsupported_pyarrow_version_that_exists -): +def test_unsupported_pyarrow_versions_check(shutdown_only, unsupported_pyarrow_version): # Test that unsupported pyarrow versions cause an error to be raised upon the # initial pyarrow use. 
- ray.init( - runtime_env={"pip": [f"pyarrow=={unsupported_pyarrow_version_that_exists}"]} - ) + ray.init(runtime_env={"pip": [f"pyarrow=={unsupported_pyarrow_version}"]}) # Test Arrow-native creation APIs. # Test range_table. @@ -4796,14 +4792,14 @@ def test_unsupported_pyarrow_versions_check( def test_unsupported_pyarrow_versions_check_disabled( shutdown_only, - unsupported_pyarrow_version_that_exists, + unsupported_pyarrow_version, disable_pyarrow_version_check, ): # Test that unsupported pyarrow versions DO NOT cause an error to be raised upon the # initial pyarrow use when the version check is disabled. ray.init( runtime_env={ - "pip": [f"pyarrow=={unsupported_pyarrow_version_that_exists}"], + "pip": [f"pyarrow=={unsupported_pyarrow_version}"], "env_vars": {"RAY_DISABLE_PYARROW_VERSION_CHECK": "1"}, }, ) diff --git a/python/ray/data/tests/test_dataset_formats.py b/python/ray/data/tests/test_dataset_formats.py index d6af782f4123..6be33a32eb92 100644 --- a/python/ray/data/tests/test_dataset_formats.py +++ b/python/ray/data/tests/test_dataset_formats.py @@ -312,25 +312,20 @@ def get_node_id(): def test_read_s3_file_error(ray_start_regular_shared, s3_path): dummy_path = s3_path + "_dummy" error_message = "Please check that file exists and has properly configured access." - with pytest.raises(OSError) as e: + with pytest.raises(OSError, match=error_message): ray.data.read_parquet(dummy_path) - assert error_message in str(e.value) - with pytest.raises(OSError) as e: + with pytest.raises(OSError, match=error_message): ray.data.read_binary_files(dummy_path) - assert error_message in str(e.value) - with pytest.raises(OSError) as e: + with pytest.raises(OSError, match=error_message): ray.data.read_csv(dummy_path) - assert error_message in str(e.value) - with pytest.raises(OSError) as e: + with pytest.raises(OSError, match=error_message): ray.data.read_json(dummy_path) - assert error_message in str(e.value) - with pytest.raises(OSError) as e: + with pytest.raises(OSError, match=error_message): error = OSError( f"Error creating dataset. Could not read schema from {dummy_path}: AWS " "Error [code 15]: No response body.. Is this a 'parquet' file?" ) _handle_read_os_error(error, dummy_path) - assert error_message in str(e.value) if __name__ == "__main__": diff --git a/python/ray/tests/test_storage.py b/python/ray/tests/test_storage.py index 3c070cabcf90..f216705b480e 100644 --- a/python/ray/tests/test_storage.py +++ b/python/ray/tests/test_storage.py @@ -3,12 +3,17 @@ import urllib from pathlib import Path +from pkg_resources._vendor.packaging.version import parse as parse_version import pyarrow.fs import pytest import ray import ray._private.storage as storage from ray._private.test_utils import simulate_storage +from ray._private.utils import ( + _add_creatable_buckets_param_if_s3_uri, + _get_pyarrow_version, +) from ray.tests.conftest import * # noqa @@ -166,6 +171,39 @@ def test_connecting_to_cluster(shutdown_only, storage_type): subprocess.check_call(["ray", "stop"]) +def test_add_creatable_buckets_param_if_s3_uri(): + if parse_version(_get_pyarrow_version()) >= parse_version("9.0.0"): + # Test that the allow_bucket_creation=true query arg is added to an S3 URI. + uri = "s3://bucket/foo" + assert ( + _add_creatable_buckets_param_if_s3_uri(uri) + == "s3://bucket/foo?allow_bucket_creation=true" + ) + + # Test that query args are merged (i.e. existing query args aren't dropped). 
+ uri = "s3://bucket/foo?bar=baz" + assert ( + _add_creatable_buckets_param_if_s3_uri(uri) + == "s3://bucket/foo?allow_bucket_creation=true&bar=baz" + ) + + # Test that existing allow_bucket_creation=false query arg isn't overridden. + uri = "s3://bucket/foo?allow_bucket_creation=false" + assert ( + _add_creatable_buckets_param_if_s3_uri(uri) + == "s3://bucket/foo?allow_bucket_creation=false" + ) + else: + # Test that the allow_bucket_creation=true query arg is not added to an S3 URI, + # since we're using Arrow < 9. + uri = "s3://bucket/foo" + assert _add_creatable_buckets_param_if_s3_uri(uri) == uri + + # Test that non-S3 URI is unchanged. + uri = "gcs://bucket/foo" + assert _add_creatable_buckets_param_if_s3_uri(uri) == "gcs://bucket/foo" + + if __name__ == "__main__": import sys diff --git a/python/requirements.txt b/python/requirements.txt index 48fc14bbafb4..f229ffc4b7c7 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -57,7 +57,7 @@ aiorwlock opentelemetry-exporter-otlp==1.1.0 starlette typer -pyarrow >= 6.0.1, < 8.0.0; python_version >= '3.7' and platform_system != "Windows" +pyarrow >= 6.0.1; python_version >= '3.7' and platform_system != "Windows" pyarrow >= 6.0.1, < 7.0.0; python_version < '3.7' or platform_system == "Windows" aiohttp_cors opentelemetry-api==1.1.0
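
A minimal, stdlib-only sketch of the URI-rewriting behavior that test_add_creatable_buckets_param_if_s3_uri exercises above. The sketch_* names are illustrative stand-ins for the new helpers in python/ray/_private/utils.py, not part of the patch; the real _add_creatable_buckets_param_if_s3_uri additionally returns the URI unchanged when the installed pyarrow is older than 9.0.0, which this sketch omits.

from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse

def sketch_add_url_query_params(url: str, params: dict) -> str:
    # Merge new params with any existing query args; existing args win, so an
    # explicit allow_bucket_creation=false already present in the URI is preserved.
    parsed = urlparse(url)
    merged = dict(params)
    merged.update(dict(parse_qsl(parsed.query)))
    return urlunparse(parsed._replace(query=urlencode(merged)))

def sketch_add_creatable_buckets_param_if_s3_uri(uri: str) -> str:
    # Only S3 URIs get the bucket-creation query arg; everything else is untouched.
    if urlparse(uri).scheme == "s3":
        return sketch_add_url_query_params(uri, {"allow_bucket_creation": "true"})
    return uri

# Mirrors the Arrow >= 9 expectations in python/ray/tests/test_storage.py.
assert (
    sketch_add_creatable_buckets_param_if_s3_uri("s3://bucket/foo")
    == "s3://bucket/foo?allow_bucket_creation=true"
)
assert (
    sketch_add_creatable_buckets_param_if_s3_uri("s3://bucket/foo?bar=baz")
    == "s3://bucket/foo?allow_bucket_creation=true&bar=baz"
)
assert (
    sketch_add_creatable_buckets_param_if_s3_uri("s3://bucket/foo?allow_bucket_creation=false")
    == "s3://bucket/foo?allow_bucket_creation=false"
)
assert sketch_add_creatable_buckets_param_if_s3_uri("gcs://bucket/foo") == "gcs://bucket/foo"

The same version-gating idea (parse the installed pyarrow version once, then branch) is what the tensor-extension changes rely on to choose between ExtensionScalar subclassing (Arrow 9+), plain ExtensionScalar conversion (Arrow 8.*), and the legacy _to_numpy indexing path (Arrow < 8).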