Add job table to state API #5076

Merged
merged 13 commits into from
Jul 6, 2019
Changes from 5 commits
3 changes: 2 additions & 1 deletion python/ray/__init__.py
@@ -67,7 +67,7 @@
_config = _Config()

from ray.profiling import profile # noqa: E402
-from ray.state import (global_state, nodes, tasks, objects, timeline,
+from ray.state import (global_state, jobs, nodes, tasks, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources, errors) # noqa: E402
from ray.worker import (
@@ -101,6 +101,7 @@

__all__ = [
"global_state",
"jobs",
"nodes",
"tasks",
"objects",
1 change: 1 addition & 0 deletions python/ray/gcs_utils.py
@@ -59,6 +59,7 @@
TablePrefix_OBJECT_string = "OBJECT"
TablePrefix_ERROR_INFO_string = "ERROR_INFO"
TablePrefix_PROFILE_string = "PROFILE"
TablePrefix_JOB_string = "JOB"


def construct_error_message(job_id, error_type, message, timestamp):
15 changes: 8 additions & 7 deletions python/ray/monitor.py
@@ -198,8 +198,8 @@ def to_shard_index(id_bin):
"entries from redis shard {}.".format(
len(keys) - num_deleted, shard_index))

-def xray_job_removed_handler(self, unused_channel, data):
-"""Handle a notification that a job has been removed.
+def xray_job_notification_handler(self, unused_channel, data):
+"""Handle a notification that a job has been added or removed.

Args:
unused_channel: The message channel.
@@ -209,10 +209,11 @@ def xray_job_removed_handler(self, unused_channel, data):
job_data = gcs_entries.entries[0]
message = ray.gcs_utils.JobTableData.FromString(job_data)
job_id = message.job_id
-logger.info("Monitor: "
-            "XRay Driver {} has been removed.".format(
-                binary_to_hex(job_id)))
-self._xray_clean_up_entries_for_job(job_id)
+if message.is_dead:
+    logger.info("Monitor: "
+                "XRay Driver {} has been removed.".format(
+                    binary_to_hex(job_id)))
+    self._xray_clean_up_entries_for_job(job_id)

def process_messages(self, max_messages=10000):
"""Process all messages ready in the subscription channels.
@@ -242,7 +243,7 @@ def process_messages(self, max_messages=10000):
message_handler = self.xray_heartbeat_batch_handler
elif channel == ray.gcs_utils.XRAY_JOB_CHANNEL:
# Handles driver death.
-message_handler = self.xray_job_removed_handler
+message_handler = self.xray_job_notification_handler
else:
raise Exception("This code should be unreachable.")

74 changes: 74 additions & 0 deletions python/ray/state.py
@@ -385,6 +385,66 @@ def client_table(self):

return _parse_client_table(self.redis_client)

def _job_table(self, job_id):
"""Fetch and parse the job table information for a single job ID.

Args:
job_id: A job ID to get information about.
Contributor

Should note that it can be a JobID or hex.

Contributor Author

fixed

Collaborator

not actually fixed, right?

Contributor Author

yeah, it got rolled back

Returns:
A dictionary with information about the job ID in question.
Contributor

I think it would be better to define a JobInfo class instead of using a dictionary for this. It's only a few fields and no logic right now, but this will likely grow in scope over time. Would also improve documentation over just having the fields in a header comment for another function.

Contributor Author

@pcmoritz, Jul 2, 2019

Yeah I agree! Let's do this change in a followup PR, since all the other tables (tasks, objects, clients) are dictionaries in the same format at the moment and we should be consistent.

Other possible choices we could consider are something json compatible with json-schema (would make it easy to put this API behind a REST endpoint in the future), protobuf, namedtuple (meh), dataclass (meh bc might require new python), any thoughts? cc @simon-mo

Contributor

Few considerations:

  • there's a backport for dataclass (pip install dataclasses), so that won't be an issue.
  • in the API schema world, OpenAPI schema is fairly big as well (Kubernetes uses it).
  • protobuf in python is hard to work with. Non-intuitive API, slow serialization
  • dataclass > namedtuple
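
The JobInfo idea from this thread could look roughly like the sketch below. It is not part of this PR. The class name comes from the review comment, but the field names and the `from_dict` / `to_dict` helpers are assumptions made for illustration, mapped onto the dictionary keys that `_job_table` fills in. It relies on `dataclasses`, so it needs Python 3.7 or the backport mentioned above.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class JobInfo:
    """Hypothetical typed record for one entry of the job table."""

    job_id: str
    node_manager_address: str
    driver_pid: int
    # Either timestamp may be missing, depending on which log entries exist.
    start_time: Optional[int] = None
    stop_time: Optional[int] = None

    @classmethod
    def from_dict(cls, job):
        """Build a JobInfo from a dict shaped like the ones in this diff."""
        return cls(
            job_id=job["JobID"],
            node_manager_address=job["NodeManagerAddress"],
            driver_pid=job["DriverPid"],
            start_time=job.get("StartTime"),
            stop_time=job.get("StopTime"),
        )

    def to_dict(self):
        """Return a plain dict, e.g. for JSON serialization."""
        return asdict(self)


# Made-up sample record for a job that is still running.
info = JobInfo.from_dict({
    "JobID": "01000000",
    "NodeManagerAddress": "127.0.0.1",
    "DriverPid": 4242,
    "StartTime": 1562000000,
})
```

Keeping `to_dict` around would let the existing dictionary-based callers keep working while the other tables are migrated.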

"""
# Allow the argument to be either a DriverID or a hex string.
Contributor

Did you mean JobID?

Contributor Author

fixed

Collaborator

This isn't actually fixed in the comment above, right?

Contributor Author

yeah, the fix got rolled back when I force pushed

if not isinstance(job_id, ray.JobID):
    job_id = ray.JobID(hex_to_binary(job_id))
Contributor

Also assert the type of job_id is str?

Contributor Author

fixed
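
The type check being asked for could be sketched as follows. `JobID` here is a hypothetical stand-in for `ray.JobID`, and `normalize_job_id` is an illustrative helper, not code from this PR. Hex decoding uses the standard library's `binascii.unhexlify` in place of Ray's `hex_to_binary`.

```python
import binascii


class JobID:
    """Hypothetical stand-in for ray.JobID: wraps the raw binary ID."""

    def __init__(self, binary):
        self._binary = binary

    def binary(self):
        return self._binary

    def hex(self):
        return binascii.hexlify(self._binary).decode("ascii")


def normalize_job_id(job_id):
    """Accept either a JobID or a hex string and return a JobID."""
    if isinstance(job_id, JobID):
        return job_id
    # Reject anything that is neither a JobID nor a hex string.
    if not isinstance(job_id, str):
        raise TypeError(
            "job_id must be a JobID or a hex string, got {}".format(
                type(job_id).__name__))
    return JobID(binascii.unhexlify(job_id))
```

Raising `TypeError` rather than using a bare `assert` keeps the check active when Python runs with optimizations enabled.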


# Return information about a single job ID.
message = self.redis_client.execute_command("RAY.TABLE_LOOKUP",
    gcs_utils.TablePrefix.Value("JOB"), "", job_id.binary())

if message is None:
    return {}

gcs_entry = gcs_utils.GcsEntry.FromString(message)

assert len(gcs_entry.entries) > 0

job_info = {}

for i in range(len(gcs_entry.entries)):
    entry = gcs_utils.JobTableData.FromString(gcs_entry.entries[i])
    assert entry.job_id == job_id.binary()
    job_info["JobID"] = job_id.hex()
    job_info["NodeManagerAddress"] = entry.node_manager_address
    job_info["DriverPid"] = entry.driver_pid
    if entry.is_dead:
        job_info["StopTime"] = entry.timestamp
    else:
        job_info["StartTime"] = entry.timestamp

return job_info

def job_table(self):
"""Fetch and parse the Redis job table.
Returns:
Collaborator

indentation is wrong and missing newline

Contributor Author

fixed

Information about the Ray jobs in the cluster.
Contributor

Information in what format?

Contributor Author

fixed

"""
self._check_connected()

job_keys = self.redis_client.keys(
    gcs_utils.TablePrefix_JOB_string + "*")

job_ids_binary = {
    key[len(gcs_utils.TablePrefix_JOB_string):]
    for key in job_keys
}

results = []

for job_id_binary in job_ids_binary:
    results.append(self._job_table(binary_to_hex(job_id_binary)))

return results

def _profile_table(self, batch_id):
"""Get the profile events for a given batch of profile events.

@@ -982,6 +1042,20 @@ def error_messages(self, job_id=None):
global_state = DeprecatedGlobalState()


def jobs():
    """Get a list of the jobs in the cluster.

    Returns:
        Information from the job table, namely a list of dicts with keys:
        - "JobID" (sha1 identifier for the job),
        - "NodeManagerAddress" (IP address of the driver for this job),
        - "DriverPid" (process ID of the driver for this job),
        - "StartTime" (UNIX timestamp of the start time of this job),
        - "StopTime" (UNIX timestamp of the stop time of this job, if any)
    """
    return state.job_table()
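
A caller might consume this list as in the sketch below. The helper name `summarize_jobs` and the sample records are made up for illustration. The sketch assumes that "StopTime" is absent for jobs that are still running, which matches how `_job_table` in this diff builds each dict.

```python
def summarize_jobs(job_table):
    """Split job records into running job IDs and finished (ID, duration) pairs."""
    running = []
    finished = []
    for job in job_table:
        if "StopTime" in job:
            # Duration is only known when the start entry was recorded too.
            start = job.get("StartTime")
            duration = None if start is None else job["StopTime"] - start
            finished.append((job["JobID"], duration))
        else:
            running.append(job["JobID"])
    return running, finished


# Made-up records in the format documented for jobs().
sample = [
    {"JobID": "01000000", "NodeManagerAddress": "127.0.0.1",
     "DriverPid": 4242, "StartTime": 1562000000},
    {"JobID": "02000000", "NodeManagerAddress": "127.0.0.1",
     "DriverPid": 4243, "StartTime": 1562000100, "StopTime": 1562000160},
]
running, finished = summarize_jobs(sample)
```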


def nodes():
"""Get a list of the nodes in the cluster.

9 changes: 9 additions & 0 deletions python/ray/tests/test_basic.py
@@ -2432,6 +2432,9 @@ def test_global_state_api(shutdown_only):
with pytest.raises(Exception):
    ray.nodes()

with pytest.raises(Exception):
    ray.jobs()
Collaborator

This should be changed to make sure it actually raises the error message we expect.

with pytest.raises(Exception, match="The ray global state API cannot be used before ray.init has been called."):

Contributor Author

fixed


ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})

resources = {"CPU": 5, "GPU": 3, "CustomResource": 1}
@@ -2509,6 +2512,12 @@ def wait_for_object_table():
object_table_entry = ray.objects(result_id)
assert object_table[result_id] == object_table_entry

job_table = ray.jobs()

assert len(job_table) == 1
assert job_table[0]["JobID"] == job_id
assert job_table[0]["NodeManagerAddress"] == node_ip_address


# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
# should use those, but they seem to conflict with Ray's use of faulthandler.
3 changes: 2 additions & 1 deletion src/ray/gcs/client_test.cc
@@ -581,7 +581,8 @@ void TestLogSubscribeAll(const JobID &job_id,
auto subscribe_callback = [job_ids](gcs::AsyncGcsClient *client) {
// We have subscribed. Do the writes to the table.
for (size_t i = 0; i < job_ids.size(); i++) {
-RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false));
+RAY_CHECK_OK(client->job_table().AppendJobData(job_ids[i], false,
+                                               0, "localhost", 1));
}
};

6 changes: 5 additions & 1 deletion src/ray/gcs/tables.cc
@@ -475,10 +475,14 @@ std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}

-Status JobTable::AppendJobData(const JobID &job_id, bool is_dead) {
+Status JobTable::AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
Collaborator

Could just compute the timestamp inside this method instead of passing it in, but I don't really have a preference.

Contributor Author

I prefer to have it outside; that makes it possible for somebody to use this function with a different timestamp than the current time.

const std::string& node_manager_address, int64_t driver_pid) {
auto data = std::make_shared<JobTableData>();
data->set_job_id(job_id.Binary());
data->set_is_dead(is_dead);
data->set_timestamp(timestamp);
data->set_node_manager_address(node_manager_address);
data->set_driver_pid(driver_pid);
return Append(JobID(job_id), job_id, data, /*done_callback=*/nullptr);
}

6 changes: 5 additions & 1 deletion src/ray/gcs/tables.h
@@ -654,8 +654,12 @@ class JobTable : public Log<JobID, JobTableData> {
///
/// \param job_id The job id.
/// \param is_dead Whether the job is dead.
/// \param timestamp The UNIX timestamp when the driver was started/stopped.
/// \param node_manager_address IP address of the node the driver is running on.
/// \param driver_pid Process ID of the driver process.
/// \return The return status.
-Status AppendJobData(const JobID &job_id, bool is_dead);
+Status AppendJobData(const JobID &job_id, bool is_dead, int64_t timestamp,
+                     const std::string& node_manager_address, int64_t driver_pid);
};

/// Actor table starts with an ALIVE entry, which represents the first time the actor
6 changes: 6 additions & 0 deletions src/ray/protobuf/gcs.proto
@@ -227,6 +227,12 @@ message JobTableData {
bytes job_id = 1;
// Whether it's dead.
bool is_dead = 2;
// The UNIX timestamp corresponding to this event (job added or removed).
int64 timestamp = 3;
Contributor

It's better to declare 2 fields start_timestamp and stop_timestamp explicitly instead of 1 timestamp.

Contributor Author

Each entry in the JobTable is a log entry (either job addition, or job removal, if is_dead = true), so a single timestamp is indeed the more natural representation here. They get aggregated into start time and stop time in the client API.
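
A minimal sketch of that aggregation, following the loop in `_job_table` from this diff. `JobEntry` is a hypothetical stand-in for the `JobTableData` protobuf message, `aggregate_job_entries` is an illustrative name, and the job ID is assumed to already be a hex string.

```python
from collections import namedtuple

# Hypothetical stand-in for the JobTableData protobuf message. The field
# names match gcs.proto in this PR, but this is not the generated class.
JobEntry = namedtuple(
    "JobEntry",
    ["job_id", "is_dead", "timestamp", "node_manager_address", "driver_pid"])


def aggregate_job_entries(entries):
    """Fold the log entries of one job into a single info dict."""
    job_info = {}
    for entry in entries:
        job_info["JobID"] = entry.job_id
        job_info["NodeManagerAddress"] = entry.node_manager_address
        job_info["DriverPid"] = entry.driver_pid
        # A dead entry records job removal; any other entry records addition.
        if entry.is_dead:
            job_info["StopTime"] = entry.timestamp
        else:
            job_info["StartTime"] = entry.timestamp
    return job_info


# Made-up sample log: the job is added, then removed 60 seconds later.
log = [
    JobEntry("01000000", False, 1562000000, "127.0.0.1", 4242),
    JobEntry("01000000", True, 1562000060, "127.0.0.1", 4242),
]
info = aggregate_job_entries(log)
```

With only the first entry in the log, the result has a "StartTime" and no "StopTime", which is how a still-running job shows up in the client API.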

// IP of the node this job was started on.
string node_manager_address = 4;
Contributor

just node address?

Contributor Author

this is to be consistent with the ClientTable

// Process ID of the driver running this job.
int64 driver_pid = 5;
}

// This table stores the actor checkpoint data. An actor checkpoint
1 change: 1 addition & 0 deletions src/ray/raylet/main.cc
@@ -108,6 +108,7 @@ int main(int argc, char *argv[]) {
ray::raylet::ResourceSet(std::move(static_resource_conf));
RAY_LOG(DEBUG) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.node_manager_address = node_ip_address;
node_manager_config.node_manager_port = node_manager_port;
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_per_process =
14 changes: 12 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -854,12 +854,19 @@ void NodeManager::ProcessRegisterClientRequestMessage(
} else {
// Register the new driver.
const WorkerID driver_id = from_flatbuf<WorkerID>(*message->worker_id());
const JobID job_id = from_flatbuf<JobID>(*message->job_id());
// Compute a dummy driver task id from a given driver.
const TaskID driver_task_id = TaskID::ComputeDriverTaskId(driver_id);
worker->AssignTaskId(driver_task_id);
-worker->AssignJobId(from_flatbuf<JobID>(*message->job_id()));
+worker->AssignJobId(job_id);
worker_pool_.RegisterDriver(std::move(worker));
local_queues_.AddDriverTaskId(driver_task_id);
RAY_CHECK_OK(
gcs_client_->job_table().AppendJobData(job_id,
/*is_dead=*/false,
std::time(nullptr),
initial_config_.node_manager_address,
message->worker_pid()));
}
}

@@ -1026,7 +1033,10 @@ void NodeManager::ProcessDisconnectClientMessage(
} else if (is_driver) {
// The client is a driver.
RAY_CHECK_OK(gcs_client_->job_table().AppendJobData(JobID(client->GetClientId()),
-/*is_dead=*/true));
+/*is_dead=*/true,
+std::time(nullptr),
+initial_config_.node_manager_address,
+worker->Pid()));
auto job_id = worker->GetAssignedTaskId();
RAY_CHECK(!job_id.IsNil());
local_queues_.RemoveDriverTaskId(job_id);
2 changes: 2 additions & 0 deletions src/ray/raylet/node_manager.h
@@ -35,6 +35,8 @@ using rpc::JobTableData;
struct NodeManagerConfig {
/// The node's resource configuration.
ResourceSet resource_config;
/// The IP address this node manager is running on.
std::string node_manager_address;
/// The port to use for listening to incoming connections. If this is 0 then
/// the node manager will choose its own port.
int node_manager_port;