Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fix runner check at plan execution time for new query planner #1435

Merged — 1 commit merged on Sep 25, 2023
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,9 @@ class PhysicalPlanScheduler:
A work scheduler for physical query plans.
"""

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: ...
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan: ...

class LogicalPlanBuilder:
"""
Expand Down
4 changes: 2 additions & 2 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from typing import Iterator, TypeVar, cast

from daft.context import get_context
from daft.daft import (
FileFormat,
FileFormatConfig,
Expand All @@ -29,10 +28,11 @@ def tabular_scan(
file_format_config: FileFormatConfig,
storage_config: StorageConfig,
limit: int,
is_ray_runner: bool,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
# TODO(Clark): Fix this Ray runner hack.
part = Table._from_pytable(file_info_table)
if get_context().is_ray_runner:
if is_ray_runner:
import ray

parts = [ray.put(part)]
Expand Down
4 changes: 3 additions & 1 deletion daft/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ class PhysicalPlanScheduler(ABC):
"""

@abstractmethod
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
pass
4 changes: 3 additions & 1 deletion daft/planner/py_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
def __init__(self, plan: logical_plan.LogicalPlan):
self._plan = plan

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(

Codecov / codecov/patch — check warning on line 12 in daft/planner/py_planner.py (daft/planner/py_planner.py#L12): added line #L12 was not covered by tests. View check run for this annotation.
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(physical_plan_factory._get_physical_plan(self._plan, psets))
6 changes: 4 additions & 2 deletions daft/planner/rust_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ class RustPhysicalPlanScheduler(PhysicalPlanScheduler):
def __init__(self, scheduler: _PhysicalPlanScheduler):
self._scheduler = scheduler

def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets, is_ray_runner))
2 changes: 1 addition & 1 deletion daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[Table]:
if entry.value is not None
}
# Get executable tasks from planner.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False)

with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
partitions_gen = self._physical_plan_to_partitions(tasks)
Expand Down
2 changes: 1 addition & 1 deletion daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ def _run_plan(
from loguru import logger

# Get executable tasks from plan scheduler.
tasks = plan_scheduler.to_partition_tasks(psets)
tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=True)

# Note: For autoscaling clusters, we will probably want to query cores dynamically.
# Keep in mind this call takes about 0.3ms.
Expand Down
53 changes: 32 additions & 21 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ pub struct PhysicalPlanScheduler {
#[pymethods]
impl PhysicalPlanScheduler {
/// Converts the contained physical plan into an iterator of executable partition tasks.
pub fn to_partition_tasks(&self, psets: HashMap<String, Vec<PyObject>>) -> PyResult<PyObject> {
Python::with_gil(|py| self.plan.to_partition_tasks(py, &psets))
pub fn to_partition_tasks(
&self,
psets: HashMap<String, Vec<PyObject>>,
is_ray_runner: bool,
) -> PyResult<PyObject> {
Python::with_gil(|py| self.plan.to_partition_tasks(py, &psets, is_ray_runner))
}
}

Expand Down Expand Up @@ -98,6 +102,7 @@ impl PartitionIterator {
}

#[cfg(feature = "python")]
#[allow(clippy::too_many_arguments)]
fn tabular_scan(
py: Python<'_>,
source_schema: &SchemaRef,
Expand All @@ -106,6 +111,7 @@ fn tabular_scan(
file_format_config: &Arc<FileFormatConfig>,
storage_config: &Arc<StorageConfig>,
limit: &Option<usize>,
is_ray_runner: bool,
) -> PyResult<PyObject> {
let columns_to_read = projection_schema
.fields
Expand All @@ -123,6 +129,7 @@ fn tabular_scan(
PyFileFormatConfig::from(file_format_config.clone()),
PyStorageConfig::from(storage_config.clone()),
*limit,
is_ray_runner,
))?;
Ok(py_iter.into())
}
Expand Down Expand Up @@ -162,6 +169,7 @@ impl PhysicalPlan {
&self,
py: Python<'_>,
psets: &HashMap<String, Vec<PyObject>>,
is_ray_runner: bool,
) -> PyResult<PyObject> {
match self {
PhysicalPlan::InMemoryScan(InMemoryScan {
Expand Down Expand Up @@ -198,6 +206,7 @@ impl PhysicalPlan {
file_format_config,
storage_config,
limit,
is_ray_runner,
),
PhysicalPlan::TabularScanCsv(TabularScanCsv {
projection_schema,
Expand All @@ -219,6 +228,7 @@ impl PhysicalPlan {
file_format_config,
storage_config,
limit,
is_ray_runner,
),
PhysicalPlan::TabularScanJson(TabularScanJson {
projection_schema,
Expand All @@ -240,13 +250,14 @@ impl PhysicalPlan {
file_format_config,
storage_config,
limit,
is_ray_runner,
),
PhysicalPlan::Project(Project {
input,
projection,
resource_request,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let projection_pyexprs: Vec<PyExpr> = projection
.iter()
.map(|expr| PyExpr::from(expr.clone()))
Expand All @@ -258,7 +269,7 @@ impl PhysicalPlan {
Ok(py_iter.into())
}
PhysicalPlan::Filter(Filter { input, predicate }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let expressions_mod =
py.import(pyo3::intern!(py, "daft.expressions.expressions"))?;
let py_predicate = expressions_mod
Expand Down Expand Up @@ -287,7 +298,7 @@ impl PhysicalPlan {
limit,
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_physical_plan =
py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?;
let local_limit_iter = py_physical_plan
Expand All @@ -299,7 +310,7 @@ impl PhysicalPlan {
Ok(global_limit_iter.into())
}
PhysicalPlan::Explode(Explode { input, to_explode }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let explode_pyexprs: Vec<PyExpr> = to_explode
.iter()
.map(|expr| PyExpr::from(expr.clone()))
Expand All @@ -316,7 +327,7 @@ impl PhysicalPlan {
descending,
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let sort_by_pyexprs: Vec<PyExpr> = sort_by
.iter()
.map(|expr| PyExpr::from(expr.clone()))
Expand All @@ -337,15 +348,15 @@ impl PhysicalPlan {
input_num_partitions,
output_num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "split"))?
.call1((upstream_iter, *input_num_partitions, *output_num_partitions))?;
Ok(py_iter.into())
}
PhysicalPlan::Flatten(Flatten { input }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
Expand All @@ -356,7 +367,7 @@ impl PhysicalPlan {
input,
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "fanout_random"))?
Expand All @@ -368,7 +379,7 @@ impl PhysicalPlan {
num_partitions,
partition_by,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let partition_by_pyexprs: Vec<PyExpr> = partition_by
.iter()
.map(|expr| PyExpr::from(expr.clone()))
Expand All @@ -383,7 +394,7 @@ impl PhysicalPlan {
"FanoutByRange not implemented, since only use case (sorting) doesn't need it yet."
),
PhysicalPlan::ReduceMerge(ReduceMerge { input }) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
.getattr(pyo3::intern!(py, "reduce_merge"))?
Expand All @@ -396,7 +407,7 @@ impl PhysicalPlan {
input,
..
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let aggs_as_pyexprs: Vec<PyExpr> = aggregations
.iter()
.map(|agg_expr| PyExpr::from(Expr::Agg(agg_expr.clone())))
Expand All @@ -416,16 +427,16 @@ impl PhysicalPlan {
num_from,
num_to,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "coalesce"))?
.call1((upstream_iter, *num_from, *num_to))?;
Ok(py_iter.into())
}
PhysicalPlan::Concat(Concat { other, input }) => {
let upstream_input_iter = input.to_partition_tasks(py, psets)?;
let upstream_other_iter = other.to_partition_tasks(py, psets)?;
let upstream_input_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let upstream_other_iter = other.to_partition_tasks(py, psets, is_ray_runner)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "concat"))?
Expand All @@ -440,8 +451,8 @@ impl PhysicalPlan {
join_type,
..
}) => {
let upstream_left_iter = left.to_partition_tasks(py, psets)?;
let upstream_right_iter = right.to_partition_tasks(py, psets)?;
let upstream_left_iter = left.to_partition_tasks(py, psets, is_ray_runner)?;
let upstream_right_iter = right.to_partition_tasks(py, psets, is_ray_runner)?;
let left_on_pyexprs: Vec<PyExpr> = left_on
.iter()
.map(|expr| PyExpr::from(expr.clone()))
Expand Down Expand Up @@ -474,7 +485,7 @@ impl PhysicalPlan {
input,
}) => tabular_write(
py,
input.to_partition_tasks(py, psets)?,
input.to_partition_tasks(py, psets, is_ray_runner)?,
file_format,
schema,
root_dir,
Expand All @@ -493,7 +504,7 @@ impl PhysicalPlan {
input,
}) => tabular_write(
py,
input.to_partition_tasks(py, psets)?,
input.to_partition_tasks(py, psets, is_ray_runner)?,
file_format,
schema,
root_dir,
Expand All @@ -512,7 +523,7 @@ impl PhysicalPlan {
input,
}) => tabular_write(
py,
input.to_partition_tasks(py, psets)?,
input.to_partition_tasks(py, psets, is_ray_runner)?,
file_format,
schema,
root_dir,
Expand Down
Loading