Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fixes globbing on windows by consolidating on posix-style paths #1472

Merged
merged 7 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,6 @@ impl AzureBlobSource {

#[async_trait]
impl ObjectSource for AzureBlobSource {
fn delimiter(&self) -> &'static str {
AZURE_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let container = match parsed.host_str() {
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ impl GCSSource {

#[async_trait]
impl ObjectSource for GCSSource {
fn delimiter(&self) -> &'static str {
GCS_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
self.client.get(uri, range).await
}
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ impl HttpSource {

#[async_trait]
impl ObjectSource for HttpSource {
fn delimiter(&self) -> &'static str {
HTTP_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let request = self.client.get(uri);
let request = match range {
Expand Down
55 changes: 44 additions & 11 deletions src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use url::ParseError;

const PLATFORM_FS_DELIMITER: &str = std::path::MAIN_SEPARATOR_STR;
/// NOTE: We hardcode this even for Windows
///
/// For the most part, Windows machines work quite well with POSIX-style paths
/// as long as there is no "mix" of "\" and "/".
const PATH_SEGMENT_DELIMITER: &str = "/";

pub(crate) struct LocalSource {}

Expand Down Expand Up @@ -107,10 +111,6 @@ pub struct LocalFile {

#[async_trait]
impl ObjectSource for LocalSource {
fn delimiter(&self) -> &'static str {
PLATFORM_FS_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
const LOCAL_PROTOCOL: &str = "file://";
if let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) {
Expand Down Expand Up @@ -148,6 +148,14 @@ impl ObjectSource for LocalSource {
let fanout_limit = None;
let page_size = None;

// On Windows, the provided path may use "\" as its delimiter, which `glob` treats as an escape character.
// We sanitize the glob path here; note that the paths we get back will consequently be POSIX-style as well.
#[cfg(target_env = "msvc")]
{
let glob_path = glob_path.replace("\\", "/");
return glob(self, glob_path.as_str(), fanout_limit, page_size).await;
}

glob(self, glob_path, fanout_limit, page_size).await
}

Expand Down Expand Up @@ -208,6 +216,15 @@ impl ObjectSource for LocalSource {
let entry = entry.with_context(|_| UnableToFetchDirectoryEntriesSnafu {
path: uri.to_string(),
})?;

// NOTE: `entry` returned by ReadDirStream can potentially mix POSIX delimiters ("/") and Windows delimiters ("\")
// on Windows machines if we naively use `entry.path()`. Manually concatenating the entry names onto the uri is safer.
let path = format!(
"{}{PATH_SEGMENT_DELIMITER}{}",
uri.trim_end_matches(PATH_SEGMENT_DELIMITER),
entry.file_name().to_string_lossy()
);

let meta = tokio::fs::metadata(entry.path()).await.with_context(|_| {
UnableToFetchFileMetadataSnafu {
path: entry.path().to_string_lossy().to_string(),
Expand All @@ -217,8 +234,12 @@ impl ObjectSource for LocalSource {
filepath: format!(
"{}{}{}",
LOCAL_PROTOCOL,
entry.path().to_string_lossy(),
if meta.is_dir() { self.delimiter() } else { "" }
path,
if meta.is_dir() {
PATH_SEGMENT_DELIMITER
} else {
""
}
),
size: Some(meta.len()),
filetype: meta.file_type().try_into().with_context(|_| {
Expand Down Expand Up @@ -348,7 +369,7 @@ mod tests {
write_remote_parquet_to_local_file(&mut file2).await?;
let mut file3 = tempfile::NamedTempFile::new_in(dir.path()).unwrap();
write_remote_parquet_to_local_file(&mut file3).await?;
let dir_path = format!("file://{}", dir.path().to_string_lossy());
let dir_path = format!("file://{}", dir.path().to_string_lossy().replace("\\", "/"));
let client = LocalSource::get_client().await?;

let ls_result = client.ls(dir_path.as_ref(), true, None, None).await?;
Expand All @@ -357,17 +378,29 @@ mod tests {
files.sort_by(|a, b| a.filepath.cmp(&b.filepath));
let mut expected = vec![
FileMetadata {
filepath: format!("file://{}", file1.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file1.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file1.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file2.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file2.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file2.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file3.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file3.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file3.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
Expand Down
3 changes: 0 additions & 3 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ use async_stream::stream;

#[async_trait]
pub(crate) trait ObjectSource: Sync + Send {
/// Returns the delimiter for the platform (S3 vs GCS vs Azure vs local-unix vs Windows etc)
fn delimiter(&self) -> &'static str;

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult>;
async fn get_range(&self, uri: &str, range: Range<usize>) -> super::Result<GetResult> {
self.get(uri, Some(range)).await
Expand Down
36 changes: 20 additions & 16 deletions src/daft-io/src/object_store_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ lazy_static! {

const SCHEME_SUFFIX_LEN: usize = "://".len();

/// NOTE: Our globbing logic makes very strong assumptions about the delimiter being used to denote
/// directories. The concept of a "glob" is a Unix concept anyway, and so even on Windows machines
/// the `glob` utility can only be used with POSIX-style paths.
const GLOB_DELIMITER: &str = "/";

#[derive(Clone)]
pub(crate) struct GlobState {
// Current path in dirtree and glob_fragments
Expand Down Expand Up @@ -153,10 +158,7 @@ impl GlobFragment {
/// 2. Non-wildcard fragments are joined and coalesced by delimiter
/// 3. The first fragment is prefixed by "{scheme}://"
/// 4. Preserves any leading delimiters
pub(crate) fn to_glob_fragments(
glob_str: &str,
delimiter: &str,
) -> super::Result<Vec<GlobFragment>> {
pub(crate) fn to_glob_fragments(glob_str: &str) -> super::Result<Vec<GlobFragment>> {
// NOTE: We only use the URL parse library to get the scheme, because it will escape some of our glob special characters
// such as ? and {}
let glob_url = url::Url::parse(glob_str).map_err(|e| super::Error::InvalidUrl {
Expand All @@ -169,8 +171,8 @@ pub(crate) fn to_glob_fragments(

// NOTE: Leading delimiter may be important for absolute paths on local directory, and is considered
// part of the first fragment
let leading_delimiter = if glob_str_after_scheme.starts_with(delimiter) {
delimiter
let leading_delimiter = if glob_str_after_scheme.starts_with(GLOB_DELIMITER) {
GLOB_DELIMITER
} else {
""
};
Expand All @@ -179,7 +181,7 @@ pub(crate) fn to_glob_fragments(
let mut coalesced_fragments = vec![];
let mut nonspecial_fragments_so_far = vec![];
for fragment in glob_str_after_scheme
.split(delimiter)
.split(GLOB_DELIMITER)
.map(GlobFragment::new)
{
match fragment {
Expand All @@ -188,7 +190,7 @@ pub(crate) fn to_glob_fragments(
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
GLOB_DELIMITER,
));
}
coalesced_fragments.push(fragment);
Expand All @@ -201,7 +203,7 @@ pub(crate) fn to_glob_fragments(
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
GLOB_DELIMITER,
));
}

Expand Down Expand Up @@ -293,6 +295,10 @@ async fn ls_with_prefix_fallback(
/// Uses the `globset` crate for matching, and thus supports all the syntax enabled by that crate.
/// See: https://docs.rs/globset/latest/globset/#syntax
///
/// NOTE: Callers of this function are responsible for sanitizing their paths to follow the `globset` crate's expectations
/// regarding delimiters. E.g. on Windows machines, callers of [`glob`] must convert all Windows-style "\" delimiters to "/" because
/// `globset` treats "\" as an escape character.
///
/// Arguments:
/// * source: the ObjectSource to use for file listing
/// * glob: the string to glob
Expand All @@ -306,9 +312,7 @@ pub(crate) async fn glob(
glob: &str,
fanout_limit: Option<usize>,
page_size: Option<i32>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let delimiter = source.delimiter();

) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
// If no special characters, we fall back to ls behavior
let full_fragment = GlobFragment::new(glob);
if !full_fragment.has_special_character() {
Expand All @@ -331,14 +335,14 @@ pub(crate) async fn glob(

// If user specifies a trailing / then we understand it as an attempt to list the folder(s) matched
// and append a trailing * fragment
let glob = if glob.ends_with(source.delimiter()) {
let glob = if glob.ends_with(GLOB_DELIMITER) {
glob.to_string() + "*"
} else {
glob.to_string()
};
let glob = glob.as_str();

let glob_fragments = to_glob_fragments(glob, delimiter)?;
let glob_fragments = to_glob_fragments(glob)?;
let full_glob_matcher = GlobBuilder::new(glob)
.literal_separator(true)
.backslash_escape(true)
Expand Down Expand Up @@ -468,7 +472,7 @@ pub(crate) async fn glob(
let partial_glob_matcher = GlobBuilder::new(
GlobFragment::join(
&state.glob_fragments[..state.current_fragment_idx + 1],
source.delimiter(),
GLOB_DELIMITER,
)
.raw_str(),
)
Expand All @@ -492,7 +496,7 @@ pub(crate) async fn glob(
Ok(fm) => match fm.filetype {
FileType::Directory
if partial_glob_matcher.is_match(
fm.filepath.as_str().trim_end_matches(source.delimiter()),
fm.filepath.as_str().trim_end_matches(GLOB_DELIMITER),
) =>
{
visit(
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,6 @@ impl S3LikeSource {

#[async_trait]
impl ObjectSource for S3LikeSource {
fn delimiter(&self) -> &'static str {
S3_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let permit = self
.connection_pool_sema
Expand Down
5 changes: 4 additions & 1 deletion tests/io/test_list_files_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def compare_local_result(daft_ls_result: list, fs_result: list):
# io_glob does not return directories
fs_files = [(p, t) for p, t in fs_files if t == "file"]

# io_glob returns posix-style paths
fs_files = [(p.replace("\\", "/"), t) for p, t in fs_files]

assert sorted(daft_files) == sorted(fs_files)


Expand Down Expand Up @@ -134,5 +137,5 @@ def test_missing_file_path(tmp_path, include_protocol):
p = f"{d}/c/cc/ddd"
if include_protocol:
p = "file://" + p
with pytest.raises(FileNotFoundError, match=f"File: {d}/c/cc/ddd not found"):
with pytest.raises(FileNotFoundError, match=f"/c/cc/ddd not found"):
io_glob(p)
Loading