From 4455416b1cfa7b9be2a5085854d28ecff06d6e05 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 30 Oct 2023 11:49:17 -0700 Subject: [PATCH 1/6] Init GlobScanOperator --- Cargo.lock | 2 + src/daft-io/src/lib.rs | 18 ++++++ src/daft-scan/Cargo.toml | 2 + src/daft-scan/src/glob.rs | 130 ++++++++++++++++++++++++++++++++++++++ src/daft-scan/src/lib.rs | 1 + 5 files changed, 153 insertions(+) create mode 100644 src/daft-scan/src/glob.rs diff --git a/Cargo.lock b/Cargo.lock index a098e5b3a9..e434d815a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1286,12 +1286,14 @@ dependencies = [ "common-error", "daft-core", "daft-dsl", + "daft-io", "daft-stats", "daft-table", "pyo3", "pyo3-log", "serde", "snafu", + "tokio", ] [[package]] diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 013eb9050c..9dd8bfe910 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -17,6 +17,7 @@ use lazy_static::lazy_static; pub mod python; pub use common_io_config::{AzureConfig, IOConfig, S3Config}; +use object_io::FileMetadata; pub use object_io::GetResult; #[cfg(feature = "python")] pub use python::register_modules; @@ -165,6 +166,23 @@ impl IOClient { Ok(new_source) } + pub async fn glob( + &self, + input: &str, + fanout_limit: Option, + page_size: Option, + io_stats: Option>, + ) -> Result> { + let (scheme, _) = parse_url(input)?; + let source = self.get_source(&scheme).await?; + let files: Vec = source + .glob(input, fanout_limit, page_size, io_stats) + .await? + .try_collect() + .await?; + Ok(files) + } + pub async fn single_url_get( &self, input: String, diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml index 827fa777e8..91cef24997 100644 --- a/src/daft-scan/Cargo.toml +++ b/src/daft-scan/Cargo.toml @@ -2,12 +2,14 @@ common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} +daft-io = {path = "../daft-io", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} pyo3 = {workspace = true, optional = true} pyo3-log = {workspace = true} serde = {workspace = true} snafu = {workspace = true} +tokio = {workspace = true} [features] default = ["python"] diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs new file mode 100644 index 0000000000..0d53d71db4 --- /dev/null +++ b/src/daft-scan/src/glob.rs @@ -0,0 +1,130 @@ +use std::{fmt::Display, sync::Arc}; + +use common_error::DaftResult; +use daft_core::schema::SchemaRef; +use daft_io::{get_io_client, IOClient}; + +use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; +#[derive(Debug)] +pub struct GlobScanOperator { + glob_path: String, + file_type: FileType, + columns_to_select: Option>, + limit: Option, + schema: SchemaRef, + runtime: Arc, + io_config: Arc, +} + +fn run_glob( + glob_path: &str, + io_client: Arc, + runtime: Arc, +) -> DaftResult> { + runtime.block_on(async { + Ok(io_client + .as_ref() + .glob(glob_path, None, None, None) + .await? + .into_iter() + .map(|fm| fm.filepath) + .collect()) + }) +} + +impl GlobScanOperator { + pub fn try_new( + glob_path: &str, + file_type: FileType, + io_config: Arc, + runtime: Arc, + ) -> DaftResult { + // TODO: Allow for returning errors from this function + let io_client = get_io_client(false, io_config)?; + // TODO: Glob for first file using a limit + let paths = run_glob(glob_path, io_client, runtime)?; + let _first_filepath = paths[0].as_str(); + let schema = match file_type { + FileType::Csv => todo!(), + FileType::Parquet => todo!(), + FileType::Avro => todo!("Schema inference for Avro not implemented"), + FileType::Orc => todo!("Schema inference for Orc not implemented"), + }; + + Ok(Self { + glob_path: glob_path.to_string(), + file_type, + columns_to_select: None, + limit: None, + schema, + runtime: runtime.clone(), + io_config, + }) + } +} + +impl Display for GlobScanOperator { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#?}", self) + } +} + +impl ScanOperator for GlobScanOperator { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn partitioning_keys(&self) -> &[PartitionField] { + &[] + } + + fn num_partitions(&self) -> common_error::DaftResult { + unimplemented!("Cannot get number of partitions -- this will not be implemented."); + } + + fn select(self: Box, columns: &[&str]) -> common_error::DaftResult { + for c in columns { + if self.schema.get_field(c).is_err() { + return Err(common_error::DaftError::FieldNotFound(format!( + "{c} not found in {:?}", + self.columns_to_select + ))); + } + } + let mut to_rtn = self; + to_rtn.columns_to_select = Some(columns.iter().map(|s| s.to_string()).collect()); + Ok(to_rtn) + } + + fn limit(self: Box, num: usize) -> DaftResult { + let mut to_rtn = self; + to_rtn.limit = Some(num); + Ok(to_rtn) + } + + fn filter(self: Box, _predicate: &daft_dsl::Expr) -> DaftResult<(bool, ScanOperatorRef)> { + Ok((false, self)) + } + + fn to_scan_tasks( + self: Box, + ) -> DaftResult>>> { + let io_client = get_io_client(false, self.io_config.clone())?; + let files = run_glob(self.glob_path.as_str(), io_client, self.runtime.clone())?; + let iter = files.into_iter().map(move |f| { + let source = DataFileSource::AnonymousDataFile { + file_type: self.file_type, + path: f, + metadata: None, + partition_spec: None, + statistics: None, + }; + Ok(ScanTask { + source, + columns: self.columns_to_select.clone(), + limit: self.limit, + }) + }); + Ok(Box::new(iter)) + } +} diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index 018ba491af..f23fbb0cae 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -10,6 +10,7 @@ use daft_stats::{PartitionSpec, TableMetadata, TableStatistics}; use serde::{Deserialize, Serialize}; mod anonymous; +mod glob; #[cfg(feature = "python")] pub mod python; #[cfg(feature = "python")] From 042f7f2793015b318811e96ac6e91c9c016ee468 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 30 Oct 2023 13:12:09 -0700 Subject: [PATCH 2/6] Use single-threaded runtime --- src/daft-scan/src/glob.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 0d53d71db4..f6afe844a7 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use daft_io::{get_io_client, IOClient}; +use daft_io::{get_io_client, get_runtime}; use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; #[derive(Debug)] @@ -12,15 +12,13 @@ pub struct GlobScanOperator { columns_to_select: Option>, limit: Option, schema: SchemaRef, - runtime: Arc, io_config: Arc, } -fn run_glob( - glob_path: &str, - io_client: Arc, - runtime: Arc, -) -> DaftResult> { +fn run_glob(glob_path: &str, io_config: Arc) -> DaftResult> { + // Use single-threaded runtime which should be safe here as it is not shared across async contexts + let runtime = get_runtime(false)?; + let io_client = get_io_client(false, io_config)?; runtime.block_on(async { Ok(io_client .as_ref() @@ -37,16 +35,13 @@ impl GlobScanOperator { glob_path: &str, file_type: FileType, io_config: Arc, - runtime: Arc, ) -> DaftResult { - // TODO: Allow for returning errors from this function - let io_client = get_io_client(false, io_config)?; // TODO: Glob for first file using a limit - let paths = run_glob(glob_path, io_client, runtime)?; + let paths = run_glob(glob_path, io_config)?; let _first_filepath = paths[0].as_str(); let schema = match file_type { - FileType::Csv => todo!(), FileType::Parquet => todo!(), + FileType::Csv => todo!(), FileType::Avro => todo!("Schema inference for Avro not implemented"), FileType::Orc => todo!("Schema inference for Orc not implemented"), }; @@ -57,7 +52,6 @@ impl GlobScanOperator { columns_to_select: None, limit: None, schema, - runtime: runtime.clone(), io_config, }) } @@ -109,8 +103,7 @@ impl ScanOperator for GlobScanOperator { fn to_scan_tasks( self: Box, ) -> DaftResult>>> { - let io_client = get_io_client(false, self.io_config.clone())?; - let files = run_glob(self.glob_path.as_str(), io_client, self.runtime.clone())?; + let files = run_glob(self.glob_path.as_str(), self.io_config.clone())?; let iter = files.into_iter().map(move |f| { let source = DataFileSource::AnonymousDataFile { file_type: self.file_type, From 8aa54310ec3d7f1651ed149cdf6c9499376d6790 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 30 Oct 2023 14:57:56 -0700 Subject: [PATCH 3/6] Link with parquet and CSV schema inference --- Cargo.lock | 2 ++ src/daft-scan/Cargo.toml | 2 ++ src/daft-scan/src/glob.rs | 42 +++++++++++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e434d815a4..020640abca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,8 +1285,10 @@ version = "0.1.10" dependencies = [ "common-error", "daft-core", + "daft-csv", "daft-dsl", "daft-io", + "daft-parquet", "daft-stats", "daft-table", "pyo3", diff --git a/src/daft-scan/Cargo.toml b/src/daft-scan/Cargo.toml index 91cef24997..8603ea3dc6 100644 --- a/src/daft-scan/Cargo.toml +++ b/src/daft-scan/Cargo.toml @@ -1,8 +1,10 @@ [dependencies] common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} +daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-io = {path = "../daft-io", default-features = false} +daft-parquet = {path = "../daft-parquet", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} daft-table = {path = "../daft-table", default-features = false} pyo3 = {workspace = true, optional = true} diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index f6afe844a7..ab36a8d655 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc}; use common_error::DaftResult; use daft_core::schema::SchemaRef; -use daft_io::{get_io_client, get_runtime}; +use daft_io::{get_io_client, get_runtime, IOStatsContext}; use crate::{DataFileSource, FileType, PartitionField, ScanOperator, ScanOperatorRef, ScanTask}; #[derive(Debug)] @@ -31,17 +31,43 @@ fn run_glob(glob_path: &str, io_config: Arc) -> DaftResult, ) -> DaftResult { - // TODO: Glob for first file using a limit - let paths = run_glob(glob_path, io_config)?; - let _first_filepath = paths[0].as_str(); + // TODO: Limit return from run_glob + let paths = run_glob(glob_path, io_config.clone())?; + let first_filepath = paths[0].as_str(); + let schema = match file_type { - FileType::Parquet => todo!(), - FileType::Csv => todo!(), + FileType::Parquet => { + let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator constructor read_parquet_schema: for uri {first_filepath}" + )); + daft_parquet::read::read_parquet_schema( + first_filepath, + io_client, + Some(io_stats), + Default::default(), // TODO: pass-through schema inference options + )? + } + FileType::Csv => { + let io_client = get_io_client(true, io_config.clone())?; // it appears that read_parquet_schema is hardcoded to use multithreaded_io + let io_stats = IOStatsContext::new(format!( + "GlobScanOperator constructor read_csv_schema: for uri {first_filepath}" + )); + let (schema, _, _, _, _) = daft_csv::metadata::read_csv_schema( + first_filepath, + true, // TODO: pass-through schema inference options + None, // TODO: pass-through schema inference options + None, // TODO: pass-through schema inference options + io_client, + Some(io_stats), + )?; + schema + } FileType::Avro => todo!("Schema inference for Avro not implemented"), FileType::Orc => todo!("Schema inference for Orc not implemented"), }; @@ -51,7 +77,7 @@ impl GlobScanOperator { file_type, columns_to_select: None, limit: None, - schema, + schema: Arc::new(schema), io_config, }) } From fabd9c0e97445b943666c6530bacdb1aa31a55ea Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 30 Oct 2023 15:02:14 -0700 Subject: [PATCH 4/6] Add rt guard --- src/daft-scan/src/glob.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index ab36a8d655..cfc56189cb 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -15,10 +15,12 @@ pub struct GlobScanOperator { io_config: Arc, } -fn run_glob(glob_path: &str, io_config: Arc) -> DaftResult> { +fn run_full_glob(glob_path: &str, io_config: Arc) -> DaftResult> { // Use single-threaded runtime which should be safe here as it is not shared across async contexts let runtime = get_runtime(false)?; let io_client = get_io_client(false, io_config)?; + + let _rt_guard = runtime.enter(); runtime.block_on(async { Ok(io_client .as_ref() @@ -37,7 +39,7 @@ impl GlobScanOperator { io_config: Arc, ) -> DaftResult { // TODO: Limit return from run_glob - let paths = run_glob(glob_path, io_config.clone())?; + let paths = run_full_glob(glob_path, io_config.clone())?; let first_filepath = paths[0].as_str(); let schema = match file_type { @@ -129,7 +131,7 @@ impl ScanOperator for GlobScanOperator { fn to_scan_tasks( self: Box, ) -> DaftResult>>> { - let files = run_glob(self.glob_path.as_str(), self.io_config.clone())?; + let files = run_full_glob(self.glob_path.as_str(), self.io_config.clone())?; let iter = files.into_iter().map(move |f| { let source = DataFileSource::AnonymousDataFile { file_type: self.file_type, From 41cb6323bfecd92bf1f828c4a9ebbee97f4fd2da Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Mon, 30 Oct 2023 15:47:43 -0700 Subject: [PATCH 5/6] Add crude mechanism for limiting number of returned results from io_glob --- daft/daft.pyi | 1 + src/daft-io/src/azure_blob.rs | 2 + src/daft-io/src/google_cloud.rs | 2 + src/daft-io/src/http.rs | 3 +- src/daft-io/src/lib.rs | 3 +- src/daft-io/src/local.rs | 13 +++- src/daft-io/src/object_io.rs | 1 + src/daft-io/src/object_store_glob.rs | 59 +++++++++++++------ src/daft-io/src/python.rs | 2 + src/daft-io/src/s3_like.rs | 2 + src/daft-scan/src/glob.rs | 13 ++-- .../io/test_list_files_s3_minio.py | 12 ++++ 12 files changed, 85 insertions(+), 28 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 8444e5c1fd..d830b4477f 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -848,4 +848,5 @@ def io_glob( io_config: IOConfig | None = None, fanout_limit: int | None = None, page_size: int | None = None, + limit: int | None = None, ) -> list[dict]: ... diff --git a/src/daft-io/src/azure_blob.rs b/src/daft-io/src/azure_blob.rs index 5ad53b4742..9b0130316c 100644 --- a/src/daft-io/src/azure_blob.rs +++ b/src/daft-io/src/azure_blob.rs @@ -473,6 +473,7 @@ impl ObjectSource for AzureBlobSource { glob_path: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { use crate::object_store_glob::glob; @@ -485,6 +486,7 @@ impl ObjectSource for AzureBlobSource { glob_path, fanout_limit, page_size.or(Some(1000)), + limit, io_stats, ) .await diff --git a/src/daft-io/src/google_cloud.rs b/src/daft-io/src/google_cloud.rs index cda1460041..dfb20a44de 100644 --- a/src/daft-io/src/google_cloud.rs +++ b/src/daft-io/src/google_cloud.rs @@ -407,6 +407,7 @@ impl ObjectSource for GCSSource { glob_path: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { use crate::object_store_glob::glob; @@ -419,6 +420,7 @@ impl ObjectSource for GCSSource { glob_path, fanout_limit, page_size.or(Some(1000)), + limit, io_stats, ) .await diff --git a/src/daft-io/src/http.rs b/src/daft-io/src/http.rs index c1cdc07764..01b143990a 100644 --- a/src/daft-io/src/http.rs +++ b/src/daft-io/src/http.rs @@ -250,6 +250,7 @@ impl ObjectSource for HttpSource { glob_path: &str, _fanout_limit: Option, _page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { use crate::object_store_glob::glob; @@ -258,7 +259,7 @@ impl ObjectSource for HttpSource { let fanout_limit = None; let page_size = None; - glob(self, glob_path, fanout_limit, page_size, io_stats).await + glob(self, glob_path, fanout_limit, page_size, limit, io_stats).await } async fn ls( diff --git a/src/daft-io/src/lib.rs b/src/daft-io/src/lib.rs index 9dd8bfe910..323dc96192 100644 --- a/src/daft-io/src/lib.rs +++ b/src/daft-io/src/lib.rs @@ -171,12 +171,13 @@ impl IOClient { input: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option>, ) -> Result> { let (scheme, _) = parse_url(input)?; let source = self.get_source(&scheme).await?; let files: Vec = source - .glob(input, fanout_limit, page_size, io_stats) + .glob(input, fanout_limit, page_size, limit, io_stats) .await? .try_collect() .await?; diff --git a/src/daft-io/src/local.rs b/src/daft-io/src/local.rs index fd27d87d27..13e1dd3845 100644 --- a/src/daft-io/src/local.rs +++ b/src/daft-io/src/local.rs @@ -147,6 +147,7 @@ impl ObjectSource for LocalSource { glob_path: &str, _fanout_limit: Option, _page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { use crate::object_store_glob::glob; @@ -160,10 +161,18 @@ impl ObjectSource for LocalSource { #[cfg(target_env = "msvc")] { let glob_path = glob_path.replace("\\", "/"); - return glob(self, glob_path.as_str(), fanout_limit, page_size, io_stats).await; + return glob( + self, + glob_path.as_str(), + fanout_limit, + page_size, + limit, + io_stats, + ) + .await; } - glob(self, glob_path, fanout_limit, page_size, io_stats).await + glob(self, glob_path, fanout_limit, page_size, limit, io_stats).await } async fn ls( diff --git a/src/daft-io/src/object_io.rs b/src/daft-io/src/object_io.rs index c0429c687d..592d33f410 100644 --- a/src/daft-io/src/object_io.rs +++ b/src/daft-io/src/object_io.rs @@ -116,6 +116,7 @@ pub(crate) trait ObjectSource: Sync + Send { glob_path: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>>; diff --git a/src/daft-io/src/object_store_glob.rs b/src/daft-io/src/object_store_glob.rs index d7de111795..b8d46b13f3 100644 --- a/src/daft-io/src/object_store_glob.rs +++ b/src/daft-io/src/object_store_glob.rs @@ -320,18 +320,21 @@ pub(crate) async fn glob( glob: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { // If no special characters, we fall back to ls behavior let full_fragment = GlobFragment::new(glob); if !full_fragment.has_special_character() { + let mut remaining_results = limit; let glob = full_fragment.escaped_str().to_string(); return Ok(stream! { let mut results = source.iter_dir(glob.as_str(), true, page_size, io_stats).await?; - while let Some(result) = results.next().await { + while let Some(result) = results.next().await && remaining_results.map(|rr| rr > 0).unwrap_or(true) { match result { Ok(fm) => { if matches!(fm.filetype, FileType::File) { + remaining_results = remaining_results.map(|rr| rr - 1); yield Ok(fm) } }, @@ -362,7 +365,7 @@ pub(crate) async fn glob( .compile_matcher(); // Channel to send results back to caller. Note that all results must have FileType::File. - let (to_rtn_tx, mut to_rtn_rx) = tokio::sync::mpsc::channel(16 * 1024); + let (to_rtn_tx, mut to_rtn_rx) = tokio::sync::mpsc::channel(limit.unwrap_or(16 * 1024)); /// Dispatches a task to visit the specified `path` (a concrete path on the filesystem to either a File or Directory). /// Based on the current glob_fragment being processed (accessible via `glob_fragments[i]`) this task will: @@ -419,16 +422,20 @@ pub(crate) async fn glob( FileType::File if state.full_glob_matcher.is_match(fm.filepath.as_str()) => { - result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect"); + if let Some(e) = result_tx.send(Ok(fm)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + }; } _ => (), } } // Silence NotFound errors when in wildcard "search" mode Err(super::Error::NotFound { .. }) if state.wildcard_mode => (), - Err(e) => result_tx.send(Err(e)).await.expect( - "Internal multithreading channel is broken: results may be incorrect", - ), + Err(e) => { + if let Some(e) = result_tx.send(Err(e)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } + } } } // BASE CASE: current fragment is the last fragment in `glob_fragments` @@ -446,14 +453,18 @@ pub(crate) async fn glob( if matches!(fm.filetype, FileType::File) && state.full_glob_matcher.is_match(fm.filepath.as_str()) { - result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect"); + if let Some(e) = result_tx.send(Ok(fm)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } } } // Silence NotFound errors when in wildcard "search" mode Err(super::Error::NotFound { .. }) if state.wildcard_mode => (), - Err(e) => result_tx.send(Err(e)).await.expect( - "Internal multithreading channel is broken: results may be incorrect", - ), + Err(e) => { + if let Some(e) = result_tx.send(Err(e)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } + } } } // Last fragment does not contain wildcard: we return it if the full path exists and is a FileType::File @@ -474,14 +485,18 @@ pub(crate) async fn glob( && matches!(single_file_ls.files[0].filetype, FileType::File) { let fm = single_file_ls.files.drain(..).next().unwrap(); - result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect"); + if let Some(e) = result_tx.send(Ok(fm)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } } } // Silence NotFound errors when in wildcard "search" mode Err(super::Error::NotFound { .. }) if state.wildcard_mode => (), - Err(e) => result_tx.send(Err(e)).await.expect( - "Internal multithreading channel is broken: results may be incorrect", - ), + Err(e) => { + if let Some(e) = result_tx.send(Err(e)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } + } }; } @@ -535,15 +550,19 @@ pub(crate) async fn glob( FileType::File if state.full_glob_matcher.is_match(fm.filepath.as_str()) => { - result_tx.send(Ok(fm)).await.expect("Internal multithreading channel is broken: results may be incorrect"); + if let Some(e) = result_tx.send(Ok(fm)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } } _ => (), }, // Always silence NotFound since we are in wildcard "search" mode here by definition Err(super::Error::NotFound { .. }) => (), - Err(e) => result_tx.send(Err(e)).await.expect( - "Internal multithreading channel is broken: results may be incorrect", - ), + Err(e) => { + if let Some(e) = result_tx.send(Err(e)).await.err() { + log::debug!("Sender unable to send results into channel during glob (this is expected if a limit was applied, which results in early termination): {e}"); + } + } } } @@ -579,7 +598,9 @@ pub(crate) async fn glob( ); let to_rtn_stream = stream! { - while let Some(v) = to_rtn_rx.recv().await { + let mut remaining_results = limit; + while remaining_results.map(|rr| rr > 0).unwrap_or(true) && let Some(v) = to_rtn_rx.recv().await { + remaining_results = remaining_results.map(|rr| rr - 1); yield v } }; diff --git a/src/daft-io/src/python.rs b/src/daft-io/src/python.rs index 1dba8af023..b85140b4b4 100644 --- a/src/daft-io/src/python.rs +++ b/src/daft-io/src/python.rs @@ -18,6 +18,7 @@ mod py { io_config: Option, fanout_limit: Option, page_size: Option, + limit: Option, ) -> PyResult<&PyList> { let multithreaded_io = multithreaded_io.unwrap_or(true); let io_stats = IOStatsContext::new(format!("io_glob for {path}")); @@ -39,6 +40,7 @@ mod py { path.as_ref(), fanout_limit, page_size, + limit, Some(io_stats_handle), ) .await? diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index f8b25c6e6d..73b7e8db79 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -781,6 +781,7 @@ impl ObjectSource for S3LikeSource { glob_path: &str, fanout_limit: Option, page_size: Option, + limit: Option, io_stats: Option, ) -> super::Result>> { use crate::object_store_glob::glob; @@ -793,6 +794,7 @@ impl ObjectSource for S3LikeSource { glob_path, fanout_limit, page_size.or(Some(1000)), + limit, io_stats, ) .await diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index cfc56189cb..6a49eaf23f 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -15,7 +15,11 @@ pub struct GlobScanOperator { io_config: Arc, } -fn run_full_glob(glob_path: &str, io_config: Arc) -> DaftResult> { +fn run_glob( + glob_path: &str, + io_config: Arc, + limit: Option, +) -> DaftResult> { // Use single-threaded runtime which should be safe here as it is not shared across async contexts let runtime = get_runtime(false)?; let io_client = get_io_client(false, io_config)?; @@ -24,7 +28,7 @@ fn run_full_glob(glob_path: &str, io_config: Arc) -> DaftResu runtime.block_on(async { Ok(io_client .as_ref() - .glob(glob_path, None, None, None) + .glob(glob_path, None, None, limit, None) .await? .into_iter() .map(|fm| fm.filepath) @@ -38,8 +42,7 @@ impl GlobScanOperator { file_type: FileType, io_config: Arc, ) -> DaftResult { - // TODO: Limit return from run_glob - let paths = run_full_glob(glob_path, io_config.clone())?; + let paths = run_glob(glob_path, io_config.clone(), Some(1))?; let first_filepath = paths[0].as_str(); let schema = match file_type { @@ -131,7 +134,7 @@ impl ScanOperator for GlobScanOperator { fn to_scan_tasks( self: Box, ) -> DaftResult>>> { - let files = run_full_glob(self.glob_path.as_str(), self.io_config.clone())?; + let files = run_glob(self.glob_path.as_str(), self.io_config.clone(), None)?; let iter = files.into_iter().map(move |f| { let source = DataFileSource::AnonymousDataFile { file_type: self.file_type, diff --git a/tests/integration/io/test_list_files_s3_minio.py b/tests/integration/io/test_list_files_s3_minio.py index 1d67768998..b5142447ca 100644 --- a/tests/integration/io/test_list_files_s3_minio.py +++ b/tests/integration/io/test_list_files_s3_minio.py @@ -362,3 +362,15 @@ def test_missing_file_path(minio_io_config, recursive): fs.write_bytes(f"s3://{bucket_name}/{name}", b"") with pytest.raises(FileNotFoundError, match=f"s3://{bucket_name}/c/cc/ddd"): daft_ls_result = io_glob(path, io_config=minio_io_config) + + +@pytest.mark.integration() +@pytest.mark.parametrize("limit", [1, 2]) +def test_limit(minio_io_config, limit): + bucket_name = "bucket" + with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs: + files = ["a", "b/bb", "c/cc/ccc"] + for name in files: + fs.write_bytes(f"s3://{bucket_name}/{name}", b"") + daft_ls_result = io_glob(f"s3://{bucket_name}/**", io_config=minio_io_config, limit=limit) + assert len(daft_ls_result) == limit From 8f68e9c49cfeb9169d651fe8237fadeb21771acc Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 31 Oct 2023 14:08:44 -0700 Subject: [PATCH 6/6] Use multithreaded runtime --- src/daft-scan/src/glob.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 6a49eaf23f..b7c2717f5f 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -20,9 +20,9 @@ fn run_glob( io_config: Arc, limit: Option, ) -> DaftResult> { - // Use single-threaded runtime which should be safe here as it is not shared across async contexts - let runtime = get_runtime(false)?; - let io_client = get_io_client(false, io_config)?; + // Use multi-threaded runtime which should be global Arc-ed cached singletons + let runtime = get_runtime(true)?; + let io_client = get_io_client(true, io_config)?; let _rt_guard = runtime.enter(); runtime.block_on(async {