
[FEAT] Add runner logic in PyRunner for ActorPoolProject #2677

Merged · 8 commits · Sep 5, 2024
Changes from all commits
4 changes: 4 additions & 0 deletions daft/daft.pyi
@@ -179,6 +179,7 @@ class ResourceRequest:
def with_num_cpus(self, num_cpus: float | None) -> ResourceRequest: ...
def with_num_gpus(self, num_gpus: float | None) -> ResourceRequest: ...
def with_memory_bytes(self, memory_bytes: int | None) -> ResourceRequest: ...
def __mul__(self, factor: float) -> ResourceRequest: ...
def __add__(self, other: ResourceRequest) -> ResourceRequest: ...
def __repr__(self) -> str: ...
def __eq__(self, other: ResourceRequest) -> bool: ... # type: ignore[override]
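
For context, a sketch of how these new arithmetic stubs might be used when sizing an actor pool. The fieldwise add/scale semantics are an assumption read off the signatures, not something this diff confirms:

    from daft.daft import ResourceRequest

    per_actor = ResourceRequest(num_gpus=1.0, memory_bytes=4 * 1024**3)
    per_task = ResourceRequest(num_cpus=1.0)
    # Hypothetical capacity check: total footprint of a 4-actor pool, assuming
    # __add__ combines fieldwise and __mul__ scales each field by the factor.
    pool_total = (per_actor + per_task) * 4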
@@ -1198,6 +1199,9 @@ def stateful_udf(
concurrency: int | None,
) -> PyExpr: ...
def check_column_name_validity(name: str, schema: PySchema): ...
def extract_partial_stateful_udf_py(expression: PyExpr) -> dict[str, PartialStatefulUDF]: ...
def bind_stateful_udfs(expression: PyExpr, initialized_funcs: dict[str, Callable]) -> PyExpr: ...
def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
def hash(expr: PyExpr, seed: Any | None = None) -> PyExpr: ...
def cosine_distance(expr: PyExpr, other: PyExpr) -> PyExpr: ...
def url_download(
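Taken together, the three new stubs suggest an extract → initialize → bind lifecycle for stateful UDFs. A minimal sketch of that assumed flow, where `expr` is a `PyExpr` carrying stateful UDFs; that each `PartialStatefulUDF` is constructed with a no-argument call is an assumption:

    from daft.daft import bind_stateful_udfs, extract_partial_stateful_udf_py

    # Hypothetical actor startup: build each stateful UDF instance once, then
    # rebind the expression so evaluation reuses the live instances.
    partials = extract_partial_stateful_udf_py(expr)  # name -> PartialStatefulUDF
    initialized = {name: make() for name, make in partials.items()}  # assumed no-arg call
    bound = bind_stateful_udfs(expr, initialized)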
26 changes: 26 additions & 0 deletions daft/execution/execution_step.py
@@ -43,6 +43,11 @@ class PartitionTask(Generic[PartitionT]):
num_results: int
stage_id: int
partial_metadatas: list[PartialPartitionMetadata]

# Indicates that this PartitionTask must be executed on the executor with the supplied ID
# This is used when a specific executor (e.g. an Actor pool) must be provisioned and used for the task
actor_pool_id: str | None

_id: int = field(default_factory=lambda: next(ID_GEN))

def id(self) -> str:
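
The new actor_pool_id field implies that a scheduler must route tagged tasks to their provisioned pool. A hedged sketch of that routing (the pool registry and submit methods are hypothetical, not part of this diff):

    # Hypothetical dispatch in a runner's scheduling loop; actor_pools is an
    # assumed registry mapping pool IDs to running actor pools.
    def dispatch(task, actor_pools, default_executor):
        if task.actor_pool_id is not None:
            actor_pools[task.actor_pool_id].submit(task)  # pinned to its pool
        else:
            default_executor.submit(task)  # any worker may run it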
@@ -87,6 +92,7 @@ def __init__(
inputs: list[PartitionT],
partial_metadatas: list[PartialPartitionMetadata] | None,
resource_request: ResourceRequest = ResourceRequest(),
actor_pool_id: str | None = None,
) -> None:
self.inputs = inputs
if partial_metadatas is not None:
@@ -96,6 +102,7 @@
self.resource_request: ResourceRequest = resource_request
self.instructions: list[Instruction] = list()
self.num_results = len(inputs)
self.actor_pool_id = actor_pool_id

def add_instruction(
self,
@@ -133,6 +140,7 @@ def finalize_partition_task_single_output(self, stage_id: int) -> SingleOutputPartitionTask[PartitionT]:
num_results=1,
resource_request=resource_request_final_cpu,
partial_metadatas=self.partial_metadatas,
actor_pool_id=self.actor_pool_id,
)

def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
@@ -153,6 +161,7 @@ def finalize_partition_task_multi_output(self, stage_id: int) -> MultiOutputPartitionTask[PartitionT]:
num_results=self.num_results,
resource_request=resource_request_final_cpu,
partial_metadatas=self.partial_metadatas,
actor_pool_id=self.actor_pool_id,
)

def __str__(self) -> str:
@@ -530,6 +539,23 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
]


@dataclass(frozen=True)
class StatefulUDFProject(SingleOutputInstruction):
projection: ExpressionsProjection

def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
raise NotImplementedError("UDFProject instruction cannot be run from outside an Actor. Please file an issue.")

def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
return [
PartialPartitionMetadata(
num_rows=None, # UDFs can potentially change cardinality
size_bytes=None,
boundaries=None, # TODO: figure out if the stateful UDF projection changes boundaries
)
]

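Because run() deliberately raises, the projection must be evaluated inside an actor that holds the initialized UDF instances. A minimal sketch of what that actor-side evaluation might look like (assumed wiring; the helper name run_in_actor is hypothetical):

    # Hypothetical actor-side counterpart to StatefulUDFProject.run:
    def run_in_actor(part: MicroPartition, projection: ExpressionsProjection,
                     initialized_funcs: dict[str, Callable]) -> MicroPartition:
        # Rebind the projection against this actor's live UDF instances.
        bound = ExpressionsProjection([e._bind_stateful_udfs(initialized_funcs) for e in projection])
        return part.eval_expression_list(bound)
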

def _prune_boundaries(boundaries: Boundaries, projection: ExpressionsProjection) -> Boundaries | None:
"""
If projection expression is a nontrivial computation (i.e. not a direct col() reference and not an alias) on top of a boundary
86 changes: 82 additions & 4 deletions daft/execution/physical_plan.py
@@ -56,8 +56,6 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

from daft.udf import PartialStatefulUDF


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
InProgressPhysicalPlan = Iterator[Union[None, PartitionTask[PartitionT], PartitionTaskBuilder[PartitionT]]]
@@ -204,11 +202,91 @@ def pipeline_instruction(
def actor_pool_project(
child_plan: InProgressPhysicalPlan[PartitionT],
projection: ExpressionsProjection,
partial_stateful_udfs: dict[str, PartialStatefulUDF],
resource_request: execution_step.ResourceRequest,
num_actors: int,
) -> InProgressPhysicalPlan[PartitionT]:
raise NotImplementedError("Execution of ActorPoolProjects not yet implemented")
stage_id = next(stage_id_counter)
actor_pool_name = f"ActorPool_stage{stage_id}"

# Keep track of materializations of the children tasks
#
# Our goal here is to saturate the actors, and so we need a sufficient number of completed child tasks to do so. However
# we do not want too many child tasks to be running (potentially starving our actors) and hence place an upper bound of `num_actors * 2`
child_materializations_buffer_len = num_actors * 2
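# e.g. with num_actors=4, at most 8 child materializations are in flight at once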
child_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Keep track of materializations of the actor_pool tasks
actor_pool_materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque()

# Perform separate accounting for the tasks' resource request and the actors' resource request:
# * When spinning up an actor, we consider resources that are required for the persistent state in an actor (namely, GPUs and memory)
# * When running a task, we consider resources that are required for placement of tasks (namely CPUs)
task_resource_request = ResourceRequest(num_cpus=resource_request.num_cpus)
actor_resource_request = ResourceRequest(
num_gpus=resource_request.num_gpus, memory_bytes=resource_request.memory_bytes
)
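# Example of the split (assumed): given ResourceRequest(num_cpus=1, num_gpus=1, memory_bytes=2**30),
#   task_resource_request  -> ResourceRequest(num_cpus=1)
#   actor_resource_request -> ResourceRequest(num_gpus=1, memory_bytes=2**30)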

with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
num_actors,
projection,
) as actor_pool_id:
child_plan_exhausted = False

# Loop until the child plan is exhausted and there is no more work in the pipeline
while not (child_plan_exhausted and len(child_materializations) == 0 and len(actor_pool_materializations) == 0):
# Exhaustively pop ready child_steps and submit them to be run on the actor_pool
while len(child_materializations) > 0 and child_materializations[0].done():
next_ready_child = child_materializations.popleft()
actor_project_step = (
PartitionTaskBuilder[PartitionT](
inputs=[next_ready_child.partition()],
partial_metadatas=[next_ready_child.partition_metadata()],
actor_pool_id=actor_pool_id,
)
.add_instruction(
instruction=execution_step.StatefulUDFProject(projection),
resource_request=task_resource_request,
)
.finalize_partition_task_single_output(
stage_id=stage_id,
)
)
actor_pool_materializations.append(actor_project_step)
yield actor_project_step

# Exhaustively pop ready actor_pool steps and bubble it upwards as the start of a new pipeline
while len(actor_pool_materializations) > 0 and actor_pool_materializations[0].done():
next_ready_actor_pool_task = actor_pool_materializations.popleft()
new_pipeline_starter_task = PartitionTaskBuilder[PartitionT](
inputs=[next_ready_actor_pool_task.partition()],
partial_metadatas=[next_ready_actor_pool_task.partition_metadata()],
resource_request=ResourceRequest(),
)
yield new_pipeline_starter_task

# No more child work to be done: if there is pending work in the pipeline we yield None
if child_plan_exhausted:
if len(child_materializations) > 0 or len(actor_pool_materializations) > 0:
yield None

# If there is capacity in the pipeline, attempt to schedule child work
elif len(child_materializations) < child_materializations_buffer_len:
try:
child_step = next(child_plan)
except StopIteration:
child_plan_exhausted = True
else:
# Finalize and yield the child step to be run if it is a PartitionTaskBuilder
if isinstance(child_step, PartitionTaskBuilder):
child_step = child_step.finalize_partition_task_single_output(stage_id=stage_id)
child_materializations.append(child_step)
yield child_step

# Otherwise, indicate that we need to wait for work to complete
else:
yield None

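The plan above leans on a runner-provided actor_pool_context hook. A minimal sketch of the assumed shape of that hook (the concrete PyRunner implementation lands elsewhere in this PR; _start_actor_pool and _shutdown_actor_pool are hypothetical names):

    from contextlib import contextmanager

    @contextmanager
    def actor_pool_context(self, name: str, resource_request: ResourceRequest,
                           num_actors: int, projection: ExpressionsProjection):
        # Provision the actors up-front, hand the pool ID back to the plan, and
        # guarantee teardown even if plan execution raises.
        pool_id = self._start_actor_pool(name, resource_request, num_actors, projection)
        try:
            yield pool_id
        finally:
            self._shutdown_actor_pool(pool_id)
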

def monotonically_increasing_id(
4 changes: 0 additions & 4 deletions daft/execution/rust_physical_plan_shim.py
@@ -23,8 +23,6 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

from daft.udf import PartialStatefulUDF


def scan_with_tasks(
scan_tasks: list[ScanTask],
@@ -83,7 +81,6 @@ def project(
def actor_pool_project(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
projection: list[PyExpr],
partial_stateful_udfs: dict[str, PartialStatefulUDF],
resource_request: ResourceRequest | None,
num_actors: int,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
@@ -94,7 +91,6 @@
return physical_plan.actor_pool_project(
child_plan=input,
projection=expr_projection,
partial_stateful_udfs=partial_stateful_udfs,
resource_request=resource_request,
num_actors=num_actors,
)
5 changes: 4 additions & 1 deletion daft/expressions/expressions.py
@@ -20,7 +20,7 @@

import daft.daft as native
from daft import context
from daft.daft import CountMode, ImageFormat, ImageMode, ResourceRequest
from daft.daft import CountMode, ImageFormat, ImageMode, ResourceRequest, bind_stateful_udfs
from daft.daft import PyExpr as _PyExpr
from daft.daft import col as _col
from daft.daft import date_lit as _date_lit
@@ -1131,6 +1131,9 @@ def __reduce__(self) -> tuple:
def _input_mapping(self) -> builtins.str | None:
return self._expr._input_mapping()

def _bind_stateful_udfs(self, initialized_funcs: dict[builtins.str, Callable]) -> Expression:
return Expression._from_pyexpr(bind_stateful_udfs(self._expr, initialized_funcs))


SomeExpressionNamespace = TypeVar("SomeExpressionNamespace", bound="ExpressionNamespace")

1 change: 1 addition & 0 deletions daft/pickle/cloudpickle.py
@@ -1,3 +1,4 @@
# type: ignore
"""
Taken from: https://github.com/cloudpipe/cloudpickle/blob/master/cloudpickle/cloudpickle.py

1 change: 1 addition & 0 deletions daft/pickle/cloudpickle_fast.py
@@ -1,3 +1,4 @@
# type: ignore
"""
Code from: https://github.com/cloudpipe/cloudpickle/blob/master/cloudpickle/cloudpickle_fast.py
