[BUG] Add marker prefixes to filter during reads (#2726)
Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Aug 27, 2024
1 parent f6c2cd0 commit 9e8506e
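
Spark jobs running on Databricks write marker files prefixed with `_started` and `_committed` alongside their output data files. The glob reader already filtered markers by suffix (`.crc`) and by exact file name (`_metadata`, `_common_metadata`, `_success`), but not by prefix, so these files were incorrectly returned as data files during reads. This commit adds a prefix-based marker filter plus an integration test for it.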
Showing 2 changed files with 34 additions and 3 deletions.
13 changes: 10 additions & 3 deletions src/daft-io/src/object_store_glob.rs
@@ -26,10 +26,12 @@ const SCHEME_SUFFIX_LEN: usize = "://".len();
 /// the `glob` utility can only be used with POSIX-style paths.
 const GLOB_DELIMITER: &str = "/";
 
-// NOTE: We use the following suffixes to filter out Spark marker files
+// NOTE: We use the following suffixes to filter out Spark/Databricks marker files
 const MARKER_SUFFIXES: [&str; 1] = [".crc"];
-// NOTE: We use the following file names to filter out Spark marker files
+// NOTE: We use the following file names to filter out Spark/Databricks marker files
 const MARKER_FILES: [&str; 3] = ["_metadata", "_common_metadata", "_success"];
+// NOTE: We use the following prefixes to filter out Spark/Databricks marker files
+const MARKER_PREFIXES: [&str; 2] = ["_started", "_committed"];
 
 #[derive(Clone)]
 pub(crate) struct GlobState {
@@ -322,7 +324,12 @@ fn _should_return(fm: &FileMetadata) -> bool {
             .iter()
             .any(|suffix| file_path.ends_with(suffix))
             || file_name
-                .is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file)) =>
+                .is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file))
+                || file_name.is_some_and(|file| {
+                    MARKER_PREFIXES
+                        .iter()
+                        .any(|m_prefix| file.starts_with(m_prefix))
+                }) =>
         {
             false
         }
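
For reference, here is a minimal, self-contained sketch of the combined filter. `is_marker_file` is a hypothetical helper written for illustration only; in daft-io the check actually lives inline in `_should_return`, as in the diff above.

// Standalone sketch of the marker filtering (hypothetical helper name).
const MARKER_SUFFIXES: [&str; 1] = [".crc"];
const MARKER_FILES: [&str; 3] = ["_metadata", "_common_metadata", "_success"];
const MARKER_PREFIXES: [&str; 2] = ["_started", "_committed"];

fn is_marker_file(file_path: &str) -> bool {
    // Final path segment, e.g. "_started_2024-08-27T00:00:00" for
    // "s3://bucket/X/_started_2024-08-27T00:00:00".
    let file_name = file_path.rsplit('/').next();
    MARKER_SUFFIXES.iter().any(|suffix| file_path.ends_with(suffix))
        || file_name.is_some_and(|file| MARKER_FILES.iter().any(|m_file| file == *m_file))
        || file_name.is_some_and(|file| {
            MARKER_PREFIXES
                .iter()
                .any(|m_prefix| file.starts_with(m_prefix))
        })
}

fn main() {
    assert!(is_marker_file("s3://bucket/X/_started_2024-08-27T00:00:00"));
    assert!(is_marker_file("s3://bucket/X/_committed_2024-08-27T00:00:00"));
    assert!(is_marker_file("s3://bucket/X/part-00000.c000.snappy.parquet.crc"));
    assert!(!is_marker_file("s3://bucket/X/part-00000.c000.snappy.parquet"));
}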
24 changes: 24 additions & 0 deletions tests/integration/io/parquet/test_reads_s3_minio.py
@@ -62,6 +62,30 @@ def test_minio_parquet_ignore_marker_files(minio_io_config):
     assert read.to_pydict() == {"x": [1, 2, 3, 4] * 3}
 
 
+@pytest.mark.integration()
+def test_minio_parquet_ignore_marker_prefixes(minio_io_config):
+    from datetime import datetime
+
+    bucket_name = "data-engineering-prod"
+    with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
+        target_paths = [
+            "s3://data-engineering-prod/X/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
+            "s3://data-engineering-prod/Y/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
+        ]
+        data = {"x": [1, 2, 3, 4]}
+        pa_table = pa.Table.from_pydict(data)
+        for path in target_paths:
+            pq.write_table(pa_table, path, filesystem=fs)
+
+        marker_prefixes = ["_started", "_committed"]
+        for marker_prefix in marker_prefixes:
+            fs.touch(f"s3://{bucket_name}/X/{marker_prefix}_{datetime.now().isoformat()}")
+            fs.touch(f"s3://{bucket_name}/Y/{marker_prefix}_{datetime.now().isoformat()}")
+
+        read = daft.read_parquet(f"s3://{bucket_name}/**", io_config=minio_io_config)
+        assert read.to_pydict() == {"x": [1, 2, 3, 4] * 2}
+
+
 @pytest.mark.integration()
 def test_minio_parquet_read_mismatched_schemas_no_pushdown(minio_io_config):
     # When we read files, we infer schema from the first file
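The assertion expects {"x": [1, 2, 3, 4] * 2} — one copy of the data per real parquet file — so the test fails if the glob also returns any of the four touched `_started_*` / `_committed_*` objects.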
