Skip to content

Commit

Permalink
[FEAT] Add runner logic in PyRunner for ActorPoolProject (#2677)
Browse files Browse the repository at this point in the history
Adds remaining logic for linking the ActorPoolProject physical plan with
execution logic in the PyRunner

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
Co-authored-by: Kevin Wang <[email protected]>
  • Loading branch information
3 people authored Sep 5, 2024
1 parent c6afd78 commit 8a5170f
Show file tree
Hide file tree
Showing 21 changed files with 631 additions and 113 deletions.
4 changes: 4 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class ResourceRequest:
def with_num_cpus(self, num_cpus: float | None) -> ResourceRequest: ...
def with_num_gpus(self, num_gpus: float | None) -> ResourceRequest: ...
def with_memory_bytes(self, memory_bytes: int | None) -> ResourceRequest: ...
def __mul__(self, factor: float) -> ResourceRequest: ...
def __add__(self, other: ResourceRequest) -> ResourceRequest: ...
def __repr__(self) -> str: ...
def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override]
Expand Down Expand Up @@ -1198,6 +1199,9 @@ def stateful_udf(
concurrency: int | None,
) -> PyExpr: ...
def check_column_name_validity(name: str, schema: PySchema): ...
def extract_partial_stateful_udf_py(expression: PyExpr) -> dict[str, PartialStatefulUDF]: ...
def bind_stateful_udfs(expression: PyExpr, initialized_funcs: dict[str, Callable]) -> PyExpr: ...
def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
def hash(expr: PyExpr, seed: Any | None = None) -> PyExpr: ...
def cosine_distance(expr: PyExpr, other: PyExpr) -> PyExpr: ...
def url_download(
Expand Down
26 changes: 26 additions & 0 deletions daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class PartitionTask(Generic[PartitionT]):
num_results: int
stage_id: int
partial_metadatas: list[PartialPartitionMetadata]

# Indicates that this PartitionTask must be executed on the executor with the supplied ID
# This is used when a specific executor (e.g. an Actor pool) must be provisioned and used for the task
actor_pool_id: str | None

_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
Expand Down Expand Up @@ -87,6 +92,7 @@ def __init__(
inputs: list[PartitionT],
partial_metadatas: list[PartialPartitionMetadata] | None,
resource_request: ResourceRequest = ResourceRequest(),
actor_pool_id: str | None = None,
) -> None:
self.inputs = inputs
if partial_metadatas is not None:
Expand All @@ -96,6 +102,7 @@ def __init__(
self.resource_request: ResourceRequest = resource_request
self.instructions: list[Instruction] = list()
self.num_results = len(inputs)
self.actor_pool_id = actor_pool_id

def add_instruction(
self,
Expand Down Expand Up @@ -133,6 +140,7 @@ def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPa
num_results=1,
resource_request=resource_request_final_cpu,
partial_metadatas=self.partial_metadatas,
actor_pool_id=self.actor_pool_id,
)

def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
Expand All @@ -153,6 +161,7 @@ def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPart
num_results=self.num_results,
resource_request=resource_request_final_cpu,
partial_metadatas=self.partial_metadatas,
actor_pool_id=self.actor_pool_id,
)

def __str__(self) -> str:
Expand Down Expand Up @@ -530,6 +539,23 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
]


@dataclass(frozen=True)
class StatefulUDFProject(SingleOutputInstruction):
projection: ExpressionsProjection

def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
raise NotImplementedError("UDFProject instruction cannot be run from outside an Actor. Please file an issue.")

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
return [
PartialPartitionMetadata(
num_rows=None, # UDFs can potentially change cardinality
size_bytes=None,
boundaries=None, # TODO: figure out if the stateful UDF projection changes boundaries
)
]


def _prune_boundaries(boundaries: Boundaries, projection: ExpressionsProjection) -> Boundaries | None:
"""
If projection expression is a nontrivial computation (i.e. not a direct col() reference and not an alias) on top of a boundary
Expand Down
86 changes: 82 additions & 4 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

from daft.udf import PartialStatefulUDF


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
InProgressPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionTaskBuilder[PartitionT]]]
Expand Down Expand Up @@ -204,11 +202,91 @@ def pipeline_instruction(
def actor_pool_project(
child_plan: InProgressPhysicalPlan[PartitionT],
projection: ExpressionsProjection,
partial_stateful_udfs: dict[str, PartialStatefulUDF],
resource_request: execution_step.ResourceRequest,
num_actors: int,
) -> InProgressPhysicalPlan[PartitionT]:
raise NotImplementedError("Execution of ActorPoolProjects not yet implemented")
stage_id = next(stage_id_counter)
actor_pool_name = f"ActorPool_stage{stage_id}"

# Keep track of materializations of the children tasks
#
# Our goal here is to saturate the actors, and so we need a sufficient number of completed child tasks to do so. However
# we do not want too many child tasks to be running (potentially starving our actors) and hence place an upper bound of `num_actors * 2`
child_materializations_buffer_len = num_actors * 2
child_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Keep track of materializations of the actor_pool tasks
actor_pool_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Perform separate accounting for the tasks' resource request and the actors' resource request:
# * When spinning up an actor, we consider resources that are required for the persistent state in an actor (namely, GPUs and memory)
# * When running a task, we consider resources that are required for placement of tasks (namely CPUs)
task_resource_request = ResourceRequest(num_cpus=resource_request.num_cpus)
actor_resource_request = ResourceRequest(
num_gpus=resource_request.num_gpus, memory_bytes=resource_request.memory_bytes
)

with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
num_actors,
projection,
) as actor_pool_id:
child_plan_exhausted = False

# Loop until the child plan is exhausted and there is no more work in the pipeline
while not (child_plan_exhausted and len(child_materializations) == 0 and len(actor_pool_materializations) == 0):
# Exhaustively pop ready child_steps and submit them to be run on the actor_pool
while len(child_materializations) > 0 and child_materializations[0].done():
next_ready_child = child_materializations.popleft()
actor_project_step = (
PartitionTaskBuilder[PartitionT](
inputs=[next_ready_child.partition()],
partial_metadatas=[next_ready_child.partition_metadata()],
actor_pool_id=actor_pool_id,
)
.add_instruction(
instruction=execution_step.StatefulUDFProject(projection),
resource_request=task_resource_request,
)
.finalize_partition_task_single_output(
stage_id=stage_id,
)
)
actor_pool_materializations.append(actor_project_step)
yield actor_project_step

# Exhaustively pop ready actor_pool steps and bubble it upwards as the start of a new pipeline
while len(actor_pool_materializations) > 0 and actor_pool_materializations[0].done():
next_ready_actor_pool_task = actor_pool_materializations.popleft()
new_pipeline_starter_task = PartitionTaskBuilder[PartitionT](
inputs=[next_ready_actor_pool_task.partition()],
partial_metadatas=[next_ready_actor_pool_task.partition_metadata()],
resource_request=ResourceRequest(),
)
yield new_pipeline_starter_task

# No more child work to be done: if there is pending work in the pipeline we yield None
if child_plan_exhausted:
if len(child_materializations) > 0 or len(actor_pool_materializations) > 0:
yield None

# If there is capacity in the pipeline, attempt to schedule child work
elif len(child_materializations) < child_materializations_buffer_len:
try:
child_step = next(child_plan)
except StopIteration:
child_plan_exhausted = True
else:
# Finalize and yield the child step to be run if it is a PartitionTaskBuilder
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
child_materializations.append(child_step)
yield child_step

# Otherwise, indicate that we need to wait for work to complete
else:
yield None


def monotonically_increasing_id(
Expand Down
4 changes: 0 additions & 4 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

from daft.udf import PartialStatefulUDF


def scan_with_tasks(
scan_tasks: list[ScanTask],
Expand Down Expand Up @@ -83,7 +81,6 @@ def project(
def actor_pool_project(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
projection: list[PyExpr],
partial_stateful_udfs: dict[str, PartialStatefulUDF],
resource_request: ResourceRequest | None,
num_actors: int,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
Expand All @@ -94,7 +91,6 @@ def actor_pool_project(
return physical_plan.actor_pool_project(
child_plan=input,
projection=expr_projection,
partial_stateful_udfs=partial_stateful_udfs,
resource_request=resource_request,
num_actors=num_actors,
)
Expand Down
5 changes: 4 additions & 1 deletion daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import daft.daft as native
from daft import context
from daft.daft import CountMode, ImageFormat, ImageMode, ResourceRequest
from daft.daft import CountMode, ImageFormat, ImageMode, ResourceRequest, bind_stateful_udfs
from daft.daft import PyExpr as _PyExpr
from daft.daft import col as _col
from daft.daft import date_lit as _date_lit
Expand Down Expand Up @@ -1131,6 +1131,9 @@ def __reduce__(self) -> tuple:
def _input_mapping(self) -> builtins.str | None:
return self._expr._input_mapping()

def _bind_stateful_udfs(self, initialized_funcs: dict[builtins.str, Callable]) -> Expression:
return Expression._from_pyexpr(bind_stateful_udfs(self._expr, initialized_funcs))


SomeExpressionNamespace = TypeVar("SomeExpressionNamespace", bound="ExpressionNamespace")

Expand Down
1 change: 1 addition & 0 deletions daft/pickle/cloudpickle.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# type: ignore
"""
Taken from: https://github.com/cloudpipe/cloudpickle/blob/master/cloudpickle/cloudpickle.py
Expand Down
1 change: 1 addition & 0 deletions daft/pickle/cloudpickle_fast.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# type: ignore
"""
Code from: https://github.com/cloudpipe/cloudpickle/blob/master/cloudpickle/cloudpickle_fast.py
Expand Down
Loading

0 comments on commit 8a5170f

Please sign in to comment.