diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs
index d7ad04a2d1..43a1b3e23c 100644
--- a/src/daft-io/src/object_store_glob.rs
+++ b/src/daft-io/src/object_store_glob.rs
@@ -1,7 +1,7 @@
 use async_stream::stream;
 use futures::stream::{BoxStream, StreamExt};
 use itertools::Itertools;
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, path::Path, sync::Arc};
 use tokio::sync::mpsc::Sender;
 
 use globset::{GlobBuilder, GlobMatcher};
@@ -26,6 +26,11 @@ const SCHEME_SUFFIX_LEN: usize = "://".len();
 /// the `glob` utility can only be used with POSIX-style paths.
 const GLOB_DELIMITER: &str = "/";
 
+// NOTE: We use the following suffixes to filter out Spark marker files
+const MARKER_SUFFIXES: [&str; 1] = [".crc"];
+// NOTE: We use the following file names to filter out Spark marker files
+const MARKER_FILES: [&str; 3] = ["_metadata", "_common_metadata", "_success"];
+
 #[derive(Clone)]
 pub(crate) struct GlobState {
     // Current path in dirtree and glob_fragments
@@ -300,12 +305,24 @@ async fn ls_with_prefix_fallback(
 
 /// Helper to filter FileMetadata entries that should not be returned by globbing
 fn _should_return(fm: &FileMetadata) -> bool {
+    let file_path = fm.filepath.to_lowercase();
+    let file_name = Path::new(&file_path).file_name().and_then(|f| f.to_str());
     match fm.filetype {
         // Do not return size-0 File entries that end with "/"
         // These are usually used to demarcate "empty folders", since S3 is not really a filesystem
         // However they can lead to unexpected globbing behavior since most users do not expect them to exist
         FileType::File
-            if fm.filepath.ends_with(GLOB_DELIMITER) && fm.size.is_some_and(|s| s == 0) =>
+            if file_path.ends_with(GLOB_DELIMITER) && fm.size.is_some_and(|s| s == 0) =>
+        {
+            false
+        }
+        // Do not return Spark marker files
+        FileType::File
+            if MARKER_SUFFIXES
+                .iter()
+                .any(|suffix| file_path.ends_with(suffix))
+                || file_name
+                    .is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file)) =>
         {
             false
         }
diff --git a/tests/integration/io/parquet/test_reads_s3_minio.py b/tests/integration/io/parquet/test_reads_s3_minio.py
index f640513f65..cafdd9130a 100644
--- a/tests/integration/io/parquet/test_reads_s3_minio.py
+++ b/tests/integration/io/parquet/test_reads_s3_minio.py
@@ -36,3 +36,27 @@ def test_minio_parquet_read_no_files(minio_io_config):
 
     with pytest.raises(FileNotFoundError, match="Glob path had no matches:"):
         daft.read_parquet("s3://data-engineering-prod/foo/**.parquet", io_config=minio_io_config)
+
+
+@pytest.mark.integration()
+def test_minio_parquet_ignore_marker_files(minio_io_config):
+    bucket_name = "data-engineering-prod"
+    with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
+        target_paths = [
+            f"s3://data-engineering-prod/X/no_ext_parquet_metadata",
+            f"s3://data-engineering-prod/Y/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
+            f"s3://data-engineering-prod/Z/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
+        ]
+        data = {"x": [1, 2, 3, 4]}
+        pa_table = pa.Table.from_pydict(data)
+        for path in target_paths:
+            pq.write_table(pa_table, path, filesystem=fs)
+
+        marker_files = ["_metadata", "_SUCCESS", "_common_metadata", "a.crc"]
+        for marker in marker_files:
+            fs.touch(f"s3://{bucket_name}/X/{marker}")
+            fs.touch(f"s3://{bucket_name}/Y/{marker}")
+            fs.touch(f"s3://{bucket_name}/Z/{marker}")
+
+        read = daft.read_parquet(f"s3://{bucket_name}/**", io_config=minio_io_config)
+        assert read.to_pydict() == {"x": [1, 2, 3, 4] * 3}
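
For reference, below is a minimal standalone sketch of the marker-file filtering that the patch adds to `_should_return`. `FileMeta`, `is_returned`, and the `main` driver are hypothetical names used only for illustration; the two constants mirror MARKER_SUFFIXES and MARKER_FILES from the diff. Note that matching is done on the lowercased path, which is why the test's "_SUCCESS" is caught by the lowercase "_success" entry.

// Hedged sketch, not part of the patch above: demonstrates the marker-file
// predicate in isolation under the assumptions stated in the lead-in.
use std::path::Path;

const MARKER_SUFFIXES: [&str; 1] = [".crc"];
const MARKER_FILES: [&str; 3] = ["_metadata", "_common_metadata", "_success"];

/// Hypothetical, simplified stand-in for Daft's `FileMetadata`.
struct FileMeta {
    filepath: String,
}

/// Returns `false` for Spark/Hadoop marker files, mirroring the new match guard.
fn is_returned(fm: &FileMeta) -> bool {
    // Lowercase once so both the suffix and file-name checks are case-insensitive.
    let file_path = fm.filepath.to_lowercase();
    let file_name = Path::new(&file_path).file_name().and_then(|f| f.to_str());

    let is_marker = MARKER_SUFFIXES
        .iter()
        .any(|suffix| file_path.ends_with(suffix))
        || file_name.is_some_and(|name| MARKER_FILES.iter().any(|m| name == *m));

    !is_marker
}

fn main() {
    for path in [
        "s3://bucket/Y/part-00000.c000.snappy.parquet", // kept
        "s3://bucket/Y/_SUCCESS",                       // filtered: marker file name
        "s3://bucket/Y/a.crc",                          // filtered: marker suffix
    ] {
        let fm = FileMeta { filepath: path.to_string() };
        println!("{path} -> returned: {}", is_returned(&fm));
    }
}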