Skip to content

Commit

Permalink
[CHORE] [New Query Planner] [1/N] Remove Python query planner. (#1538)
Browse files Browse the repository at this point in the history
This PR removes the old Python query planner, which is a prerequisite
for some breaking changes such as pushing (re)partitioning into the
physical plan.
  • Loading branch information
clarkzinzow authored Oct 27, 2023
1 parent bf0a388 commit d24f0df
Show file tree
Hide file tree
Showing 27 changed files with 136 additions and 3,241 deletions.
52 changes: 0 additions & 52 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import TYPE_CHECKING, ClassVar

if TYPE_CHECKING:
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.runner import Runner

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -56,18 +55,12 @@ def _get_runner_config_from_env() -> _RunnerConfig:
_RUNNER: Runner | None = None


def _get_planner_from_env() -> bool:
"""Returns whether or not to use the new query planner."""
return bool(int(os.getenv("DAFT_NEW_QUERY_PLANNER", default="1")))


@dataclasses.dataclass(frozen=True)
class DaftContext:
"""Global context for the current Daft execution environment"""

runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
disallow_set_runner: bool = False
use_rust_planner: bool = dataclasses.field(default_factory=_get_planner_from_env)

def runner(self) -> Runner:
global _RUNNER
Expand Down Expand Up @@ -117,15 +110,6 @@ def runner(self) -> Runner:
def is_ray_runner(self) -> bool:
return isinstance(self.runner_config, _RayRunnerConfig)

def logical_plan_builder_class(self) -> type[LogicalPlanBuilder]:
from daft.logical.logical_plan import PyLogicalPlanBuilder
from daft.logical.rust_logical_plan import RustLogicalPlanBuilder

if self.use_rust_planner:
return RustLogicalPlanBuilder
else:
return PyLogicalPlanBuilder


_DaftContext = DaftContext()

Expand Down Expand Up @@ -201,39 +185,3 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
)
_set_context(new_ctx)
return new_ctx


def set_new_planner() -> DaftContext:
"""Enable the new query planner.
WARNING: The new query planner is currently experimental and only partially implemented.
Alternatively, users can set this behavior via an environment variable: DAFT_NEW_QUERY_PLANNER=1
Returns:
DaftContext: Daft context after enabling the new query planner.
"""
old_ctx = get_context()
new_ctx = dataclasses.replace(
old_ctx,
use_rust_planner=True,
)
_set_context(new_ctx)
return new_ctx


def set_old_planner() -> DaftContext:
"""Enable the old query planner.
Alternatively, users can set this behavior via an environment variable: DAFT_NEW_QUERY_PLANNER=0
Returns:
DaftContext: Daft context after enabling the old query planner.
"""
old_ctx = get_context()
new_ctx = dataclasses.replace(
old_ctx,
use_rust_planner=False,
)
_set_context(new_ctx)
return new_ctx
2 changes: 1 addition & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ from typing import Any, Callable

from daft.runners.partitioning import PartitionCacheEntry
from daft.execution import physical_plan
from daft.planner.planner import PartitionT
from daft.plan_scheduler.physical_plan_scheduler import PartitionT
import pyarrow
import fsspec

Expand Down
6 changes: 3 additions & 3 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def _from_tables(cls, *parts: Table) -> "DataFrame":

context = get_context()
cache_entry = context.runner().put_partition_set_into_cache(result_pset)
builder = context.logical_plan_builder_class().from_in_memory_scan(cache_entry, parts[0].schema())
builder = LogicalPlanBuilder.from_in_memory_scan(cache_entry, parts[0].schema())
return cls(builder)

###
Expand Down Expand Up @@ -1175,7 +1175,7 @@ def _from_ray_dataset(cls, ds: "RayDataset") -> "DataFrame":

partition_set, schema = ray_runner_io.partition_set_from_ray_dataset(ds)
cache_entry = context.runner().put_partition_set_into_cache(partition_set)
builder = context.logical_plan_builder_class().from_in_memory_scan(
builder = LogicalPlanBuilder.from_in_memory_scan(
cache_entry,
schema=schema,
partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()),
Expand Down Expand Up @@ -1244,7 +1244,7 @@ def _from_dask_dataframe(cls, ddf: "dask.DataFrame") -> "DataFrame":

partition_set, schema = ray_runner_io.partition_set_from_dask_dataframe(ddf)
cache_entry = context.runner().put_partition_set_into_cache(partition_set)
builder = context.logical_plan_builder_class().from_in_memory_scan(
builder = LogicalPlanBuilder.from_in_memory_scan(
cache_entry,
schema=schema,
partition_spec=PartitionSpec(PartitionScheme.Unknown, partition_set.num_partitions()),
Expand Down
191 changes: 0 additions & 191 deletions daft/execution/physical_plan_factory.py

This file was deleted.

3 changes: 1 addition & 2 deletions daft/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ def _get_tabular_files_scan(
else runner_io.get_schema_from_first_filepath(file_infos, file_format_config, storage_config)
)
# Construct plan
builder_cls = get_context().logical_plan_builder_class()
builder = builder_cls.from_tabular_scan(
builder = LogicalPlanBuilder.from_tabular_scan(
file_infos=file_infos,
schema=inferred_or_provided_schema,
file_format_config=file_format_config,
Expand Down
4 changes: 2 additions & 2 deletions daft/io/file_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from daft.context import get_context
from daft.daft import IOConfig, PartitionScheme, PartitionSpec
from daft.dataframe import DataFrame
from daft.logical.builder import LogicalPlanBuilder
from daft.runners.pyrunner import LocalPartitionSet
from daft.table import Table

Expand Down Expand Up @@ -47,8 +48,7 @@ def from_glob_path(path: str, io_config: Optional[IOConfig] = None) -> DataFrame
file_infos_table = Table._from_pytable(file_infos.to_table())
partition = LocalPartitionSet({0: file_infos_table})
cache_entry = context.runner().put_partition_set_into_cache(partition)
builder_cls = context.logical_plan_builder_class()
builder = builder_cls.from_in_memory_scan(
builder = LogicalPlanBuilder.from_in_memory_scan(
cache_entry,
schema=file_infos_table.schema(),
partition_spec=PartitionSpec(PartitionScheme.Unknown, partition.num_partitions()),
Expand Down
Loading

0 comments on commit d24f0df

Please sign in to comment.