Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed May 1, 2024
1 parent c421acb commit 60da15f
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ fn materialize_scan_task(
use pyo3::ToPyObject;

let table_iterators = scan_task.sources.iter().map(|source| {
// Call each Python function to create an Iterator
// Call the Python function to create an iterator (grabs the GIL, then releases it)
match source {
DataFileSource::PythonFactoryFunction {
module,
Expand All @@ -369,16 +369,17 @@ fn materialize_scan_task(
});

let mut tables = Vec::new();
let mut remaining = scan_task.pushdowns.limit;
let mut rows_seen_so_far = 0;
for iterator in table_iterators {
let iterator = iterator?;

// If no rows remaining, break and skip having to call the rest of the iterators
if remaining.map(|r| r == 0).unwrap_or(false) {
break;
}

while remaining.map(|r| r > 0).unwrap_or(true) {
// Iterate on this iterator to exhaustion, or until the limit is met
while scan_task
.pushdowns
.limit
.map(|limit| rows_seen_so_far < limit)
.unwrap_or(true)
{
// Grab the GIL to call next() on the iterator, and then release it once we have the Table
let table = match Python::with_gil(|py| {
iterator
Expand Down Expand Up @@ -410,22 +411,35 @@ fn materialize_scan_task(
};

// Apply limit if necessary, and update `rows_seen_so_far`
let table = if let Some(remaining) = remaining.as_mut() {
if table.len() > *remaining {
*remaining = 0;
let table = if let Some(limit) = scan_task.pushdowns.limit {
let limited_table = if rows_seen_so_far + table.len() > limit {
table
.slice(0, *remaining)
.slice(0, limit - rows_seen_so_far)
.with_context(|_| DaftCoreComputeSnafu)?
} else {
*remaining -= table.len();
table
}
};

// Update the rows_seen_so_far
rows_seen_so_far += limited_table.len();

limited_table
} else {
table
};

tables.push(table);
}

// If seen enough rows, early-terminate
if scan_task
.pushdowns
.limit
.map(|limit| rows_seen_so_far >= limit)
.unwrap_or(false)
{
break;
}
}

tables
Expand Down

0 comments on commit 60da15f

Please sign in to comment.