Skip to content

Commit

Permalink
fix: job queue test failures (#8843)
Browse files (browse the repository at this point in the history)
* hotfix for test

* ensure task completion

* remove comment

* update test helpers

* default timeout

(cherry picked from commit 08dfa43)
  • Loading branch information
AmanuelAaron authored and determined-ci committed Feb 15, 2024
1 parent a74685f commit 1fc1496
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 43 deletions.
27 changes: 5 additions & 22 deletions e2e_tests/tests/cluster/test_master_restart.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -103,17 +103,10 @@ def test_master_restart_generic_task(managed_cluster_restarts: ManagedCluster) -
task_resp = bindings.post_CreateGenericTask(test_session, body=req)

# Wait for task to start
started = task.wait_for_task_start(test_session, task_resp.taskId, timeout=30)
if not started:
pytest.fail("task failed to started")
task.wait_for_task_start(test_session, task_resp.taskId)
managed_cluster_restarts.kill_master()
managed_cluster_restarts.restart_master()
is_valid_state = task.wait_for_task_state(
test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED, timeout=30
)

if not is_valid_state:
pytest.fail("task failed to complete after 30 seconds")
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)


@pytest.mark.managed_devcluster
Expand All @@ -136,26 +129,16 @@ def test_master_restart_generic_task_pause(managed_cluster_restarts: ManagedClus
task_resp = bindings.post_CreateGenericTask(test_session, body=req)

# Wait for task to start
started = task.wait_for_task_start(test_session, task_resp.taskId, timeout=30)
if not started:
pytest.fail("task failed to started")
task.wait_for_task_start(test_session, task_resp.taskId)
# Pause task
bindings.post_PauseGenericTask(test_session, taskId=task_resp.taskId)
is_valid_state = task.wait_for_task_state(
test_session, task_resp.taskId, bindings.v1GenericTaskState.PAUSED, timeout=30
)
if not is_valid_state:
pytest.fail("task failed to complete after 30 seconds")
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.PAUSED)
managed_cluster_restarts.kill_master()
managed_cluster_restarts.restart_master()

# Unpause task
bindings.post_UnpauseGenericTask(test_session, taskId=task_resp.taskId)
is_valid_state = task.wait_for_task_state(
test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED, timeout=30
)
if not is_valid_state:
pytest.fail("task failed to complete after 30 seconds")
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)


@pytest.mark.managed_devcluster
Expand Down
18 changes: 10 additions & 8 deletions e2e_tests/tests/task/task.py
Original file line number | Diff line number | Diff line change
@@ -1,32 +1,34 @@
import time

import pytest

from determined.common import api


def wait_for_task_state(
test_session: api.Session,
task_id: str,
expected_state: api.bindings.v1GenericTaskState,
timeout: int,
) -> bool:
timeout: int = 30,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
if expected_state == resp.task.taskState:
return True
return
time.sleep(0.1)
return False
pytest.fail(f"task failed to complete after {timeout} seconds")


def wait_for_task_start(
test_session: api.Session,
task_id: str,
timeout: int,
) -> bool:
timeout: int = 30,
) -> None:
deadline = time.time() + timeout
while time.time() < deadline:
resp = api.bindings.get_GetTask(test_session, taskId=task_id)
if resp.task.allocations[0].state == api.bindings.taskv1State.RUNNING:
return True
return
time.sleep(0.1)
return False
pytest.fail(f"task failed to start after {timeout} seconds")
33 changes: 20 additions & 13 deletions e2e_tests/tests/task/test_generic_tasks.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -26,7 +26,13 @@ def test_create_generic_task() -> None:
conf.fixtures_path("generic_task"),
]

subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)
res = subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

id_index = res.stdout.find("Created task ")
task_id = res.stdout[id_index + len("Created task ") :].strip()

test_session = api_utils.determined_test_session()
task.wait_for_task_state(test_session, task_id, bindings.v1GenericTaskState.COMPLETED)


@pytest.mark.e2e_cpu
Expand All @@ -52,11 +58,7 @@ def test_generic_task_completion() -> None:
task_resp = bindings.post_CreateGenericTask(test_session, body=req)

# Check for complete state
is_valid_state = task.wait_for_task_state(
test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED, timeout=30
)
if not is_valid_state:
pytest.fail("task failed to complete after 30 seconds")
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)


@pytest.mark.e2e_cpu
Expand All @@ -82,11 +84,7 @@ def test_create_generic_task_error() -> None:
task_resp = bindings.post_CreateGenericTask(test_session, body=req)

# Check for error state
is_valid_state = task.wait_for_task_state(
test_session, task_resp.taskId, bindings.v1GenericTaskState.ERROR, timeout=30
)
if not is_valid_state:
pytest.fail("task failed to complete after 30 seconds")
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.ERROR)


@pytest.mark.e2e_cpu
Expand Down Expand Up @@ -120,6 +118,8 @@ def test_generic_task_config() -> None:
expected_config = {"entrypoint": ["echo", "task ran"]}
assert result_config == expected_config

task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)


@pytest.mark.e2e_cpu
def test_generic_task_create_with_fork() -> None:
Expand Down Expand Up @@ -168,6 +168,11 @@ def test_generic_task_create_with_fork() -> None:
expected_config = {"entrypoint": ["echo", "forked"]}
assert result_config == expected_config

task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)
task.wait_for_task_state(
test_session, fork_task_resp.taskId, bindings.v1GenericTaskState.COMPLETED
)


@pytest.mark.e2e_cpu
def test_kill_generic_task() -> None:
Expand Down Expand Up @@ -197,8 +202,8 @@ def test_kill_generic_task() -> None:

subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

kill_resp = bindings.get_GetTask(test_session, taskId=task_resp.taskId)
assert kill_resp.task.taskState == bindings.v1GenericTaskState.CANCELED
bindings.get_GetTask(test_session, taskId=task_resp.taskId)
task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.CANCELED)


@pytest.mark.e2e_cpu
Expand Down Expand Up @@ -239,3 +244,5 @@ def test_pause_and_unpause_generic_task() -> None:

unpause_resp = bindings.get_GetTask(test_session, taskId=task_resp.taskId)
assert unpause_resp.task.taskState == bindings.v1GenericTaskState.ACTIVE

task.wait_for_task_state(test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED)

0 comments on commit 1fc1496

Please sign in to comment.