Skip to content

Commit

Permalink
[data] fix pandas type error while transfer from webdataset to lance
Browse files Browse the repository at this point in the history
Signed-off-by: jukejian <[email protected]>
  • Loading branch information
Jay-ju committed Nov 9, 2024
1 parent a198e13 commit e281f8d
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions python/ray/data/_internal/execution/interfaces/physical_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,38 @@ def on_task_finished(self):
self._task_done_callback()


class TaskBackPressureState:
def __init__(self):
self._in_task_submission_backpressure = False
self._in_task_submission_backpressure_reason = ""
self._in_task_output_backpressure = False
self._in_task_output_backpressure_reason = ""

def is_in_task_submission_backpressure(
self, in_backpressure: bool, reason: str
) -> bool:
return (
self._in_task_submission_backpressure != in_backpressure
or self._in_task_submission_backpressure_reason != reason
)

def set_in_task_submission_backpressure(self, in_backpressure: bool, reason: str):
self._in_task_submission_backpressure = in_backpressure
self._in_task_submission_backpressure_reason = reason

def is_in_task_output_backpressure(
self, in_backpressure: bool, reason: str
) -> bool:
return (
self._in_task_output_backpressure != in_backpressure
or self._in_task_output_backpressure_reason != reason
)

def set_in_task_output_backpressure(self, in_backpressure: bool, reason: str):
self._in_task_output_backpressure = in_backpressure
self._in_task_output_backpressure_reason = reason


class PhysicalOperator(Operator):
"""Abstract class for physical operators.
Expand Down Expand Up @@ -188,11 +220,8 @@ def __init__(
self._inputs_complete = not input_dependencies
self._target_max_block_size = target_max_block_size
self._started = False
self._in_task_submission_backpressure = False
self._in_task_submission_backpressure_reason = ""
self._in_task_output_backpressure = False
self._in_task_output_backpressure_reason = ""
self._metrics = OpRuntimeMetrics(self)
self._backpressure_state = TaskBackPressureState()
self._estimated_num_output_bundles = None
self._estimated_output_num_rows = None
self._execution_completed = False
Expand Down Expand Up @@ -494,31 +523,24 @@ def notify_in_task_submission_backpressure(
reason: reason for in_backpressure
"""
# only update on change to in_backpressure
if (
self._in_task_submission_backpressure != in_backpressure
or self._in_task_submission_backpressure_reason != reason
if self._backpressure_state.is_in_task_submission_backpressure(
in_backpressure, reason
):
self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
self._in_task_submission_backpressure = in_backpressure
self._in_task_submission_backpressure_reason = reason
self._backpressure_state.set_in_task_submission_backpressure(
in_backpressure, reason
)

def notify_in_task_output_backpressure(
self, in_backpressure: bool, reason: str
) -> None:
"""Called periodically from the executor to update internal in backpressure
status for stats collection purposes.
Args:
in_backpressure: Value this operator's in_backpressure should be set to.
reason: reason for in_backpressure
"""
# only update on change to in_backpressure
if (
self._in_task_output_backpressure != in_backpressure
or self._in_task_output_backpressure_reason != reason
if self._backpressure_state.is_in_task_output_backpressure(
in_backpressure, reason
):
self._in_task_output_backpressure = in_backpressure
self._in_task_output_backpressure_reason = reason
self._backpressure_state.set_in_task_output_backpressure(
in_backpressure, reason
)

def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
"""Return a list of `AutoscalingActorPool`s managed by this operator."""
Expand Down

0 comments on commit e281f8d

Please sign in to comment.