[FEAT] [New Executor] [2/N] daft-execution crate + proof-of-concept compute ops and partition reference + metadata model for new executor. #2340
Conversation
Force-pushed from 456ecfe to fc4cf7a
rayon = {workspace = true}
snafu = {workspace = true}
sysinfo = {workspace = true}
tokio = {workspace = true}
Some of these dependencies will be used in future PRs in the stack; I already double-checked that only those required by the last PR in the stack are included here.
Codecov Report — Attention: Patch coverage is

Additional details and impacted files:

@@ Coverage Diff @@
##           main    #2340   +/- ##
===================================
  Coverage      ?   78.47%
===================================
  Files         ?      487
  Lines         ?    56169
  Branches      ?        0
===================================
  Hits          ?    44081
  Misses        ?    12088
  Partials      ?        0
    input_meta: &[PartitionMetadata],
) -> ResourceRequest {
    self.resource_request
        .or_memory_bytes(input_meta.iter().map(|m| m.size_bytes).sum())
This should eventually take the max of the heap memory estimate for all fused ops in the chain, using max output size estimates from previous ops in the chain. This would require looping in the approximate stats estimate logic that's currently tied to the physical plan, which we previously talked about factoring out.
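The refinement described above could be sketched as follows. This is a minimal, hypothetical model (the `OpEstimate` type and its `inflation` multiplier are assumptions, not the daft-execution API): each op's estimated output size feeds the next op, and the chain's memory requirement is the max over per-op estimates rather than the sum of input sizes.

```rust
// Hedged sketch of max-over-chain memory estimation; hypothetical types,
// not the actual daft-execution API.
struct PartitionMetadata {
    size_bytes: usize,
}

// Assumed per-op stats model: estimated output size as a multiplier on input size.
struct OpEstimate {
    inflation: f64,
}

/// Estimate heap memory for a fused chain: thread each op's estimated output
/// size into the next op, and take the max across the chain.
fn chain_memory_estimate(input: &PartitionMetadata, chain: &[OpEstimate]) -> usize {
    let mut size = input.size_bytes as f64;
    let mut max_heap = size;
    for op in chain {
        size *= op.inflation; // estimated output size of this op
        max_heap = max_heap.max(size);
    }
    max_heap.ceil() as usize
}

fn main() {
    let input = PartitionMetadata { size_bytes: 1000 };
    // An op that doubles the data, then one that halves it: peak is mid-chain,
    // which a sum-of-inputs estimate would not capture.
    let chain = [OpEstimate { inflation: 2.0 }, OpEstimate { inflation: 0.5 }];
    println!("{}", chain_memory_estimate(&input, &chain)); // peak estimate: 2000
}
```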
#[derive(Debug)]
pub struct FusedOpBuilder<T> {
    // Task op at the front of the chain.
    source_op: Arc<dyn PartitionTaskOp<Input = T>>,
how does this look if we have some of these cases: BroadcastJoin(ScanOP, ScanOP) or Concat(ScanOp, Micropartition)?
Discussed offline: the current behavior in the status quo execution model is to materialize scans before BroadcastJoin and the like, so tabling this as a post-proof-of-concept optimization.
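The single-input fusion being discussed can be illustrated with a toy sketch (the `TaskOp` trait and concrete ops here are hypothetical stand-ins, not the real `PartitionTaskOp` API): a source op plus a chain of fused ops runs back-to-back on one partition, while multi-input ops like broadcast joins would instead materialize their inputs first, per the discussion above.

```rust
// Hedged sketch of single-input op fusion; hypothetical trait and ops.
trait TaskOp {
    fn execute(&self, input: Vec<i64>) -> Vec<i64>;
}

struct FilterPositive;
impl TaskOp for FilterPositive {
    fn execute(&self, input: Vec<i64>) -> Vec<i64> {
        input.into_iter().filter(|x| *x > 0).collect()
    }
}

struct Double;
impl TaskOp for Double {
    fn execute(&self, input: Vec<i64>) -> Vec<i64> {
        input.into_iter().map(|x| x * 2).collect()
    }
}

/// Fused chain: a source op followed by fused ops, executed back-to-back on
/// the same partition without materializing intermediates between tasks.
struct FusedOp {
    source_op: Box<dyn TaskOp>,
    fused_ops: Vec<Box<dyn TaskOp>>,
}

impl FusedOp {
    fn execute(&self, input: Vec<i64>) -> Vec<i64> {
        let mut out = self.source_op.execute(input);
        for op in &self.fused_ops {
            out = op.execute(out);
        }
        out
    }
}

fn main() {
    let fused = FusedOp {
        source_op: Box::new(FilterPositive),
        fused_ops: vec![Box::new(Double)],
    };
    println!("{:?}", fused.execute(vec![-1, 2, 3])); // filter then double
}
```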
    input_meta: &[PartitionMetadata],
) -> PartitionMetadata {
    assert_eq!(input_meta.len(), 1);
    let input_meta = &input_meta[0];
we should also be updating the number of bytes
For sure, I'll fix that! Note that this is currently unused - the partial metadata machinery is still a TODO for all ops and the surrounding execution model, since no exchange or sink ops have needed it yet.
Hmm, our current behavior is to set size_bytes to None, since we don't know the exact new size in bytes until we actually apply the limit. E.g. you could remove 99% of the rows, but 99% of the size in bytes could be in that remaining 1% of rows if those rows are particularly large.

Daft/daft/execution/execution_step.py, line 552 in 021b103:

size_bytes=None,
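The point above can be made concrete with a toy example (plain byte-vector rows, not Daft's MicroPartition types): after a limit, the row count is known exactly, but a proportional size_bytes estimate can be off by orders of magnitude when row sizes are skewed.

```rust
// Hedged illustration of why size_bytes is unknowable after a limit:
// byte size is not uniform per row, so scaling total bytes by the kept
// row fraction can be wildly wrong.
struct Row {
    payload: Vec<u8>,
}

fn total_size_bytes(rows: &[Row]) -> usize {
    rows.iter().map(|r| r.payload.len()).sum()
}

fn main() {
    // 99 tiny 1-byte rows plus one huge row: 10_000 bytes over 100 rows.
    let mut rows: Vec<Row> = (0..99).map(|_| Row { payload: vec![0; 1] }).collect();
    rows.push(Row { payload: vec![0; 9901] });
    let total = total_size_bytes(&rows);

    // limit(1) keeps 1% of the rows...
    let limited = &rows[..1];

    // ...but a proportional estimate (1% of total bytes) is 100x off,
    // because the kept row happens to be tiny.
    let proportional_estimate = total / 100;
    let actual = total_size_bytes(limited);
    println!("estimate: {}, actual: {}", proportional_estimate, actual);
}
```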
assert_eq!(inputs.len(), 1);
let input = inputs.into_iter().next().unwrap();
let out = input.add_monotonically_increasing_id(
    self.num_partitions.load(Ordering::SeqCst) as u64,
shouldn't we be using fetch_add() here instead of relying on with_input_metadata being called?
with_input_metadata() is guaranteed to be called within the scheduler right before submission, while execute() is called at task execution time, potentially on a different machine if using a distributed executor, which wouldn't update the partition counter for other to-be-executed tasks. For that reason, we should ensure such state is mutated within the scheduler before submission.
Force-pushed from 378ddd0 to 160b172
This PR adds the daft-execution subcrate, containing a set of proof-of-concept local compute ops and the partition reference + metadata model for the new executor. Partial metadata machinery isn't yet implemented, since no task scheduler or exchange op has required it yet.

TODOs