Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Propagate configs to Ray remote functions #1707

Merged
merged 10 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 51 additions & 46 deletions daft/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,25 @@
raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner}")


# Global Runner singleton, initialized when accessed through the DaftContext
_RUNNER: Runner | None = None


@dataclasses.dataclass(frozen=True)
@dataclasses.dataclass
class DaftContext:
"""Global context for the current Daft execution environment"""

daft_config: PyDaftConfig = PyDaftConfig()
runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
disallow_set_runner: bool = False
_runner: Runner | None = None

def runner(self) -> Runner:
global _RUNNER
if _RUNNER is not None:
return _RUNNER
if self._runner is not None:
return self._runner

if self.runner_config.name == "ray":
from daft.runners.ray_runner import RayRunner

assert isinstance(self.runner_config, _RayRunnerConfig)
_RUNNER = RayRunner(
self._runner = RayRunner(
daft_config=self.daft_config,
address=self.runner_config.address,
max_task_backlog=self.runner_config.max_task_backlog,
)
Expand All @@ -94,20 +92,16 @@
pass

assert isinstance(self.runner_config, _PyRunnerConfig)
_RUNNER = PyRunner(use_thread_pool=self.runner_config.use_thread_pool)
self._runner = PyRunner(daft_config=self.daft_config, use_thread_pool=self.runner_config.use_thread_pool)

else:
raise NotImplementedError(f"Runner config implemented: {self.runner_config.name}")

# Mark DaftContext as having the runner set, which prevents any subsequent setting of the config
# after the runner has been initialized once
global _DaftContext
_DaftContext = dataclasses.replace(
_DaftContext,
disallow_set_runner=True,
)
self.disallow_set_runner = True

return _RUNNER
return self._runner

@property
def is_ray_runner(self) -> bool:
Expand All @@ -121,11 +115,24 @@
return _DaftContext


def set_context(ctx: DaftContext) -> DaftContext:
    """Install ``ctx`` as the global DaftContext and return it.

    The previous global context is discarded first (via ``pop_context()``),
    which resets any runner/config state held by the old context.

    Args:
        ctx: The DaftContext to install globally.

    Returns:
        DaftContext: The newly installed global context (i.e. ``ctx``).
    """
    global _DaftContext

    # Drop the old global context before installing the new one.
    pop_context()
    _DaftContext = ctx

    return _DaftContext


def pop_context() -> DaftContext:
    """Helper used in tests and test fixtures to clear the global runner and allow for re-setting of configs."""
    global _DaftContext

    # Swap in a brand-new default context, handing back the one it replaces.
    previous_context = _DaftContext
    _DaftContext = DaftContext()
    return previous_context


def set_runner_ray(
address: str | None = None,
Expand All @@ -150,24 +157,21 @@
Returns:
DaftContext: Daft context after setting the Ray runner
"""
old_ctx = get_context()
if old_ctx.disallow_set_runner:
ctx = get_context()
if ctx.disallow_set_runner:

Check warning on line 161 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L160-L161

Added lines #L160 - L161 were not covered by tests
if noop_if_initialized:
warnings.warn(
"Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call."
)
return old_ctx
return ctx

Check warning on line 166 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L166

Added line #L166 was not covered by tests
raise RuntimeError("Cannot set runner more than once")
new_ctx = dataclasses.replace(
old_ctx,
runner_config=_RayRunnerConfig(
address=address,
max_task_backlog=max_task_backlog,
),
disallow_set_runner=True,

ctx.runner_config = _RayRunnerConfig(

Check warning on line 169 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L169

Added line #L169 was not covered by tests
address=address,
max_task_backlog=max_task_backlog,
)
_set_context(new_ctx)
return new_ctx
ctx.disallow_set_runner = True
return ctx

Check warning on line 174 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L173-L174

Added lines #L173 - L174 were not covered by tests


def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
Expand All @@ -178,44 +182,45 @@
Returns:
DaftContext: Daft context after setting the Py runner
"""
old_ctx = get_context()
if old_ctx.disallow_set_runner:
ctx = get_context()
if ctx.disallow_set_runner:

Check warning on line 186 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L185-L186

Added lines #L185 - L186 were not covered by tests
raise RuntimeError("Cannot set runner more than once")
new_ctx = dataclasses.replace(
old_ctx,
runner_config=_PyRunnerConfig(use_thread_pool=use_thread_pool),
disallow_set_runner=True,
)
_set_context(new_ctx)
return new_ctx

ctx.runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool)
ctx.disallow_set_runner = True
return ctx

Check warning on line 191 in daft/context.py

View check run for this annotation

Codecov / codecov/patch

daft/context.py#L189-L191

Added lines #L189 - L191 were not covered by tests


def set_config(
    config: PyDaftConfig | None = None,
    merge_scan_tasks_min_size_bytes: int | None = None,
    merge_scan_tasks_max_size_bytes: int | None = None,
) -> DaftContext:
    """Globally sets various configuration parameters which control various aspects of Daft execution

    Args:
        config: A PyDaftConfig object to set the config to, before applying other kwargs. Defaults to None which indicates
            that the old (current) config should be used.
        merge_scan_tasks_min_size_bytes: Minimum size in bytes when merging ScanTasks when reading files from storage.
            Increasing this value will make Daft perform more merging of files into a single partition before yielding,
            which leads to bigger but fewer partitions. (Defaults to 64MB)
        merge_scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
            Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
            fewer partitions. (Defaults to 512MB)

    Returns:
        DaftContext: Daft context after the config has been updated

    Raises:
        RuntimeError: If the runner has already been created — configs must be
            set before the runner is initialized, since the runner is constructed
            with the config values current at creation time.
    """
    ctx = get_context()
    # Once the runner exists it has already captured (and possibly propagated)
    # the config, so further config changes are disallowed.
    if ctx.disallow_set_runner:
        raise RuntimeError(
            "Cannot call `set_config` after the runner has already been created. "
            "Please call `set_config` before any calls to set the runner and before any dataframe creation or execution."
        )

    # Start from the user-provided config if given, otherwise the current one,
    # then replace values with any user-specified keyword overrides.
    old_daft_config = ctx.daft_config if config is None else config
    new_daft_config = old_daft_config.with_config_values(
        merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
        merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
    )

    ctx.daft_config = new_daft_config
    return ctx
9 changes: 4 additions & 5 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

import psutil

from daft.context import get_context
from daft.daft import (
FileFormatConfig,
FileInfos,
IOConfig,
PyDaftConfig,
ResourceRequest,
StorageConfig,
)
Expand Down Expand Up @@ -105,8 +105,9 @@ def get_schema_from_first_filepath(


class PyRunner(Runner[MicroPartition]):
def __init__(self, use_thread_pool: bool | None) -> None:
def __init__(self, daft_config: PyDaftConfig, use_thread_pool: bool | None) -> None:
super().__init__()
self.daft_config = daft_config
self._use_thread_pool: bool = use_thread_pool if use_thread_pool is not None else True

self.num_cpus = multiprocessing.cpu_count()
Expand All @@ -132,13 +133,11 @@ def run_iter(
# NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
results_buffer_size: int | None = None,
) -> Iterator[PyMaterializedResult]:
daft_config = get_context().daft_config

# Optimize the logical plan.
builder = builder.optimize()
# Finalize the logical plan and get a physical plan scheduler for translating the
# physical plan to executable tasks.
plan_scheduler = builder.to_physical_plan_scheduler(daft_config)
plan_scheduler = builder.to_physical_plan_scheduler(self.daft_config)
psets = {
key: entry.value.values()
for key, entry in self._part_set_cache._uuid_to_partition_set.items()
Expand Down
Loading
Loading