Skip to content

Commit

Permalink
[BUG] Fixes globbing on windows by consolidating on posix-style paths (
Browse files Browse the repository at this point in the history
…#1472)

Addresses part of
[#1466](#1466) which is
currently failing because of globbing in windows on: #1467

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
jaychia and Jay Chia authored Oct 7, 2023
1 parent 54c666f commit 78fcfff
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 47 deletions.
4 changes: 0 additions & 4 deletions src/daft-io/src/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,6 @@ impl AzureBlobSource {

#[async_trait]
impl ObjectSource for AzureBlobSource {
fn delimiter(&self) -> &'static str {
AZURE_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let parsed = url::Url::parse(uri).with_context(|_| InvalidUrlSnafu { path: uri })?;
let container = match parsed.host_str() {
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/google_cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ impl GCSSource {

#[async_trait]
impl ObjectSource for GCSSource {
fn delimiter(&self) -> &'static str {
GCS_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
self.client.get(uri, range).await
}
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ impl HttpSource {

#[async_trait]
impl ObjectSource for HttpSource {
fn delimiter(&self) -> &'static str {
HTTP_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let request = self.client.get(uri);
let request = match range {
Expand Down
55 changes: 44 additions & 11 deletions src/daft-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use url::ParseError;

const PLATFORM_FS_DELIMITER: &str = std::path::MAIN_SEPARATOR_STR;
/// NOTE: We hardcode this even for Windows
///
/// For the most part, Windows machines work quite well with POSIX-style paths
/// as long as there is no "mix" of "\" and "/".
const PATH_SEGMENT_DELIMITER: &str = "/";

pub(crate) struct LocalSource {}

Expand Down Expand Up @@ -107,10 +111,6 @@ pub struct LocalFile {

#[async_trait]
impl ObjectSource for LocalSource {
fn delimiter(&self) -> &'static str {
PLATFORM_FS_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
const LOCAL_PROTOCOL: &str = "file://";
if let Some(uri) = uri.strip_prefix(LOCAL_PROTOCOL) {
Expand Down Expand Up @@ -148,6 +148,14 @@ impl ObjectSource for LocalSource {
let fanout_limit = None;
let page_size = None;

// If on Windows, the delimiter provided may be "\" which is treated as an escape character by `glob`
// We sanitize our filepaths here but note that on-return we will be received POSIX-style paths as well
#[cfg(target_env = "msvc")]
{
let glob_path = glob_path.replace("\\", "/");
return glob(self, glob_path.as_str(), fanout_limit, page_size).await;
}

glob(self, glob_path, fanout_limit, page_size).await
}

Expand Down Expand Up @@ -208,6 +216,15 @@ impl ObjectSource for LocalSource {
let entry = entry.with_context(|_| UnableToFetchDirectoryEntriesSnafu {
path: uri.to_string(),
})?;

// NOTE: `entry` returned by ReadDirStream can potentially mix posix-delimiters ("/") and windows-delimiter ("\")
// on Windows machines if we naively use `entry.path()`. Manually concatting the entries to the uri is safer.
let path = format!(
"{}{PATH_SEGMENT_DELIMITER}{}",
uri.trim_end_matches(PATH_SEGMENT_DELIMITER),
entry.file_name().to_string_lossy()
);

let meta = tokio::fs::metadata(entry.path()).await.with_context(|_| {
UnableToFetchFileMetadataSnafu {
path: entry.path().to_string_lossy().to_string(),
Expand All @@ -217,8 +234,12 @@ impl ObjectSource for LocalSource {
filepath: format!(
"{}{}{}",
LOCAL_PROTOCOL,
entry.path().to_string_lossy(),
if meta.is_dir() { self.delimiter() } else { "" }
path,
if meta.is_dir() {
PATH_SEGMENT_DELIMITER
} else {
""
}
),
size: Some(meta.len()),
filetype: meta.file_type().try_into().with_context(|_| {
Expand Down Expand Up @@ -348,7 +369,7 @@ mod tests {
write_remote_parquet_to_local_file(&mut file2).await?;
let mut file3 = tempfile::NamedTempFile::new_in(dir.path()).unwrap();
write_remote_parquet_to_local_file(&mut file3).await?;
let dir_path = format!("file://{}", dir.path().to_string_lossy());
let dir_path = format!("file://{}", dir.path().to_string_lossy().replace("\\", "/"));
let client = LocalSource::get_client().await?;

let ls_result = client.ls(dir_path.as_ref(), true, None, None).await?;
Expand All @@ -357,17 +378,29 @@ mod tests {
files.sort_by(|a, b| a.filepath.cmp(&b.filepath));
let mut expected = vec![
FileMetadata {
filepath: format!("file://{}", file1.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file1.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file1.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file2.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file2.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file2.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
FileMetadata {
filepath: format!("file://{}", file3.path().to_string_lossy()),
filepath: format!(
"file://{}/{}",
dir.path().to_string_lossy().replace("\\", "/"),
file3.path().file_name().unwrap().to_string_lossy(),
),
size: Some(file3.as_file().metadata().unwrap().len()),
filetype: FileType::File,
},
Expand Down
3 changes: 0 additions & 3 deletions src/daft-io/src/object_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ use async_stream::stream;

#[async_trait]
pub(crate) trait ObjectSource: Sync + Send {
/// Returns the delimiter for the platform (S3 vs GCS vs Azure vs local-unix vs Windows etc)
fn delimiter(&self) -> &'static str;

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult>;
async fn get_range(&self, uri: &str, range: Range<usize>) -> super::Result<GetResult> {
self.get(uri, Some(range)).await
Expand Down
36 changes: 20 additions & 16 deletions src/daft-io/src/object_store_glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ lazy_static! {

const SCHEME_SUFFIX_LEN: usize = "://".len();

/// NOTE: Our globbing logic makes very strong assumptions about the delimiter being used to denote
/// directories. The concept of a "glob" is a Unix concept anyways, and so even for Windows machines
/// the `glob` utility can only be used with POSIX-style paths.
const GLOB_DELIMITER: &str = "/";

#[derive(Clone)]
pub(crate) struct GlobState {
// Current path in dirtree and glob_fragments
Expand Down Expand Up @@ -153,10 +158,7 @@ impl GlobFragment {
/// 2. Non-wildcard fragments are joined and coalesced by delimiter
/// 3. The first fragment is prefixed by "{scheme}://"
/// 4. Preserves any leading delimiters
pub(crate) fn to_glob_fragments(
glob_str: &str,
delimiter: &str,
) -> super::Result<Vec<GlobFragment>> {
pub(crate) fn to_glob_fragments(glob_str: &str) -> super::Result<Vec<GlobFragment>> {
// NOTE: We only use the URL parse library to get the scheme, because it will escape some of our glob special characters
// such as ? and {}
let glob_url = url::Url::parse(glob_str).map_err(|e| super::Error::InvalidUrl {
Expand All @@ -169,8 +171,8 @@ pub(crate) fn to_glob_fragments(

// NOTE: Leading delimiter may be important for absolute paths on local directory, and is considered
// part of the first fragment
let leading_delimiter = if glob_str_after_scheme.starts_with(delimiter) {
delimiter
let leading_delimiter = if glob_str_after_scheme.starts_with(GLOB_DELIMITER) {
GLOB_DELIMITER
} else {
""
};
Expand All @@ -179,7 +181,7 @@ pub(crate) fn to_glob_fragments(
let mut coalesced_fragments = vec![];
let mut nonspecial_fragments_so_far = vec![];
for fragment in glob_str_after_scheme
.split(delimiter)
.split(GLOB_DELIMITER)
.map(GlobFragment::new)
{
match fragment {
Expand All @@ -188,7 +190,7 @@ pub(crate) fn to_glob_fragments(
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
GLOB_DELIMITER,
));
}
coalesced_fragments.push(fragment);
Expand All @@ -201,7 +203,7 @@ pub(crate) fn to_glob_fragments(
if !nonspecial_fragments_so_far.is_empty() {
coalesced_fragments.push(GlobFragment::join(
nonspecial_fragments_so_far.drain(..).as_slice(),
delimiter,
GLOB_DELIMITER,
));
}

Expand Down Expand Up @@ -293,6 +295,10 @@ async fn ls_with_prefix_fallback(
/// Uses the `globset` crate for matching, and thus supports all the syntax enabled by that crate.
/// See: https://docs.rs/globset/latest/globset/#syntax
///
/// NOTE: Users of this function are responsible for sanitizing their paths and delimiters to follow the `globset` crate's expectations
/// in terms of delimiters. E.g. on Windows machines, callers of [`glob`] must convert all Windows-style "\" delimiters to "/" because
/// `globset` treats "\" as escape characters.
///
/// Arguments:
/// * source: the ObjectSource to use for file listing
/// * glob: the string to glob
Expand All @@ -306,9 +312,7 @@ pub(crate) async fn glob(
glob: &str,
fanout_limit: Option<usize>,
page_size: Option<i32>,
) -> super::Result<BoxStream<super::Result<FileMetadata>>> {
let delimiter = source.delimiter();

) -> super::Result<BoxStream<'static, super::Result<FileMetadata>>> {
// If no special characters, we fall back to ls behavior
let full_fragment = GlobFragment::new(glob);
if !full_fragment.has_special_character() {
Expand All @@ -331,14 +335,14 @@ pub(crate) async fn glob(

// If user specifies a trailing / then we understand it as an attempt to list the folder(s) matched
// and append a trailing * fragment
let glob = if glob.ends_with(source.delimiter()) {
let glob = if glob.ends_with(GLOB_DELIMITER) {
glob.to_string() + "*"
} else {
glob.to_string()
};
let glob = glob.as_str();

let glob_fragments = to_glob_fragments(glob, delimiter)?;
let glob_fragments = to_glob_fragments(glob)?;
let full_glob_matcher = GlobBuilder::new(glob)
.literal_separator(true)
.backslash_escape(true)
Expand Down Expand Up @@ -468,7 +472,7 @@ pub(crate) async fn glob(
let partial_glob_matcher = GlobBuilder::new(
GlobFragment::join(
&state.glob_fragments[..state.current_fragment_idx + 1],
source.delimiter(),
GLOB_DELIMITER,
)
.raw_str(),
)
Expand All @@ -492,7 +496,7 @@ pub(crate) async fn glob(
Ok(fm) => match fm.filetype {
FileType::Directory
if partial_glob_matcher.is_match(
fm.filepath.as_str().trim_end_matches(source.delimiter()),
fm.filepath.as_str().trim_end_matches(GLOB_DELIMITER),
) =>
{
visit(
Expand Down
4 changes: 0 additions & 4 deletions src/daft-io/src/s3_like.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,10 +692,6 @@ impl S3LikeSource {

#[async_trait]
impl ObjectSource for S3LikeSource {
fn delimiter(&self) -> &'static str {
S3_DELIMITER
}

async fn get(&self, uri: &str, range: Option<Range<usize>>) -> super::Result<GetResult> {
let permit = self
.connection_pool_sema
Expand Down
5 changes: 4 additions & 1 deletion tests/io/test_list_files_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def compare_local_result(daft_ls_result: list, fs_result: list):
# io_glob does not return directories
fs_files = [(p, t) for p, t in fs_files if t == "file"]

# io_glob returns posix-style paths
fs_files = [(p.replace("\\", "/"), t) for p, t in fs_files]

assert sorted(daft_files) == sorted(fs_files)


Expand Down Expand Up @@ -134,5 +137,5 @@ def test_missing_file_path(tmp_path, include_protocol):
p = f"{d}/c/cc/ddd"
if include_protocol:
p = "file://" + p
with pytest.raises(FileNotFoundError, match=f"File: {d}/c/cc/ddd not found"):
with pytest.raises(FileNotFoundError, match=f"/c/cc/ddd not found"):
io_glob(p)

0 comments on commit 78fcfff

Please sign in to comment.