From 746bf37ef9b2f588c6dc923d6c837ac96af46730 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 30 Jun 2019 22:20:12 -0700 Subject: [PATCH 01/12] implement job table --- python/ray/__init__.py | 3 +- python/ray/gcs_utils.py | 1 + python/ray/state.py | 68 ++++++++++++++++++++++++++++++++++ python/ray/tests/test_basic.py | 9 +++++ src/ray/gcs/client_test.cc | 3 +- src/ray/gcs/tables.cc | 6 ++- src/ray/gcs/tables.h | 6 ++- src/ray/protobuf/gcs.proto | 6 +++ src/ray/raylet/main.cc | 1 + src/ray/raylet/node_manager.cc | 14 ++++++- src/ray/raylet/node_manager.h | 2 + 11 files changed, 113 insertions(+), 6 deletions(-) diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 90e3d09bf774..888971966f4f 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -67,7 +67,7 @@ _config = _Config() from ray.profiling import profile # noqa: E402 -from ray.state import (global_state, nodes, tasks, objects, timeline, +from ray.state import (global_state, jobs, nodes, tasks, objects, timeline, object_transfer_timeline, cluster_resources, available_resources, errors) # noqa: E402 from ray.worker import ( @@ -101,6 +101,7 @@ __all__ = [ "global_state", + "jobs", "nodes", "tasks", "objects", diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 25157a62e1f5..1e5441485263 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -59,6 +59,7 @@ TablePrefix_OBJECT_string = "OBJECT" TablePrefix_ERROR_INFO_string = "ERROR_INFO" TablePrefix_PROFILE_string = "PROFILE" +TablePrefix_JOB_string = "JOB" def construct_error_message(job_id, error_type, message, timestamp): diff --git a/python/ray/state.py b/python/ray/state.py index 288b64dc1d8b..368034c90a38 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -385,6 +385,66 @@ def client_table(self): return _parse_client_table(self.redis_client) + def _job_table(self, job_id): + """Fetch and parse the job table information for a single job ID. + + Args: + job_id: A job ID to get information about. + Returns: + A dictionary with information about the job ID in question. + """ + # Allow the argument to be either a DriverID or a hex string. + if not isinstance(job_id, ray.JobID): + job_id = ray.JobID(hex_to_binary(job_id)) + + # Return information about a single job ID. + message = self.redis_client.execute_command("RAY.TABLE_LOOKUP", + gcs_utils.TablePrefix.Value("JOB"), "", job_id.binary()) + + if message is None: + return {} + + gcs_entry = gcs_utils.GcsEntry.FromString(message) + + assert len(gcs_entry.entries) > 0 + + job_info = {} + + for i in range(len(gcs_entry.entries)): + entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i]) + assert entry.job_id == job_id.binary() + job_info["JobID"] = job_id.hex() + job_info["NodeManagerAddress"] = entry.node_manager_address + job_info["DriverPid"] = entry.driver_pid + if entry.is_dead: + job_info["StopTime"] = entry.timestamp + else: + job_info["StartTime"] = entry.timestamp + + return job_info + + def job_table(self): + """Fetch and parse the Redis job table. + Returns: + Information about the Ray jobs in the cluster. + """ + self._check_connected() + + job_keys = self.redis_client.keys( + gcs_utils.TablePrefix_JOB_string + "*") + + job_ids_binary = { + key[len(gcs_utils.TablePrefix_JOB_string):] + for key in job_keys + } + + results = [] + + for job_id_binary in job_ids_binary: + results.append(self._job_table(binary_to_hex(job_id_binary))) + + return results + def _profile_table(self, batch_id): """Get the profile events for a given batch of profile events. @@ -982,6 +1042,14 @@ def error_messages(self, job_id=None): global_state = DeprecatedGlobalState() +def jobs(): + """Get a list of the jobs in the cluster. + Returns: + Information from the job table. + """ + return state.job_table() + + def nodes(): """Get a list of the nodes in the cluster. diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index cff8d45b9971..c44b319ab1af 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -2432,6 +2432,9 @@ def test_global_state_api(shutdown_only): with pytest.raises(Exception): ray.nodes() + with pytest.raises(Exception): + ray.jobs() + ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1}) resources = {"CPU": 5, "GPU": 3, "CustomResource": 1} @@ -2509,6 +2512,12 @@ def wait_for_object_table(): object_table_entry = ray.objects(result_id) assert object_table[result_id] == object_table_entry + job_table = ray.jobs() + + assert len(job_table) == 1 + assert job_table[0]["JobID"] == job_id + assert job_table[0]["NodeManagerAddress"] == node_ip_address + # TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we # should use those, but they seem to conflict with Ray's use of faulthandler. diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 7fdd48f32e3c..8429f5f1b995 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -581,7 +581,8 @@ void TestLogSubscribeAll(const JobID &job_id, auto subscribe_callback = [job_ids](gcs::AsyncGcsClient *client) { // We have subscribed. Do the writes to the table. for (size_t i = 0; i < job_ids.size(); i++) { - RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false)); + RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false, + "localhost", 1)); } }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 5fc004ee69c4..81680619a0f6 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -475,10 +475,14 @@ std::string ProfileTable::DebugString() const { return Log::DebugString(); } -Status JobTable::AppendJobData(const JobID &job_id, bool is_dead) { +Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, + const std::string& node_manager_address, int64_t driver_pid) { auto data = std::make_shared(); data->set_job_id(job_id.Binary()); data->set_is_dead(is_dead); + data->set_timestamp(timestamp); + data->set_node_manager_address(node_manager_address); + data->set_driver_pid(driver_pid); return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr); } diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 4984fdde7938..709a7c94f1a3 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -654,8 +654,12 @@ class JobTable : public Log { /// /// \param job_id The job id. /// \param is_dead Whether the job is dead. + /// \param timestamp The UNIX timestamp when the driver was started/stopped. + /// \param node_manager_address IP address of the node the driver is running on. + /// \param driver_pid Process ID of the driver process. /// \return The return status. - Status AppendJobData(const JobID &job_id, bool is_dead); + Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, + const std::string& node_manager_address, int64_t driver_pid); }; /// Actor table starts with an ALIVE entry, which represents the first time the actor diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index b4de8b9cac5d..b33e4afb8ef9 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -227,6 +227,12 @@ message JobTableData { bytes job_id = 1; // Whether it's dead. bool is_dead = 2; + // The UNIX timestamp corresponding to this event (job added or removed). + int64 timestamp = 3; + // IP of the node this job was started on. + string node_manager_address = 4; + // Process ID of the driver running this job. + int64 driver_pid = 5; } // This table stores the actor checkpoint data. An actor checkpoint diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index a39d0c975799..8da446d7a0e8 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -108,6 +108,7 @@ int main(int argc, char *argv[]) { ray::raylet::ResourceSet(std::move(static_resource_conf)); RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: " << node_manager_config.resource_config.ToString(); + node_manager_config.node_manager_address = node_ip_address; node_manager_config.node_manager_port = node_manager_port; node_manager_config.num_initial_workers = num_initial_workers; node_manager_config.num_workers_per_process = diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 8e2cdf6846c6..958c6bbb19d6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -858,11 +858,18 @@ void NodeManager::ProcessRegisterClientRequestMessage( // which is set to the worker ID. // TODO(qwang): Use driver_task_id instead here. const WorkerID driver_id = from_flatbuf(*message->driver_id()); + const JobID job_id = from_flatbuf(*message->client_id()); TaskID driver_task_id = TaskID::GetDriverTaskID(driver_id); worker->AssignTaskId(driver_task_id); - worker->AssignJobId(from_flatbuf(*message->client_id())); + worker->AssignJobId(job_id); worker_pool_.RegisterDriver(std::move(worker)); local_queues_.AddDriverTaskId(driver_task_id); + RAY_CHECK_OK( + gcs_client_->job_table().AppendJobData(job_id, + /*is_dead=*/false, + std::time(nullptr), + initial_config_.node_manager_address, + message->worker_pid())); } } @@ -1029,7 +1036,10 @@ void NodeManager::ProcessDisconnectClientMessage( } else if (is_driver) { // The client is a driver. RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(JobID(client->GetClientId()), - /*is_dead=*/true)); + /*is_dead=*/true, + std::time(nullptr), + initial_config_.node_manager_address, + worker->Pid())); auto job_id = worker->GetAssignedTaskId(); RAY_CHECK(!job_id.IsNil()); local_queues_.RemoveDriverTaskId(job_id); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index bea7fb97be90..d485fff8a909 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -35,6 +35,8 @@ using rpc::JobTableData; struct NodeManagerConfig { /// The node's resource configuration. ResourceSet resource_config; + /// The IP address this node manager is running on. + std::string node_manager_address; /// The port to use for listening to incoming connections. If this is 0 then /// the node manager will choose its own port. int node_manager_port; From 97525d53ae14b22076affca22703577d59b67060 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 30 Jun 2019 22:27:09 -0700 Subject: [PATCH 02/12] add docs --- python/ray/state.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/ray/state.py b/python/ray/state.py index 368034c90a38..86f3d5d906e3 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -1044,8 +1044,14 @@ def error_messages(self, job_id=None): def jobs(): """Get a list of the jobs in the cluster. + Returns: - Information from the job table. + Information from the job table, namely a list of dicts with keys: + - "JobID" (sha1 identifier for the job), + - "NodeManagerAddress" (IP address of the driver for this job), + - "DriverPid" (process ID of the driver for this job), + - "StartTime" (UNIX timestamp of the start time of this job), + - "StopTime" (UNIX timestamp of the stop time of this job, if any) """ return state.job_table() From 00461d6b605c6a32bf2440380cc00588762ff33a Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sun, 30 Jun 2019 22:57:27 -0700 Subject: [PATCH 03/12] update --- src/ray/gcs/client_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 8429f5f1b995..3cdb9a81925f 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -582,7 +582,7 @@ void TestLogSubscribeAll(const JobID &job_id, // We have subscribed. Do the writes to the table. for (size_t i = 0; i < job_ids.size(); i++) { RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false, - "localhost", 1)); + 0, "localhost", 1)); } }; From 54efe0959da044d05eff804a1850f0e6176e33ec Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 2 Jul 2019 10:48:31 -0700 Subject: [PATCH 04/12] WTF --- python/ray/monitor.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/ray/monitor.py b/python/ray/monitor.py index d02486277225..59571e3ed0ae 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -198,8 +198,8 @@ def to_shard_index(id_bin): "entries from redis shard {}.".format( len(keys) - num_deleted, shard_index)) - def xray_job_removed_handler(self, unused_channel, data): - """Handle a notification that a job has been removed. + def xray_job_notification_handler(self, unused_channel, data): + """Handle a notification that a job has been added or removed. Args: unused_channel: The message channel. @@ -209,10 +209,11 @@ def xray_job_removed_handler(self, unused_channel, data): job_data = gcs_entries.entries[0] message = ray.gcs_utils.JobTableData.FromString(job_data) job_id = message.job_id - logger.info("Monitor: " - "XRay Driver {} has been removed.".format( - binary_to_hex(job_id))) - self._xray_clean_up_entries_for_job(job_id) + if message.is_dead: + logger.info("Monitor: " + "XRay Driver {} has been removed.".format( + binary_to_hex(job_id))) + self._xray_clean_up_entries_for_job(job_id) def process_messages(self, max_messages=10000): """Process all messages ready in the subscription channels. @@ -242,7 +243,7 @@ def process_messages(self, max_messages=10000): message_handler = self.xray_heartbeat_batch_handler elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL: # Handles driver death. - message_handler = self.xray_job_removed_handler + message_handler = self.xray_job_notification_handler else: raise Exception("This code should be unreachable.") From 854300b9d9a3870614dc9f6977c09b76e9e35e8c Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Tue, 2 Jul 2019 16:41:04 -0700 Subject: [PATCH 05/12] update --- python/ray/state.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/state.py b/python/ray/state.py index 86f3d5d906e3..509f82e6b8d6 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -395,6 +395,7 @@ def _job_table(self, job_id): """ # Allow the argument to be either a DriverID or a hex string. if not isinstance(job_id, ray.JobID): + assert isinstance(job_id, str) job_id = ray.JobID(hex_to_binary(job_id)) # Return information about a single job ID. @@ -426,7 +427,13 @@ def _job_table(self, job_id): def job_table(self): """Fetch and parse the Redis job table. Returns: - Information about the Ray jobs in the cluster. + Information about the Ray jobs in the cluster, + namely a list of dicts with keys: + - "JobID" (sha1 identifier for the job), + - "NodeManagerAddress" (IP address of the driver for this job), + - "DriverPid" (process ID of the driver for this job), + - "StartTime" (UNIX timestamp of the start time of this job), + - "StopTime" (UNIX timestamp of the stop time of this job, if any) """ self._check_connected() From 6c1749d9564a85b163b2deda0a7a49b0d5d3740b Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 15:57:53 -0700 Subject: [PATCH 06/12] update --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 151472ae571b..66807554790a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -862,7 +862,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( worker_pool_.RegisterDriver(std::move(worker)); local_queues_.AddDriverTaskId(driver_task_id); RAY_CHECK_OK( - gcs_client_->job_table().AppendJobData(job_id, + gcs_client_->job_table().AppendJobData(JobID(driver_id), /*is_dead=*/false, std::time(nullptr), initial_config_.node_manager_address, From 245058522b09f93e92684731e214043aadfae4b2 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 17:30:48 -0700 Subject: [PATCH 07/12] Update python/ray/state.py Co-Authored-By: Robert Nishihara --- python/ray/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/state.py b/python/ray/state.py index 509f82e6b8d6..d5c1a360db82 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -388,7 +388,7 @@ def client_table(self): def _job_table(self, job_id): """Fetch and parse the job table information for a single job ID. - Args: + Args: job_id: A job ID to get information about. Returns: A dictionary with information about the job ID in question. From f460176761f8d29c34498c9ba69a55b44b8a1676 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 17:40:04 -0700 Subject: [PATCH 08/12] update --- python/ray/state.py | 14 ++++++++------ python/ray/tests/test_basic.py | 13 +++++++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/python/ray/state.py b/python/ray/state.py index d5c1a360db82..3ad14695ccb3 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -389,11 +389,12 @@ def _job_table(self, job_id): """Fetch and parse the job table information for a single job ID. Args: - job_id: A job ID to get information about. - Returns: + job_id: A job ID or hex string to get information about. + + Returns: A dictionary with information about the job ID in question. """ - # Allow the argument to be either a DriverID or a hex string. + # Allow the argument to be either a JobID or a hex string. if not isinstance(job_id, ray.JobID): assert isinstance(job_id, str) job_id = ray.JobID(hex_to_binary(job_id)) @@ -426,10 +427,11 @@ def _job_table(self, job_id): def job_table(self): """Fetch and parse the Redis job table. - Returns: + + Returns: Information about the Ray jobs in the cluster, namely a list of dicts with keys: - - "JobID" (sha1 identifier for the job), + - "JobID" (identifier for the job), - "NodeManagerAddress" (IP address of the driver for this job), - "DriverPid" (process ID of the driver for this job), - "StartTime" (UNIX timestamp of the start time of this job), @@ -1054,7 +1056,7 @@ def jobs(): Returns: Information from the job table, namely a list of dicts with keys: - - "JobID" (sha1 identifier for the job), + - "JobID" (identifier for the job), - "NodeManagerAddress" (IP address of the driver for this job), - "DriverPid" (process ID of the driver for this job), - "StartTime" (UNIX timestamp of the start time of this job), diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index c44b319ab1af..26d142b8462a 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -2423,16 +2423,21 @@ def wait_for_num_objects(num_objects, timeout=10): os.environ.get("RAY_USE_NEW_GCS") == "on", reason="New GCS API doesn't have a Python API yet.") def test_global_state_api(shutdown_only): - with pytest.raises(Exception): + + error_message = ( + "The ray global state API cannot be used " + "before ray.init has been called.") + + with pytest.raises(Exception, match=error_message): ray.objects() - with pytest.raises(Exception): + with pytest.raises(Exception, match=error_message): ray.tasks() - with pytest.raises(Exception): + with pytest.raises(Exception, match=error_message): ray.nodes() - with pytest.raises(Exception): + with pytest.raises(Exception, match=error_message): ray.jobs() ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1}) From 04c02ac2e4ecafdfc2e64653487d8ca191e04abc Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 17:42:15 -0700 Subject: [PATCH 09/12] update --- python/ray/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/state.py b/python/ray/state.py index 3ad14695ccb3..de4da063a09c 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -1054,7 +1054,7 @@ def error_messages(self, job_id=None): def jobs(): """Get a list of the jobs in the cluster. - Returns: + Returns: Information from the job table, namely a list of dicts with keys: - "JobID" (identifier for the job), - "NodeManagerAddress" (IP address of the driver for this job), From 8f3458deae546744a9f5d0172afd3f5ebe136c78 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 18:29:10 -0700 Subject: [PATCH 10/12] update --- src/ray/gcs/client_test.cc | 4 ++-- src/ray/gcs/tables.cc | 3 ++- src/ray/gcs/tables.h | 2 +- src/ray/raylet/node_manager.cc | 19 ++++++++----------- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/ray/gcs/client_test.cc b/src/ray/gcs/client_test.cc index 3cdb9a81925f..fb4fdedb37ef 100644 --- a/src/ray/gcs/client_test.cc +++ b/src/ray/gcs/client_test.cc @@ -581,8 +581,8 @@ void TestLogSubscribeAll(const JobID &job_id, auto subscribe_callback = [job_ids](gcs::AsyncGcsClient *client) { // We have subscribed. Do the writes to the table. for (size_t i = 0; i < job_ids.size(); i++) { - RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false, - 0, "localhost", 1)); + RAY_CHECK_OK( + client->job_table().AppendJobData(job_ids[i], false, 0, "localhost", 1)); } }; diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc index 81680619a0f6..d3135288e9a9 100644 --- a/src/ray/gcs/tables.cc +++ b/src/ray/gcs/tables.cc @@ -476,7 +476,8 @@ std::string ProfileTable::DebugString() const { } Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, - const std::string& node_manager_address, int64_t driver_pid) { + const std::string &node_manager_address, + int64_t driver_pid) { auto data = std::make_shared(); data->set_job_id(job_id.Binary()); data->set_is_dead(is_dead); diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h index 709a7c94f1a3..f8f5ab3dc04f 100644 --- a/src/ray/gcs/tables.h +++ b/src/ray/gcs/tables.h @@ -659,7 +659,7 @@ class JobTable : public Log { /// \param driver_pid Process ID of the driver process. /// \return The return status. Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp, - const std::string& node_manager_address, int64_t driver_pid); + const std::string &node_manager_address, int64_t driver_pid); }; /// Actor table starts with an ALIVE entry, which represents the first time the actor diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 66807554790a..cd2211ec731d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -861,12 +861,10 @@ void NodeManager::ProcessRegisterClientRequestMessage( worker->AssignJobId(job_id); worker_pool_.RegisterDriver(std::move(worker)); local_queues_.AddDriverTaskId(driver_task_id); - RAY_CHECK_OK( - gcs_client_->job_table().AppendJobData(JobID(driver_id), - /*is_dead=*/false, - std::time(nullptr), - initial_config_.node_manager_address, - message->worker_pid())); + RAY_CHECK_OK(gcs_client_->job_table().AppendJobData( + JobID(driver_id), + /*is_dead=*/false, std::time(nullptr), initial_config_.node_manager_address, + message->worker_pid())); } } @@ -1032,11 +1030,10 @@ void NodeManager::ProcessDisconnectClientMessage( DispatchTasks(local_queues_.GetReadyTasksWithResources()); } else if (is_driver) { // The client is a driver. - RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(JobID(client->GetClientId()), - /*is_dead=*/true, - std::time(nullptr), - initial_config_.node_manager_address, - worker->Pid())); + RAY_CHECK_OK(gcs_client_->job_table().AppendJobData( + JobID(client->GetClientId()), + /*is_dead=*/true, std::time(nullptr), initial_config_.node_manager_address, + worker->Pid())); auto job_id = worker->GetAssignedTaskId(); RAY_CHECK(!job_id.IsNil()); local_queues_.RemoveDriverTaskId(job_id); From 6a6f331d602770d4eb64f39ed2279e2769b311d1 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 18:58:24 -0700 Subject: [PATCH 11/12] linting --- python/ray/state.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/state.py b/python/ray/state.py index de4da063a09c..cf43e8fae96f 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -400,8 +400,9 @@ def _job_table(self, job_id): job_id = ray.JobID(hex_to_binary(job_id)) # Return information about a single job ID. - message = self.redis_client.execute_command("RAY.TABLE_LOOKUP", - gcs_utils.TablePrefix.Value("JOB"), "", job_id.binary()) + message = self.redis_client.execute_command( + "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("JOB"), "", + job_id.binary()) if message is None: return {} From 67ca8034502286bfc316608ad0899cdb8f3ad731 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 5 Jul 2019 19:07:05 -0700 Subject: [PATCH 12/12] lint --- python/ray/state.py | 4 ++-- python/ray/tests/test_basic.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/state.py b/python/ray/state.py index cf43e8fae96f..75fd8a014de5 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -440,8 +440,8 @@ def job_table(self): """ self._check_connected() - job_keys = self.redis_client.keys( - gcs_utils.TablePrefix_JOB_string + "*") + job_keys = self.redis_client.keys(gcs_utils.TablePrefix_JOB_string + + "*") job_ids_binary = { key[len(gcs_utils.TablePrefix_JOB_string):] diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 26d142b8462a..9223a2f6132e 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -2424,9 +2424,8 @@ def wait_for_num_objects(num_objects, timeout=10): reason="New GCS API doesn't have a Python API yet.") def test_global_state_api(shutdown_only): - error_message = ( - "The ray global state API cannot be used " - "before ray.init has been called.") + error_message = ("The ray global state API cannot be used " + "before ray.init has been called.") with pytest.raises(Exception, match=error_message): ray.objects()