diff --git a/python/ray/__init__.py b/python/ray/__init__.py
index 90e3d09bf774..888971966f4f 100644
--- a/python/ray/__init__.py
+++ b/python/ray/__init__.py
@@ -67,7 +67,7 @@ _config = _Config()
 
 from ray.profiling import profile  # noqa: E402
-from ray.state import (global_state, nodes, tasks, objects, timeline,
+from ray.state import (global_state, jobs, nodes, tasks, objects, timeline,
                        object_transfer_timeline, cluster_resources,
                        available_resources, errors)  # noqa: E402
 from ray.worker import (
@@ -101,6 +101,7 @@ __all__ = [
     "global_state",
+    "jobs",
     "nodes",
     "tasks",
     "objects",
diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py
index 25157a62e1f5..1e5441485263 100644
--- a/python/ray/gcs_utils.py
+++ b/python/ray/gcs_utils.py
@@ -59,6 +59,7 @@ TablePrefix_OBJECT_string = "OBJECT"
 TablePrefix_ERROR_INFO_string = "ERROR_INFO"
 TablePrefix_PROFILE_string = "PROFILE"
+TablePrefix_JOB_string = "JOB"
 
 
 def construct_error_message(job_id, error_type, message, timestamp):
diff --git a/python/ray/monitor.py b/python/ray/monitor.py
index d02486277225..59571e3ed0ae 100644
--- a/python/ray/monitor.py
+++ b/python/ray/monitor.py
@@ -198,8 +198,8 @@ def to_shard_index(id_bin):
                         "entries from redis shard {}.".format(
                             len(keys) - num_deleted, shard_index))
 
-    def xray_job_removed_handler(self, unused_channel, data):
-        """Handle a notification that a job has been removed.
+    def xray_job_notification_handler(self, unused_channel, data):
+        """Handle a notification that a job has been added or removed.
 
         Args:
             unused_channel: The message channel.
@@ -209,10 +209,11 @@ def xray_job_removed_handler(self, unused_channel, data):
         job_data = gcs_entries.entries[0]
         message = ray.gcs_utils.JobTableData.FromString(job_data)
         job_id = message.job_id
-        logger.info("Monitor: "
-                    "XRay Driver {} has been removed.".format(
-                        binary_to_hex(job_id)))
-        self._xray_clean_up_entries_for_job(job_id)
+        if message.is_dead:
+            logger.info("Monitor: "
+                        "XRay Driver {} has been removed.".format(
+                            binary_to_hex(job_id)))
+            self._xray_clean_up_entries_for_job(job_id)
 
     def process_messages(self, max_messages=10000):
         """Process all messages ready in the subscription channels.
@@ -242,7 +243,7 @@ def process_messages(self, max_messages=10000):
                 message_handler = self.xray_heartbeat_batch_handler
             elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL:
                 # Handles driver death.
-                message_handler = self.xray_job_removed_handler
+                message_handler = self.xray_job_notification_handler
             else:
                 raise Exception("This code should be unreachable.")
diff --git a/python/ray/state.py b/python/ray/state.py
index 288b64dc1d8b..75fd8a014de5 100644
--- a/python/ray/state.py
+++ b/python/ray/state.py
@@ -385,6 +385,76 @@ def client_table(self):
         return _parse_client_table(self.redis_client)
 
+    def _job_table(self, job_id):
+        """Fetch and parse the job table information for a single job ID.
+
+        Args:
+            job_id: A job ID or hex string to get information about.
+
+        Returns:
+            A dictionary with information about the job ID in question.
+        """
+        # Allow the argument to be either a JobID or a hex string.
+        if not isinstance(job_id, ray.JobID):
+            assert isinstance(job_id, str)
+            job_id = ray.JobID(hex_to_binary(job_id))
+
+        # Return information about a single job ID.
+        message = self.redis_client.execute_command(
+            "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("JOB"), "",
+            job_id.binary())
+
+        if message is None:
+            return {}
+
+        gcs_entry = gcs_utils.GcsEntry.FromString(message)
+
+        assert len(gcs_entry.entries) > 0
+
+        job_info = {}
+
+        for i in range(len(gcs_entry.entries)):
+            entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i])
+            assert entry.job_id == job_id.binary()
+            job_info["JobID"] = job_id.hex()
+            job_info["NodeManagerAddress"] = entry.node_manager_address
+            job_info["DriverPid"] = entry.driver_pid
+            if entry.is_dead:
+                job_info["StopTime"] = entry.timestamp
+            else:
+                job_info["StartTime"] = entry.timestamp
+
+        return job_info
+
+    def job_table(self):
+        """Fetch and parse the Redis job table.
+
+        Returns:
+            Information about the Ray jobs in the cluster,
+            namely a list of dicts with keys:
+            - "JobID" (identifier for the job),
+            - "NodeManagerAddress" (IP address of the node the driver ran on),
+            - "DriverPid" (process ID of the driver for this job),
+            - "StartTime" (UNIX timestamp of the start time of this job),
+            - "StopTime" (UNIX timestamp of the stop time of this job, if any)
+        """
+        self._check_connected()
+
+        job_keys = self.redis_client.keys(gcs_utils.TablePrefix_JOB_string +
+                                          "*")
+
+        job_ids_binary = {
+            key[len(gcs_utils.TablePrefix_JOB_string):]
+            for key in job_keys
+        }
+
+        results = []
+
+        for job_id_binary in job_ids_binary:
+            results.append(self._job_table(binary_to_hex(job_id_binary)))
+
+        return results
+
     def _profile_table(self, batch_id):
         """Get the profile events for a given batch of profile events.
 
@@ -982,6 +1052,20 @@ def error_messages(self, job_id=None):
 
 global_state = DeprecatedGlobalState()
 
 
+def jobs():
+    """Get a list of the jobs in the cluster.
+
+    Returns:
+        Information from the job table, namely a list of dicts with keys:
+        - "JobID" (identifier for the job),
+        - "NodeManagerAddress" (IP address of the node the driver ran on),
+        - "DriverPid" (process ID of the driver for this job),
+        - "StartTime" (UNIX timestamp of the start time of this job),
+        - "StopTime" (UNIX timestamp of the stop time of this job, if any)
+    """
+    return state.job_table()
+
+
 def nodes():
     """Get a list of the nodes in the cluster.
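
Note: the state.py additions above are the user-facing half of this change: GlobalState.job_table() folds the appended JOB log entries into one dict per job, and the module-level jobs() helper exposes it as ray.jobs(). The following is a minimal usage sketch, not part of the diff; it assumes a local cluster started by this script's own ray.init() call, so the script itself is the only driver.

    import ray

    ray.init()

    for job in ray.jobs():
        # Each entry is the dict assembled by GlobalState._job_table().
        print("JobID:              ", job["JobID"])
        print("NodeManagerAddress: ", job["NodeManagerAddress"])
        print("DriverPid:          ", job["DriverPid"])
        # A live driver only has "StartTime"; "StopTime" appears once the
        # raylet appends the is_dead entry when the driver disconnects.
        print("StartTime:          ", job.get("StartTime"))
        print("StopTime:           ", job.get("StopTime"))
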
diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py
index cff8d45b9971..9223a2f6132e 100644
--- a/python/ray/tests/test_basic.py
+++ b/python/ray/tests/test_basic.py
@@ -2423,15 +2423,22 @@ def wait_for_num_objects(num_objects, timeout=10):
     os.environ.get("RAY_USE_NEW_GCS") == "on",
     reason="New GCS API doesn't have a Python API yet.")
 def test_global_state_api(shutdown_only):
-    with pytest.raises(Exception):
+
+    error_message = ("The ray global state API cannot be used "
+                     "before ray.init has been called.")
+
+    with pytest.raises(Exception, match=error_message):
         ray.objects()
 
-    with pytest.raises(Exception):
+    with pytest.raises(Exception, match=error_message):
         ray.tasks()
 
-    with pytest.raises(Exception):
+    with pytest.raises(Exception, match=error_message):
         ray.nodes()
 
+    with pytest.raises(Exception, match=error_message):
+        ray.jobs()
+
     ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
 
     resources = {"CPU": 5, "GPU": 3, "CustomResource": 1}
@@ -2509,6 +2516,12 @@ def wait_for_object_table():
     object_table_entry = ray.objects(result_id)
     assert object_table[result_id] == object_table_entry
 
+    job_table = ray.jobs()
+
+    assert len(job_table) == 1
+    assert job_table[0]["JobID"] == job_id
+    assert job_table[0]["NodeManagerAddress"] == node_ip_address
+
 
 # TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
 # should use those, but they seem to conflict with Ray's use of faulthandler.
diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc
index 7fdd48f32e3c..fb4fdedb37ef 100644
--- a/src/ray/gcs/client_test.cc
+++ b/src/ray/gcs/client_test.cc
@@ -581,7 +581,8 @@ void TestLogSubscribeAll(const JobID &job_id,
   auto subscribe_callback = [job_ids](gcs::AsyncGcsClient *client) {
     // We have subscribed. Do the writes to the table.
     for (size_t i = 0; i < job_ids.size(); i++) {
-      RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false));
+      RAY_CHECK_OK(
+          client->job_table().AppendJobData(job_ids[i], false, 0, "localhost", 1));
     }
   };
diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc
index 5fc004ee69c4..d3135288e9a9 100644
--- a/src/ray/gcs/tables.cc
+++ b/src/ray/gcs/tables.cc
@@ -475,10 +475,15 @@ std::string ProfileTable::DebugString() const {
   return Log::DebugString();
 }
 
-Status JobTable::AppendJobData(const JobID &job_id, bool is_dead) {
+Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
+                               const std::string &node_manager_address,
+                               int64_t driver_pid) {
   auto data = std::make_shared<JobTableData>();
   data->set_job_id(job_id.Binary());
   data->set_is_dead(is_dead);
+  data->set_timestamp(timestamp);
+  data->set_node_manager_address(node_manager_address);
+  data->set_driver_pid(driver_pid);
   return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr);
 }
 
diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h
index 4984fdde7938..f8f5ab3dc04f 100644
--- a/src/ray/gcs/tables.h
+++ b/src/ray/gcs/tables.h
@@ -654,8 +654,12 @@ class JobTable : public Log<JobID, JobTableData> {
   ///
   /// \param job_id The job id.
   /// \param is_dead Whether the job is dead.
+  /// \param timestamp The UNIX timestamp when the driver was started/stopped.
+  /// \param node_manager_address IP address of the node the driver is running on.
+  /// \param driver_pid Process ID of the driver process.
   /// \return The return status.
-  Status AppendJobData(const JobID &job_id, bool is_dead);
+  Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
+                       const std::string &node_manager_address, int64_t driver_pid);
 };
 
 /// Actor table starts with an ALIVE entry, which represents the first time the actor
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index b4de8b9cac5d..b33e4afb8ef9 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -227,6 +227,12 @@ message JobTableData {
   bytes job_id = 1;
   // Whether it's dead.
   bool is_dead = 2;
+  // The UNIX timestamp corresponding to this event (job added or removed).
+  int64 timestamp = 3;
+  // IP of the node this job was started on.
+  string node_manager_address = 4;
+  // Process ID of the driver running this job.
+  int64 driver_pid = 5;
 }
 
 // This table stores the actor checkpoint data. An actor checkpoint
diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc
index a39d0c975799..8da446d7a0e8 100644
--- a/src/ray/raylet/main.cc
+++ b/src/ray/raylet/main.cc
@@ -108,6 +108,7 @@ int main(int argc, char *argv[]) {
       ray::raylet::ResourceSet(std::move(static_resource_conf));
   RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: "
                  << node_manager_config.resource_config.ToString();
+  node_manager_config.node_manager_address = node_ip_address;
   node_manager_config.node_manager_port = node_manager_port;
   node_manager_config.num_initial_workers = num_initial_workers;
   node_manager_config.num_workers_per_process =
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 35da34fe5207..cd2211ec731d 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -854,12 +854,17 @@ void NodeManager::ProcessRegisterClientRequestMessage(
   } else {
     // Register the new driver.
     const WorkerID driver_id = from_flatbuf<WorkerID>(*message->worker_id());
+    const JobID job_id = from_flatbuf<JobID>(*message->job_id());
     // Compute a dummy driver task id from a given driver.
     const TaskID driver_task_id = TaskID::ComputeDriverTaskId(driver_id);
     worker->AssignTaskId(driver_task_id);
-    worker->AssignJobId(from_flatbuf<JobID>(*message->job_id()));
+    worker->AssignJobId(job_id);
     worker_pool_.RegisterDriver(std::move(worker));
     local_queues_.AddDriverTaskId(driver_task_id);
+    RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
+        JobID(driver_id),
+        /*is_dead=*/false, std::time(nullptr), initial_config_.node_manager_address,
+        message->worker_pid()));
   }
 }
 
@@ -1025,8 +1030,10 @@ void NodeManager::ProcessDisconnectClientMessage(
     DispatchTasks(local_queues_.GetReadyTasksWithResources());
   } else if (is_driver) {
     // The client is a driver.
-    RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(JobID(client->GetClientId()),
-                                                        /*is_dead=*/true));
+    RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
+        JobID(client->GetClientId()),
+        /*is_dead=*/true, std::time(nullptr), initial_config_.node_manager_address,
+        worker->Pid()));
     auto job_id = worker->GetAssignedTaskId();
     RAY_CHECK(!job_id.IsNil());
     local_queues_.RemoveDriverTaskId(job_id);
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index bea7fb97be90..d485fff8a909 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -35,6 +35,8 @@ using rpc::JobTableData;
 struct NodeManagerConfig {
   /// The node's resource configuration.
   ResourceSet resource_config;
+  /// The IP address this node manager is running on.
+  std::string node_manager_address;
   /// The port to use for listening to incoming connections. If this is 0 then
   /// the node manager will choose its own port.
   int node_manager_port;
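
Note: on the C++ side, the raylet now appends a JobTableData entry when a driver registers (is_dead=false) and again when it disconnects (is_dead=true), each stamped with std::time(nullptr), the configured node_manager_address, and the driver's pid. The sketch below is illustrative and not part of the diff; it builds the extended message from Python through the generated protobuf bindings that monitor.py and state.py above already use via ray.gcs_utils. The job ID bytes and the pid are hypothetical placeholders.

    import time

    from ray import gcs_utils

    data = gcs_utils.JobTableData()
    data.job_id = b"\xab" * 4           # hypothetical binary job ID
    data.is_dead = False                # the driver has just registered
    data.timestamp = int(time.time())   # UNIX time of the start event
    data.node_manager_address = "127.0.0.1"
    data.driver_pid = 12345

    # monitor.py and state.py decode entries with the same generated class.
    serialized = data.SerializeToString()
    assert gcs_utils.JobTableData.FromString(serialized).driver_pid == 12345
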