Add job table to state API (#5076)
pcmoritz authored and robertnishihara committed Jul 6, 2019
1 parent 53d5a8a commit c5253cc
Showing 12 changed files with 143 additions and 17 deletions.
3 changes: 2 additions & 1 deletion python/ray/__init__.py
@@ -67,7 +67,7 @@
_config = _Config()

from ray.profiling import profile # noqa: E402
- from ray.state import (global_state, nodes, tasks, objects, timeline,
+ from ray.state import (global_state, jobs, nodes, tasks, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources, errors) # noqa: E402
from ray.worker import (
@@ -101,6 +101,7 @@

__all__ = [
"global_state",
"jobs",
"nodes",
"tasks",
"objects",
1 change: 1 addition & 0 deletions python/ray/gcs_utils.py
@@ -59,6 +59,7 @@
TablePrefix_OBJECT_string = "OBJECT"
TablePrefix_ERROR_INFO_string = "ERROR_INFO"
TablePrefix_PROFILE_string = "PROFILE"
TablePrefix_JOB_string = "JOB"
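
For illustration, the new prefix implies per-job Redis keys of the form "JOB" followed by the binary job ID, which is what the job table scan added to state.py below relies on (the job ID bytes here are a placeholder):

```python
from ray import gcs_utils

job_id_binary = b"\x00" * 16  # placeholder binary job id
redis_key = gcs_utils.TablePrefix_JOB_string.encode("ascii") + job_id_binary
assert redis_key.startswith(b"JOB")
```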


def construct_error_message(job_id, error_type, message, timestamp):
15 changes: 8 additions & 7 deletions python/ray/monitor.py
@@ -198,8 +198,8 @@ def to_shard_index(id_bin):
"entries from redis shard {}.".format(
len(keys) - num_deleted, shard_index))

- def xray_job_removed_handler(self, unused_channel, data):
-     """Handle a notification that a job has been removed.
+ def xray_job_notification_handler(self, unused_channel, data):
+     """Handle a notification that a job has been added or removed.
Args:
unused_channel: The message channel.
@@ -209,10 +209,11 @@ def xray_job_removed_handler(self, unused_channel, data):
job_data = gcs_entries.entries[0]
message = ray.gcs_utils.JobTableData.FromString(job_data)
job_id = message.job_id
logger.info("Monitor: "
"XRay Driver {} has been removed.".format(
binary_to_hex(job_id)))
self._xray_clean_up_entries_for_job(job_id)
if message.is_dead:
logger.info("Monitor: "
"XRay Driver {} has been removed.".format(
binary_to_hex(job_id)))
self._xray_clean_up_entries_for_job(job_id)

def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
@@ -242,7 +243,7 @@ def process_messages(self, max_messages=10000):
message_handler = self.xray_heartbeat_batch_handler
elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL:
# Handles driver death.
- message_handler = self.xray_job_removed_handler
+ message_handler = self.xray_job_notification_handler
else:
raise Exception("This code should be unreachable.")

84 changes: 84 additions & 0 deletions python/ray/state.py
@@ -385,6 +385,76 @@ def client_table(self):

return _parse_client_table(self.redis_client)

def _job_table(self, job_id):
"""Fetch and parse the job table information for a single job ID.
Args:
job_id: A job ID or hex string to get information about.
Returns:
A dictionary with information about the job ID in question.
"""
# Allow the argument to be either a JobID or a hex string.
if not isinstance(job_id, ray.JobID):
assert isinstance(job_id, str)
job_id = ray.JobID(hex_to_binary(job_id))

# Return information about a single job ID.
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("JOB"), "",
job_id.binary())

if message is None:
return {}

gcs_entry = gcs_utils.GcsEntry.FromString(message)

assert len(gcs_entry.entries) > 0

job_info = {}

for i in range(len(gcs_entry.entries)):
entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i])
assert entry.job_id == job_id.binary()
job_info["JobID"] = job_id.hex()
job_info["NodeManagerAddress"] = entry.node_manager_address
job_info["DriverPid"] = entry.driver_pid
if entry.is_dead:
job_info["StopTime"] = entry.timestamp
else:
job_info["StartTime"] = entry.timestamp

return job_info

def job_table(self):
"""Fetch and parse the Redis job table.
Returns:
Information about the Ray jobs in the cluster,
namely a list of dicts with keys:
- "JobID" (identifier for the job),
- "NodeManagerAddress" (IP address of the driver for this job),
- "DriverPid" (process ID of the driver for this job),
- "StartTime" (UNIX timestamp of the start time of this job),
- "StopTime" (UNIX timestamp of the stop time of this job, if any)
"""
self._check_connected()

job_keys = self.redis_client.keys(gcs_utils.TablePrefix_JOB_string +
"*")

job_ids_binary = {
key[len(gcs_utils.TablePrefix_JOB_string):]
for key in job_keys
}

results = []

for job_id_binary in job_ids_binary:
results.append(self._job_table(binary_to_hex(job_id_binary)))

return results

def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.
@@ -982,6 +1052,20 @@ def error_messages(self, job_id=None):
global_state = DeprecatedGlobalState()


def jobs():
"""Get a list of the jobs in the cluster.
Returns:
Information from the job table, namely a list of dicts with keys:
- "JobID" (identifier for the job),
- "NodeManagerAddress" (IP address of the driver for this job),
- "DriverPid" (process ID of the driver for this job),
- "StartTime" (UNIX timestamp of the start time of this job),
- "StopTime" (UNIX timestamp of the stop time of this job, if any)
"""
return state.job_table()
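
A minimal usage sketch of the new API (assuming a cluster started locally with ray.init(); the printed values are illustrative):

```python
import ray

ray.init()

# The driver running this script appears in the job table with a
# StartTime; StopTime is only present once a driver has disconnected.
for job in ray.jobs():
    print(job["JobID"], job["NodeManagerAddress"], job["DriverPid"],
          job.get("StartTime"), job.get("StopTime"))
```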


def nodes():
"""Get a list of the nodes in the cluster.
19 changes: 16 additions & 3 deletions python/ray/tests/test_basic.py
@@ -2423,15 +2423,22 @@ def wait_for_num_objects(num_objects, timeout=10):
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
def test_global_state_api(shutdown_only):
- with pytest.raises(Exception):
+
+ error_message = ("The ray global state API cannot be used "
+                  "before ray.init has been called.")
+
+ with pytest.raises(Exception, match=error_message):
ray.objects()

- with pytest.raises(Exception):
+ with pytest.raises(Exception, match=error_message):
ray.tasks()

- with pytest.raises(Exception):
+ with pytest.raises(Exception, match=error_message):
ray.nodes()

with pytest.raises(Exception, match=error_message):
ray.jobs()

ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})

resources = {"CPU": 5, "GPU": 3, "CustomResource": 1}
@@ -2509,6 +2516,12 @@ def wait_for_object_table():
object_table_entry = ray.objects(result_id)
assert object_table[result_id] == object_table_entry

job_table = ray.jobs()

assert len(job_table) == 1
assert job_table[0]["JobID"] == job_id
assert job_table[0]["NodeManagerAddress"] == node_ip_address


# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
# should use those, but they seem to conflict with Ray's use of faulthandler.
3 changes: 2 additions & 1 deletion src/ray/gcs/client_test.cc
@@ -581,7 +581,8 @@ void TestLogSubscribeAll(const JobID &job_id,
auto subscribe_callback = [job_ids](gcs::AsyncGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (size_t i = 0; i < job_ids.size(); i++) {
RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false));
RAY_CHECK_OK(
client->job_table().AppendJobData(job_ids[i], false, 0, "localhost", 1));
}
};

7 changes: 6 additions & 1 deletion src/ray/gcs/tables.cc
@@ -475,10 +475,15 @@ std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}

- Status JobTable::AppendJobData(const JobID &job_id, bool is_dead) {
+ Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
+                                const std::string &node_manager_address,
+                                int64_t driver_pid) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id.Binary());
data->set_is_dead(is_dead);
data->set_timestamp(timestamp);
data->set_node_manager_address(node_manager_address);
data->set_driver_pid(driver_pid);
return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr);
}

6 changes: 5 additions & 1 deletion src/ray/gcs/tables.h
@@ -654,8 +654,12 @@ class JobTable : public Log<JobID, JobTableData> {
///
/// \param job_id The job id.
/// \param is_dead Whether the job is dead.
/// \param timestamp The UNIX timestamp when the driver was started/stopped.
/// \param node_manager_address IP address of the node the driver is running on.
/// \param driver_pid Process ID of the driver process.
/// \return The return status.
Status AppendJobData(const JobID &job_id, bool is_dead);
Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
const std::string &node_manager_address, int64_t driver_pid);
};

/// Actor table starts with an ALIVE entry, which represents the first time the actor
6 changes: 6 additions & 0 deletions src/ray/protobuf/gcs.proto
@@ -227,6 +227,12 @@ message JobTableData {
bytes job_id = 1;
// Whether it's dead.
bool is_dead = 2;
// The UNIX timestamp corresponding to this event (job added or removed).
int64 timestamp = 3;
// IP of the node this job was started on.
string node_manager_address = 4;
// Process ID of the driver running this job.
int64 driver_pid = 5;
}
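
As a rough sketch of how these messages travel, the monitor and state code above decode a GcsEntry whose entries are serialized JobTableData; the snippet below round-trips the new fields through the generated Python bindings (all field values are placeholders):

```python
from ray import gcs_utils

# Build a JobTableData entry with the new fields (values are made up).
data = gcs_utils.JobTableData()
data.job_id = b"\x01" * 16           # placeholder binary job id
data.is_dead = False
data.timestamp = 1562378096          # seconds since the epoch
data.node_manager_address = "127.0.0.1"
data.driver_pid = 4242

# Wrap it in a GcsEntry, mirroring how notifications are published.
entry = gcs_utils.GcsEntry()
entry.entries.append(data.SerializeToString())

decoded = gcs_utils.JobTableData.FromString(
    gcs_utils.GcsEntry.FromString(entry.SerializeToString()).entries[0])
assert decoded.driver_pid == 4242
```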

// This table stores the actor checkpoint data. An actor checkpoint
1 change: 1 addition & 0 deletions src/ray/raylet/main.cc
@@ -108,6 +108,7 @@ int main(int argc, char *argv[]) {
ray::raylet::ResourceSet(std::move(static_resource_conf));
RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.node_manager_address = node_ip_address;
node_manager_config.node_manager_port = node_manager_port;
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_per_process =
13 changes: 10 additions & 3 deletions src/ray/raylet/node_manager.cc
@@ -854,12 +854,17 @@ void NodeManager::ProcessRegisterClientRequestMessage(
} else {
// Register the new driver.
const WorkerID driver_id = from_flatbuf<WorkerID>(*message->worker_id());
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
// Compute a dummy driver task id from a given driver.
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(driver_id);
worker->AssignTaskId(driver_task_id);
worker->AssignJobId(from_flatbuf<JobID>(*message->job_id()));
worker->AssignJobId(job_id);
worker_pool_.RegisterDriver(std::move(worker));
local_queues_.AddDriverTaskId(driver_task_id);
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
JobID(driver_id),
/*is_dead=*/false, std::time(nullptr), initial_config_.node_manager_address,
message->worker_pid()));
}
}

@@ -1025,8 +1030,10 @@ void NodeManager::ProcessDisconnectClientMessage(
DispatchTasks(local_queues_.GetReadyTasksWithResources());
} else if (is_driver) {
// The client is a driver.
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(JobID(client->GetClientId()),
/*is_dead=*/true));
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(
JobID(client->GetClientId()),
/*is_dead=*/true, std::time(nullptr), initial_config_.node_manager_address,
worker->Pid()));
auto job_id = worker->GetAssignedTaskId();
RAY_CHECK(!job_id.IsNil());
local_queues_.RemoveDriverTaskId(job_id);
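
Since the raylet now appends one entry at driver registration (is_dead = false) and one at disconnect (is_dead = true), each stamped with std::time(nullptr), the job table exposes start and stop times per driver. A minimal sketch of consuming them (the summarize_jobs helper is hypothetical and assumes ray.init() has been called):

```python
import time
import ray

def summarize_jobs():
    # StartTime comes from the registration entry, StopTime (if present)
    # from the disconnect entry; both are UNIX timestamps in seconds.
    for job in ray.jobs():
        start = job.get("StartTime")
        stop = job.get("StopTime")
        if stop is not None:
            print("{} finished after {:.0f}s on {}".format(
                job["JobID"], stop - start, job["NodeManagerAddress"]))
        else:
            print("{} running since {} (driver pid {})".format(
                job["JobID"], time.ctime(start), job["DriverPid"]))
```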
2 changes: 2 additions & 0 deletions src/ray/raylet/node_manager.h
@@ -35,6 +35,8 @@ using rpc::JobTableData;
struct NodeManagerConfig {
/// The node's resource configuration.
ResourceSet resource_config;
/// The IP address this node manager is running on.
std::string node_manager_address;
/// The port to use for listening to incoming connections. If this is 0 then
/// the node manager will choose its own port.
int node_manager_port;
