From 60da15fd9ec74da8c3d7879ad4e0c81e0dc647ca Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Wed, 1 May 2024 13:35:15 -0700 Subject: [PATCH] Cleanup code --- src/daft-micropartition/src/micropartition.rs | 42 ++++++++++++------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 6bbc33fe2f..18dc127a71 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -351,7 +351,7 @@ fn materialize_scan_task( use pyo3::ToPyObject; let table_iterators = scan_task.sources.iter().map(|source| { - // Call each Python function to create an Iterator + // Call Python function to create an Iterator (Grabs the GIL and then releases it) match source { DataFileSource::PythonFactoryFunction { module, @@ -369,16 +369,17 @@ fn materialize_scan_task( }); let mut tables = Vec::new(); - let mut remaining = scan_task.pushdowns.limit; + let mut rows_seen_so_far = 0; for iterator in table_iterators { let iterator = iterator?; - // If no rows remaining, break and skip having to call the rest of the iterators - if remaining.map(|r| r == 0).unwrap_or(false) { - break; - } - - while remaining.map(|r| r > 0).unwrap_or(true) { + // Iterate on this iterator to exhaustion, or until the limit is met + while scan_task + .pushdowns + .limit + .map(|limit| rows_seen_so_far < limit) + .unwrap_or(true) + { // Grab the GIL to call next() on the iterator, and then release it once we have the Table let table = match Python::with_gil(|py| { iterator @@ -410,22 +411,35 @@ fn materialize_scan_task( }; // Apply limit if necessary, and update `&mut remaining` - let table = if let Some(remaining) = remaining.as_mut() { - if table.len() > *remaining { - *remaining = 0; + let table = if let Some(limit) = scan_task.pushdowns.limit { + let limited_table = if rows_seen_so_far + table.len() > limit { table - .slice(0, *remaining) + .slice(0, limit - rows_seen_so_far) .with_context(|_| DaftCoreComputeSnafu)? } else { - *remaining -= table.len(); table - } + }; + + // Update the rows_seen_so_far + rows_seen_so_far += limited_table.len(); + + limited_table } else { table }; tables.push(table); } + + // If seen enough rows, early-terminate + if scan_task + .pushdowns + .limit + .map(|limit| rows_seen_so_far >= limit) + .unwrap_or(false) + { + break; + } } tables