feat: Generic Tasks #8724
Merged
Changes from 82 commits
90 commits
7294fa7
Create Generic Task
39069cd
Get Task Config
8897866
Fork Task
0b28abb
Create Child Task
b361a0a
small fix
5ea39bb
fix fork tree merge
fe11347
Kill task
623f8fc
fix kill merge
0952d88
remove duplicates
d1f7295
fix slots
321cac6
error for non-generic tasks
6856d57
fix bindings and kill call
5ace3d1
Pause & Resume Tasks
dca9970
pause/resume merge fixes
cdebb73
bindings clean build
7794cfa
Apply suggestions from code review
AmanuelAaron 2c81f01
remove duplicate test
48aaa5c
unify migrations
cdf8eff
Merge branch 'generic-task-final' of github.com:determined-ai/determi…
ce9e809
formatting
6ed6749
hide from -h
59aa76d
pause -> unpause
9269b1e
Merge branch 'main' into generic-task-final
1050216
Create Generic Task
6cdcadb
Get Task Config
e2dea62
Fork Task
6a3a88c
Create Child Task
89d7390
small fix
ca719f7
fix fork tree merge
eee37b2
Kill task
6f22886
fix kill merge
5943408
remove duplicates
018f7f2
fix slots
8904900
error for non-generic tasks
2f35942
fix bindings and kill call
9792cab
Pause & Resume Tasks
ffcb7bf
pause/resume merge fixes
2509e5c
bindings clean build
bfa0ac7
remove duplicate test
0b928d8
unify migrations
336391b
Apply suggestions from code review
AmanuelAaron 7b982c9
formatting
99a8cb6
hide from -h
48ae5e9
pause -> unpause
816bdc0
Merge branch 'generic-task-final' of github.com:determined-ai/determi…
2d8f64b
Merge branch 'main' into generic-task-final
AmanuelAaron c7fcd1b
fix single node check and naming
3d947b0
fix injection
75aacb2
lint + test fixes
8c60a09
fixes
11055fd
no pause in task.py
81dcb1a
go lint
21f105c
Merge branch 'main' into generic-task-final
005c302
add e2e test for generic tasks
e4f5967
isort test
d03ca94
fix test
4608633
shorten test task run time
b2a6b1e
single node default true in api_command
c41ca37
move to Bun
b17fb78
Merge branch 'main' into generic-task-final
AmanuelAaron 94e3fc5
fix pb
8f490f4
lint migrations
81910af
Merge branch 'main' into generic-task-final
86d5077
fix missing order by
84c513d
fix intg test
6c940de
Apply suggestions from code review
AmanuelAaron 714c6d2
Merge branch 'main' into generic-task-final
81e47a6
fix merge
7fce67f
fix style
9b2876d
add task state check
6c36667
fix comment
be9d7c4
update migrations
c0cf141
fix todo
ce8f7b4
remove action for get task config
f0395e5
update e2e test
4104029
reduce test fixture time
3baef06
remove run python file
b9917df
add completion tests
4492c65
fix imports
5cfd6c3
test lint
b5a63f4
Merge branch 'main' into generic-task-final
3916141
update tests
0bf0bf1
Merge branch 'main' into generic-task-final
6f19bd2
fix api.ts conflict
2a4676a
send dummy job for generic tasks
23f7ad4
set weight
904c486
Merge branch 'main' into generic-task-final
AmanuelAaron 1c77dd7
lint migrations
b7263a1
Merge branch 'generic-task-final' of github.com:determined-ai/determi…
aae7fee
implement v1Job
@@ -0,0 +1 @@
entrypoint: ["echo", "task ran"]
@@ -0,0 +1 @@
entrypoint: ["exit", "1"]
@@ -0,0 +1 @@
entrypoint: ["echo", "forked"]
@@ -0,0 +1 @@
entrypoint: ["sleep", "5"]
Empty file.
@@ -0,0 +1,256 @@
import subprocess
import time

import pytest

from determined.cli import ntsc
from determined.common import api, util
from determined.common.api import bindings
from tests import api_utils
from tests import config as conf


def wait_for_task_state(
    test_session: api.Session,
    task_id: str,
    expected_state: bindings.v1GenericTaskState,
    timeout: int,
) -> bool:
    deadline = time.time() + timeout
    while time.time() < deadline:
        resp = bindings.get_GetTask(test_session, taskId=task_id)
        if expected_state == resp.task.taskState:
            return True
        time.sleep(0.1)
    return False


@pytest.mark.e2e_cpu
def test_create_generic_task() -> None:
    """
    Start a simple task with a context directory called from the task CLI
    """
    command = [
        "det",
        "-m",
        conf.make_master_url(),
        "task",
        "create",
        conf.fixtures_path("generic_task/test_config.yaml"),
        "--context",
        conf.fixtures_path("generic_task"),
    ]

    subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)


@pytest.mark.e2e_cpu
def test_generic_task_completion() -> None:
    """
    Start a simple task and check for task completion
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config.yaml"), "r") as config_file:
        # Create task
        config_text = config_file.read()

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Check for complete state
    is_valid_state = wait_for_task_state(
        test_session, task_resp.taskId, bindings.v1GenericTaskState.COMPLETED, timeout=30
    )
    if not is_valid_state:
        pytest.fail("task failed to complete after 30 seconds")


@pytest.mark.e2e_cpu
def test_create_generic_task_error() -> None:
    """
    Start a simple task that fails and check for error task state
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config_error.yaml"), "r") as config_file:
        # Create task
        config_text = config_file.read()

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Check for error state
    is_valid_state = wait_for_task_state(
        test_session, task_resp.taskId, bindings.v1GenericTaskState.ERROR, timeout=30
    )
    if not is_valid_state:
        pytest.fail("task failed to reach ERROR state after 30 seconds")


@pytest.mark.e2e_cpu
def test_generic_task_config() -> None:
    """
    Start a simple task without a context directory and grab its config
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config.yaml"), "r") as config_file:
        # Create task
        config_text = config_file.read()

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Get config
    command = ["det", "-m", conf.make_master_url(), "task", "config", task_resp.taskId]

    res = subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

    result_config = util.yaml_safe_load(res.stdout)
    expected_config = {"entrypoint": ["echo", "task ran"]}
    assert result_config == expected_config


@pytest.mark.e2e_cpu
def test_generic_task_create_with_fork() -> None:
    """
    Start a simple task, fork it with a different config, and grab the fork's config
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config.yaml"), "r") as config_file:
        # Create initial task
        config = ntsc.parse_config(config_file, None, [], [])
        config_text = util.yaml_safe_dump(config)

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Create fork task
    with open(conf.fixtures_path("generic_task/test_config_fork.yaml"), "r") as fork_config_file:
        config = ntsc.parse_config(fork_config_file, None, [], [])
        config_text = util.yaml_safe_dump(config)

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=task_resp.taskId,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    fork_task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Get fork task config
    command = ["det", "-m", conf.make_master_url(), "task", "config", fork_task_resp.taskId]

    res = subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)
    result_config = util.yaml_safe_load(res.stdout)
    expected_config = {"entrypoint": ["echo", "forked"]}
    assert result_config == expected_config


@pytest.mark.e2e_cpu
def test_kill_generic_task() -> None:
    """
    Start a simple task, kill it from the task CLI, and check for canceled task state
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config.yaml"), "r") as config_file:
        # Create task
        config = ntsc.parse_config(config_file, None, [], [])
        config_text = util.yaml_safe_dump(config)

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Kill task
    command = ["det", "-m", conf.make_master_url(), "task", "kill", task_resp.taskId]

    subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

    kill_resp = bindings.get_GetTask(test_session, taskId=task_resp.taskId)
    assert kill_resp.task.taskState == bindings.v1GenericTaskState.CANCELED


@pytest.mark.e2e_cpu
def test_pause_and_unpause_generic_task() -> None:
    """
    Start a simple task, pause and unpause it from the task CLI, and check its state each time
    """
    test_session = api_utils.determined_test_session()

    with open(conf.fixtures_path("generic_task/test_config_pause.yaml"), "r") as config_file:
        # Create task
        config = ntsc.parse_config(config_file, None, [], [])
        config_text = util.yaml_safe_dump(config)

    req = bindings.v1CreateGenericTaskRequest(
        config=config_text,
        contextDirectory=[],
        projectId=None,
        forkedFrom=None,
        parentId=None,
        inheritContext=False,
        noPause=False,
    )
    task_resp = bindings.post_CreateGenericTask(test_session, body=req)

    # Pause task
    command = ["det", "-m", conf.make_master_url(), "task", "pause", task_resp.taskId]

    subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

    pause_resp = bindings.get_GetTask(test_session, taskId=task_resp.taskId)
    assert pause_resp.task.taskState == bindings.v1GenericTaskState.PAUSED

    # Unpause task
    command = ["det", "-m", conf.make_master_url(), "task", "unpause", task_resp.taskId]

    subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)

    unpause_resp = bindings.get_GetTask(test_session, taskId=task_resp.taskId)
    assert unpause_resp.task.taskState == bindings.v1GenericTaskState.ACTIVE
Is it guaranteed that `det task create` will fail if the task crashes? That's not guaranteed for `det cmd run`, where the exit code of `det cmd run` reflects whether there was a failure in the CLI itself, not whether there was a failure in the command that ran.
(question applies throughout)
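
To make the exit-code question concrete, here is a minimal sketch of what `subprocess.run(..., check=True)` does and does not report. The Python interpreter stands in for the CLI here, which is an assumption for illustration only; `run_cli`, `OK_CMD`, and `BAD_CMD` are hypothetical names, not part of this PR.

```python
import subprocess
import sys
from typing import List


def run_cli(command: List[str]) -> str:
    """Run a command and return its stdout.

    check=True raises CalledProcessError only when the launched process
    itself exits non-zero; it says nothing about work that process may
    have submitted elsewhere.
    """
    res = subprocess.run(command, universal_newlines=True, stdout=subprocess.PIPE, check=True)
    return res.stdout


# Hypothetical stand-in for a CLI invocation that succeeds.
OK_CMD = [sys.executable, "-c", "print('task ran')"]
# Hypothetical stand-in for a CLI invocation that itself fails.
BAD_CMD = [sys.executable, "-c", "import sys; sys.exit(1)"]

if __name__ == "__main__":
    print(run_cli(OK_CMD).strip())
    try:
        run_cli(BAD_CMD)
    except subprocess.CalledProcessError as err:
        print(f"CLI failed with exit code {err.returncode}")
```

If the CLI only submits a task and returns, a passing `check=True` shows that the submission succeeded, so a test that cares about the task's outcome has to look at the task state separately.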
`det task create` will only fail if the CLI returns an error. Tasks that crash or error out will have their task state set to ERROR. The same goes for `det task fork`.
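
Since failure shows up in the task state and not in the CLI exit code, one way to check a task's outcome is to poll its state until it becomes terminal. The sketch below is an illustration under stated assumptions: `TaskState`, `TERMINAL_STATES`, and `wait_for_terminal_state` are hypothetical stand-ins (the state names mirror the ones the tests in this PR assert on), treating COMPLETED, CANCELED, and ERROR as terminal is an assumption, and `get_state` stands in for a lookup such as `bindings.get_GetTask`.

```python
import enum
import time
from typing import Callable


class TaskState(enum.Enum):
    # Hypothetical stand-in for bindings.v1GenericTaskState.
    ACTIVE = "ACTIVE"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    CANCELED = "CANCELED"
    ERROR = "ERROR"


# Assumption: these are the states a task cannot leave once it reaches them.
TERMINAL_STATES = {TaskState.COMPLETED, TaskState.CANCELED, TaskState.ERROR}


def wait_for_terminal_state(
    get_state: Callable[[], TaskState],
    timeout: float,
    interval: float = 0.1,
) -> TaskState:
    """Poll get_state until it returns a terminal state, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(interval)
    raise TimeoutError(f"task did not reach a terminal state within {timeout} seconds")


if __name__ == "__main__":
    # Simulated task that is active for two polls and then errors out.
    states = iter([TaskState.ACTIVE, TaskState.ACTIVE, TaskState.ERROR])
    final = wait_for_terminal_state(lambda: next(states), timeout=5, interval=0.01)
    print(final)
```

Returning the terminal state, instead of a boolean for one expected state, lets a completion test fail as soon as the task reaches ERROR without waiting for the full timeout.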
ok, that's good, because it's consistent with how other CLI commands run in our system.
Should this test then make sure that the task completed successfully? If it was properly created, it should not fail, right?
Added tests that check task completion and failure.