Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Workflow] workflow.delete #19178

Merged
merged 12 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/source/workflows/management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Single workflow management APIs
# Cancel a workflow.
workflow.cancel(workflow_id="workflow_id")

# Delete the workflow.
workflow.delete(workflow_id="workflow_id")

Bulk workflow management APIs
-----------------------------

Expand Down
5 changes: 3 additions & 2 deletions python/ray/workflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
get_status,
resume,
cancel,
delete,
list_all,
resume_all,
)
Expand All @@ -15,8 +16,8 @@

__all__ = [
"step", "virtual_actor", "resume", "get_output", "get_actor",
"WorkflowExecutionError", "resume_all", "cancel", "get_status", "list_all",
"init"
"WorkflowExecutionError", "resume_all", "cancel", "delete", "get_status",
"list_all", "init"
]

globals().update(WorkflowStatus.__members__)
31 changes: 30 additions & 1 deletion python/ray/workflow/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ray.workflow import serialization
from ray.workflow.storage import Storage
from ray.workflow import workflow_access
from ray.workflow.workflow_storage import get_workflow_storage
from ray.util.annotations import PublicAPI

if TYPE_CHECKING:
Expand Down Expand Up @@ -316,7 +317,8 @@ def get_status(workflow_id: str) -> WorkflowStatus:

@PublicAPI(stability="beta")
def cancel(workflow_id: str) -> None:
"""Cancel a workflow.
"""Cancel a workflow. Workflow checkpoints will still be saved in storage. To
clean up saved checkpoints, see `workflow.delete()`.

Args:
workflow_id: The workflow to cancel.
Expand All @@ -329,12 +331,39 @@ def cancel(workflow_id: str) -> None:

Returns:
None

"""
ensure_ray_initialized()
if not isinstance(workflow_id, str):
raise TypeError("workflow_id has to be a string type.")
return execution.cancel(workflow_id)


@PublicAPI(stability="beta")
def delete(workflow_id: str) -> None:
    """Delete a workflow, its checkpoints, and other information it may have
    persisted to storage. To stop a running workflow, see
    `workflow.cancel()`.

    NOTE: The caller should ensure that the workflow is not currently
    running before deleting it.

    Args:
        workflow_id: The workflow to delete.

    Examples:
        >>> workflow_step = some_job.step()
        >>> output = workflow_step.run(workflow_id="some_job")
        >>> workflow.delete(workflow_id="some_job")
        >>> workflow.get_output("some_job")  # Raises ValueError.

    Raises:
        TypeError: If `workflow_id` is not a string.

    Returns:
        None

    """
    # Same argument validation (and message) as `cancel()`, so a bad id fails
    # loudly here instead of being turned into a storage key.
    if not isinstance(workflow_id, str):
        raise TypeError("workflow_id has to be a string type.")
    # The actual removal is delegated to the workflow's storage wrapper.
    wf_storage = get_workflow_storage(workflow_id)
    wf_storage.delete_workflow()


# Names exported by a star-import of this module; `delete` is part of the
# public workflow management API alongside `cancel`.
__all__ = ("step", "virtual_actor", "resume", "get_output", "get_actor",
           "resume_all", "get_status", "cancel", "delete")
2 changes: 2 additions & 0 deletions python/ray/workflow/api.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def get_status(workflow_id: str) -> WorkflowStatus: ...

def cancel(workflow_id: str) -> None: ...

def delete(workflow_id: str) -> None: ...

def get_actor(actor_id: str) -> VirtualActor: ...

def init(storage: Optional[Union[str, Storage]] = None) -> None: ...
67 changes: 67 additions & 0 deletions python/ray/workflow/tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
import ray
from ray._private import signature
from ray.tests.conftest import * # noqa
from ray import workflow
from ray.workflow import workflow_storage
from ray.workflow import storage
from ray.workflow.workflow_storage import asyncio_run
from ray.workflow.common import StepType
from ray.workflow.tests import utils
import subprocess
import time


def some_func(x):
Expand Down Expand Up @@ -36,6 +40,69 @@ async def test_kv_storage(workflow_start_regular):
# TODO(suquark): Test "delete" once fully implemented.


@pytest.mark.asyncio
def test_delete(workflow_start_regular):
_storage = storage.get_global_storage()

# Delete a workflow which has finished.
@workflow.step
def basic_step(arg):
return arg

result = basic_step.step("hello world").run(workflow_id="finishes")
assert result == "hello world"
ouput = workflow.get_output("finishes")
assert ray.get(ouput) == "hello world"

workflow.delete(workflow_id="finishes")

with pytest.raises(ValueError):
ouput = workflow.get_output("finishes")

with pytest.raises(ValueError):
workflow.resume("finishes")

# Deleting is idempotent.
workflow.delete(workflow_id="finishes")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw JobNotFound error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't exist yet right?

Also nit: let's call it WorkflowNotFoundError ?


# The workflow can be re-run as if it was never run before.
assert basic_step.step("123").run(workflow_id="finishes") == "123"

# Delete a workflow that has not finished and is not running.
@workflow.step
def never_ends(x):
print("RUNNINGGGGGGGGGGG")
utils.set_global_mark()
time.sleep(1000000)
return x

never_ends.step("hello world").run_async("never_finishes")

# Make sure the step is actually executing before killing the cluster
while not utils.check_global_mark():
time.sleep(1)

# Restart
ray.shutdown()
subprocess.check_output("ray stop --force", shell=True)
workflow.init(storage=_storage)

with pytest.raises(ray.exceptions.RaySystemError):
result = workflow.get_output("never_finishes")
ray.get(result)

workflow.delete("never_finishes")

with pytest.raises(ValueError):
ouput = workflow.get_output("never_finishes")

with pytest.raises(ValueError):
workflow.resume("never_finishes")

# Deleting is idempotent.
workflow.delete(workflow_id="never_finishes")


def test_workflow_storage(workflow_start_regular):
workflow_id = test_workflow_storage.__name__
wf_storage = workflow_storage.WorkflowStorage(workflow_id,
Expand Down
4 changes: 4 additions & 0 deletions python/ray/workflow/workflow_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ def get_latest_progress(self) -> "StepID":
return asyncio_run(self._get(self._key_workflow_progress(),
True))["step_id"]

def delete_workflow(self):
    """Delete everything stored under this workflow's key prefix.

    The prefix is built from the workflow id by the underlying storage, and
    the storage's prefix deletion is driven through `asyncio_run`.
    """
    backend = self._storage
    workflow_prefix = backend.make_key(self._workflow_id)
    pending_deletion = backend.delete_prefix(workflow_prefix)
    asyncio_run(pending_deletion)

async def _put(self, paths: List[str], data: Any,
is_json: bool = False) -> str:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ RAY_CONFIG(uint32_t, cancellation_retry_ms, 2000)
RAY_CONFIG(int64_t, ping_gcs_rpc_server_interval_milliseconds, 1000)

/// Maximum number of times to retry ping gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, ping_gcs_rpc_server_max_retries, 600)
RAY_CONFIG(int32_t, ping_gcs_rpc_server_max_retries, 10)
wuisawesome marked this conversation as resolved.
Show resolved Hide resolved

/// Minimum interval between reconnecting gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)
Expand Down