-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add job table to state API #5076
Changes from all commits
746bf37
97525d5
00461d6
54efe09
523239c
854300b
6c1749d
2450585
f460176
04c02ac
8f3458d
6a6f331
67ca803
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -385,6 +385,76 @@ def client_table(self): | |
|
||
return _parse_client_table(self.redis_client) | ||
|
||
def _job_table(self, job_id): | ||
"""Fetch and parse the job table information for a single job ID. | ||
|
||
Args: | ||
job_id: A job ID or hex string to get information about. | ||
|
||
Returns: | ||
A dictionary with information about the job ID in question. | ||
""" | ||
# Allow the argument to be either a JobID or a hex string. | ||
if not isinstance(job_id, ray.JobID): | ||
assert isinstance(job_id, str) | ||
job_id = ray.JobID(hex_to_binary(job_id)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also assert the type of `job_id`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
# Return information about a single job ID. | ||
message = self.redis_client.execute_command( | ||
"RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("JOB"), "", | ||
job_id.binary()) | ||
|
||
if message is None: | ||
return {} | ||
|
||
gcs_entry = gcs_utils.GcsEntry.FromString(message) | ||
|
||
assert len(gcs_entry.entries) > 0 | ||
|
||
job_info = {} | ||
|
||
for i in range(len(gcs_entry.entries)): | ||
entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i]) | ||
assert entry.job_id == job_id.binary() | ||
job_info["JobID"] = job_id.hex() | ||
job_info["NodeManagerAddress"] = entry.node_manager_address | ||
job_info["DriverPid"] = entry.driver_pid | ||
if entry.is_dead: | ||
job_info["StopTime"] = entry.timestamp | ||
else: | ||
job_info["StartTime"] = entry.timestamp | ||
|
||
return job_info | ||
|
||
def job_table(self): | ||
"""Fetch and parse the Redis job table. | ||
|
||
Returns: | ||
Information about the Ray jobs in the cluster, | ||
namely a list of dicts with keys: | ||
- "JobID" (identifier for the job), | ||
- "NodeManagerAddress" (IP address of the driver for this job), | ||
- "DriverPid" (process ID of the driver for this job), | ||
- "StartTime" (UNIX timestamp of the start time of this job), | ||
- "StopTime" (UNIX timestamp of the stop time of this job, if any) | ||
""" | ||
self._check_connected() | ||
|
||
job_keys = self.redis_client.keys(gcs_utils.TablePrefix_JOB_string + | ||
"*") | ||
|
||
job_ids_binary = { | ||
key[len(gcs_utils.TablePrefix_JOB_string):] | ||
for key in job_keys | ||
} | ||
|
||
results = [] | ||
|
||
for job_id_binary in job_ids_binary: | ||
results.append(self._job_table(binary_to_hex(job_id_binary))) | ||
|
||
return results | ||
|
||
def _profile_table(self, batch_id): | ||
"""Get the profile events for a given batch of profile events. | ||
|
||
|
@@ -982,6 +1052,20 @@ def error_messages(self, job_id=None): | |
global_state = DeprecatedGlobalState() | ||
|
||
|
||
def jobs(): | ||
"""Get a list of the jobs in the cluster. | ||
|
||
Returns: | ||
Information from the job table, namely a list of dicts with keys: | ||
- "JobID" (identifier for the job), | ||
- "NodeManagerAddress" (IP address of the driver for this job), | ||
- "DriverPid" (process ID of the driver for this job), | ||
- "StartTime" (UNIX timestamp of the start time of this job), | ||
- "StopTime" (UNIX timestamp of the stop time of this job, if any) | ||
""" | ||
return state.job_table() | ||
|
||
|
||
def nodes(): | ||
"""Get a list of the nodes in the cluster. | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2423,15 +2423,22 @@ def wait_for_num_objects(num_objects, timeout=10): | |
os.environ.get("RAY_USE_NEW_GCS") == "on", | ||
reason="New GCS API doesn't have a Python API yet.") | ||
def test_global_state_api(shutdown_only): | ||
with pytest.raises(Exception): | ||
|
||
error_message = ("The ray global state API cannot be used " | ||
"before ray.init has been called.") | ||
|
||
with pytest.raises(Exception, match=error_message): | ||
ray.objects() | ||
|
||
with pytest.raises(Exception): | ||
with pytest.raises(Exception, match=error_message): | ||
ray.tasks() | ||
|
||
with pytest.raises(Exception): | ||
with pytest.raises(Exception, match=error_message): | ||
ray.nodes() | ||
|
||
with pytest.raises(Exception, match=error_message): | ||
ray.jobs() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be changed to make sure it actually raises the error message we expect.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. fixed |
||
|
||
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1}) | ||
|
||
resources = {"CPU": 5, "GPU": 3, "CustomResource": 1} | ||
|
@@ -2509,6 +2516,12 @@ def wait_for_object_table(): | |
object_table_entry = ray.objects(result_id) | ||
assert object_table[result_id] == object_table_entry | ||
|
||
job_table = ray.jobs() | ||
|
||
assert len(job_table) == 1 | ||
assert job_table[0]["JobID"] == job_id | ||
assert job_table[0]["NodeManagerAddress"] == node_ip_address | ||
|
||
|
||
# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we | ||
# should use those, but they seem to conflict with Ray's use of faulthandler. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -475,10 +475,15 @@ std::string ProfileTable::DebugString() const { | |
return Log<UniqueID, ProfileTableData>::DebugString(); | ||
} | ||
|
||
Status JobTable::AppendJobData(const JobID &job_id, bool is_dead) { | ||
Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could just compute the timestamp inside this function. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I prefer to have it outside; that makes it possible for somebody to use this function with a different timestamp than the current time. |
||
const std::string &node_manager_address, | ||
int64_t driver_pid) { | ||
auto data = std::make_shared<JobTableData>(); | ||
data->set_job_id(job_id.Binary()); | ||
data->set_is_dead(is_dead); | ||
data->set_timestamp(timestamp); | ||
data->set_node_manager_address(node_manager_address); | ||
data->set_driver_pid(driver_pid); | ||
return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr); | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -227,6 +227,12 @@ message JobTableData { | |
bytes job_id = 1; | ||
// Whether it's dead. | ||
bool is_dead = 2; | ||
// The UNIX timestamp corresponding to this event (job added or removed). | ||
int64 timestamp = 3; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's better to declare 2 fields (e.g. a start time and an end time). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Each entry in the JobTable is a log entry (either job addition, or job removal, if is_dead = true), so a single timestamp is indeed the more natural representation here. They get aggregated into start time and stop time in the client API. |
||
// IP of the node this job was started on. | ||
string node_manager_address = 4; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. just node address? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this is to be consistent with the ClientTable |
||
// Process ID of the driver running this job. | ||
int64 driver_pid = 5; | ||
} | ||
|
||
// This table stores the actor checkpoint data. An actor checkpoint | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to define a
JobInfo
class instead of using a dictionary for this. It's only a few fields and no logic right now, but this will likely grow in scope over time. Would also improve documentation over just having the fields in a header comment for another function.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree! Let's do this change in a followup PR, since all the other tables (tasks, objects, clients) are dictionaries in the same format at the moment and we should be consistent.
Other possible choices we could consider are something json compatible with json-schema (would make it easy to put this API behind a REST endpoint in the future), protobuf, namedtuple (meh), dataclass (meh bc might require new python), any thoughts? cc @simon-mo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few considerations: on older Python versions, the `dataclasses` backport can be installed via
pip install dataclasses
so that won't be an issue.