Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS S3 IO through KvikIO #16499

Open
wants to merge 14 commits into
base: branch-24.12
Choose a base branch
from
87 changes: 85 additions & 2 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
#include <cudf/utilities/span.hpp>

#include <kvikio/file_handle.hpp>
#include <kvikio/remote_handle.hpp>

#include <rmm/device_buffer.hpp>

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include <regex>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -390,6 +392,86 @@ class user_datasource_wrapper : public datasource {
datasource* const source; ///< A non-owning pointer to the user-implemented datasource
};

/**
* @brief Remote file source backed by KvikIO, which handles S3 filepaths seamlessly.
*/
class remote_file_source : public datasource {
static std::unique_ptr<kvikio::S3Endpoint> create_s3_endpoint(char const* filepath)
{
auto [bucket_name, bucket_object] = kvikio::S3Endpoint::parse_s3_url(filepath);
return std::make_unique<kvikio::S3Endpoint>(bucket_name, bucket_object);
}

public:
explicit remote_file_source(char const* filepath) : _kvikio_file{create_s3_endpoint(filepath)} {}

~remote_file_source() override = default;

[[nodiscard]] bool supports_device_read() const override { return true; }

[[nodiscard]] bool is_device_read_preferred(size_t size) const override { return true; }
vuule marked this conversation as resolved.
Show resolved Hide resolved

[[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); }

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, this->size() - offset);
return _kvikio_file.pread(dst, read_size, offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: If insufficient memory is allocated in dst, does pread catch the exception thrown?

}

size_t device_read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
return device_read_async(offset, size, dst, stream).get();
}

std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream) override
{
rmm::device_buffer out_data(size, stream);
size_t read = device_read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read, stream);
return datasource::buffer::create(std::move(out_data));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
auto const read_size = std::min(size, this->size() - offset);
return _kvikio_file.pread(dst, read_size, offset).get();
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
auto const count = std::min(size, this->size() - offset);
std::vector<uint8_t> h_data(count);
this->host_read(offset, count, h_data.data());
return datasource::buffer::create(std::move(h_data));
}

/**
* @brief Is `url` referring to a remote file supported by KvikIO?
*
* For now, only S3 urls (urls starting with "s3://") are supported.
*/
static bool is_supported_remote_url(std::string const& url)
{
// Regular expression to match "s3://"
std::regex pattern{R"(^s3://)", std::regex_constants::icase};
return std::regex_search(url, pattern);
}

private:
kvikio::RemoteHandle _kvikio_file;
};

} // namespace

std::unique_ptr<datasource> datasource::create(std::string const& filepath,
Expand All @@ -404,8 +486,9 @@ std::unique_ptr<datasource> datasource::create(std::string const& filepath,

CUDF_FAIL("Invalid LIBCUDF_MMAP_ENABLED value: " + policy);
}();

if (use_memory_mapping) {
if (remote_file_source::is_supported_remote_url(filepath)) {
return std::make_unique<remote_file_source>(filepath.c_str());
} else if (use_memory_mapping) {
return std::make_unique<memory_mapped_source>(filepath.c_str(), offset, max_size_estimate);
} else {
// `file_source` reads the file directly, without memory mapping
Expand Down
16 changes: 16 additions & 0 deletions python/cudf/cudf/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,22 @@ def _integer_and_none_validator(val):
_make_contains_validator([False, True]),
)

_register_option(
"kvikio_remote_io",
_env_get_bool("CUDF_KVIKIO_REMOTE_IO", False),
textwrap.dedent(
"""
Whether to use KvikIO's remote IO backend or not.
\tWARN: this is experimental and may be removed at any time
\twithout warning or deprecation period.
\tSet KVIKIO_NTHREADS (default is 8) to change the number of
\tconcurrent tcp connections, which is important for good performance.
\tValid values are True or False. Default is False.
"""
madsbk marked this conversation as resolved.
Show resolved Hide resolved
),
_make_contains_validator([False, True]),
)


class option_context(ContextDecorator):
"""
Expand Down
11 changes: 11 additions & 0 deletions python/cudf/cudf/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def s3_base(endpoint_ip, endpoint_port):
# with an S3 endpoint on localhost

endpoint_uri = f"http://{endpoint_ip}:{endpoint_port}/"
os.environ["AWS_ENDPOINT_URL"] = endpoint_uri

server = ThreadedMotoServer(ip_address=endpoint_ip, port=endpoint_port)
server.start()
Expand Down Expand Up @@ -105,6 +106,15 @@ def s3_context(s3_base, bucket, files=None):
pass


@pytest.fixture(params=[True, False], ids=["kvikio=ON", "kvikio=OFF"])
def kvikio_remote_io(request):
    # Parametrized fixture: runs the dependent test once with KvikIO remote IO
    # enabled and once with it disabled, restoring the option afterwards.
    enabled = request.param
    with cudf.option_context("kvikio_remote_io", enabled):
        yield enabled


@pytest.fixture
def pdf(scope="module"):
df = pd.DataFrame()
Expand Down Expand Up @@ -193,6 +203,7 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):
def test_read_parquet(
s3_base,
s3so,
kvikio_remote_io,
pdf,
bytes_per_thread,
columns,
Expand Down
33 changes: 25 additions & 8 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import pandas as pd
from fsspec.core import expand_paths_if_needed, get_fs_token_paths

import cudf
from cudf.api.types import is_list_like
from cudf.core._compat import PANDAS_LT_300
from cudf.utils.docutils import docfmt_partial
Expand Down Expand Up @@ -1624,6 +1625,16 @@ def _maybe_expand_directories(paths, glob_pattern, fs):
return expanded_paths


def _use_kvikio_remote_io(fs) -> bool:
"""Whether `kvikio_remote_io` is enabled and `fs` refers to a S3 file"""

try:
from s3fs.core import S3FileSystem
except ImportError:
return False
return cudf.get_option("kvikio_remote_io") and isinstance(fs, S3FileSystem)


@doc_get_reader_filepath_or_buffer()
def get_reader_filepath_or_buffer(
path_or_data,
Expand All @@ -1649,17 +1660,17 @@ def get_reader_filepath_or_buffer(
)
]
if not input_sources:
raise ValueError("Empty input source list: {input_sources}.")
raise ValueError(f"Empty input source list: {input_sources}.")

filepaths_or_buffers = []
string_paths = [isinstance(source, str) for source in input_sources]
if any(string_paths):
# Sources are all strings. Thes strings are typically
# Sources are all strings. The strings are typically
# file paths, but they may also be raw text strings.

# Don't allow a mix of source types
if not all(string_paths):
raise ValueError("Invalid input source list: {input_sources}.")
raise ValueError(f"Invalid input source list: {input_sources}.")

# Make sure we define a filesystem (if possible)
paths = input_sources
Expand Down Expand Up @@ -1712,11 +1723,17 @@ def get_reader_filepath_or_buffer(
raise FileNotFoundError(
f"{input_sources} could not be resolved to any files"
)
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)

# If `kvikio_remote_io` is enabled and `fs` refers to a S3 file,
# we create S3 URLs and let them pass-through to libcudf.
if _use_kvikio_remote_io(fs):
filepaths_or_buffers = [f"s3://{fpath}" for fpath in paths]
else:
filepaths_or_buffers = _prefetch_remote_buffers(
paths,
fs,
**(prefetch_options or {}),
)
else:
raw_text_input = True

Expand Down
12 changes: 7 additions & 5 deletions python/pylibcudf/pylibcudf/io/types.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import codecs
import errno
import io
import os
import re

from pylibcudf.libcudf.io.json import \
json_recovery_mode_t as JSONRecoveryMode # no-cython-lint
Expand Down Expand Up @@ -143,6 +144,8 @@ cdef class SourceInfo:

Mixing different types of sources will raise a `ValueError`.
"""
    # Regular expression that matches remote file paths supported by libcudf
_is_remote_file_pattern = re.compile(r"^s3://", re.IGNORECASE)

def __init__(self, list sources):
if not sources:
Expand All @@ -157,11 +160,10 @@ cdef class SourceInfo:
for src in sources:
if not isinstance(src, (os.PathLike, str)):
raise ValueError("All sources must be of the same type!")
if not os.path.isfile(src):
raise FileNotFoundError(errno.ENOENT,
os.strerror(errno.ENOENT),
src)

if not (os.path.isfile(src) or self._is_remote_file_pattern.match(src)):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), src
)
c_files.push_back(<string> str(src).encode())

self.c_obj = move(source_info(c_files))
Expand Down
Loading