Skip to content

Commit

Permalink
[core][dashboard] push down job_or_submission_id to GCS. (ray-project…
Browse files Browse the repository at this point in the history
…#47492)

GCS API GetAllJobInfo serves Dashboard APIs, even when only 1 job is requested. This becomes slow when the number of jobs is high. This PR pushes the job filter down to GCS to reduce the Dashboard's workload.

This API is somewhat unusual because the filter `job_or_submission_id` is actually either a Job ID or a job_submission_id. We don't have an index on the latter, and some jobs don't have one at all. So we still fetch all jobs from Redis, then filter by both IDs before issuing any further RPC calls.

---------

Signed-off-by: Ruiyang Wang <[email protected]>
Signed-off-by: Jiajun Yao <[email protected]>
Co-authored-by: Jiajun Yao <[email protected]>
Co-authored-by: Alexey Kudinkin <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
3 people authored and ujjawal-khare committed Oct 15, 2024
1 parent a6a63e2 commit 6e790d9
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 173 deletions.
10 changes: 1 addition & 9 deletions python/ray/_private/gcs_aio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,10 @@ async def internal_kv_keys(

async def get_all_job_info(
self,
*,
job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[float] = None,
) -> Dict[JobID, gcs_pb2.JobTableData]:
"""
Return dict key: bytes of job_id; value: JobTableData pb message.
"""
return await self._async_proxy.get_all_job_info(
job_or_submission_id=job_or_submission_id,
skip_submission_job_info_field=skip_submission_job_info_field,
skip_is_running_tasks_field=skip_is_running_tasks_field,
timeout=timeout,
)
return await self._async_proxy.get_all_job_info(job_or_submission_id, timeout)
9 changes: 3 additions & 6 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2910,10 +2910,8 @@ cdef class OldGcsClient:
return result

@_auto_reconnect
def get_all_job_info(
self, *, job_or_submission_id: str = None, skip_submission_job_info_field=False,
skip_is_running_tasks_field=False, timeout=None
) -> Dict[JobID, JobTableData]:
def get_all_job_info(self, job_or_submission_id: str = None,
timeout=None) -> Dict[JobID, JobTableData]:
# Ideally we should use json_format.MessageToDict(job_info),
# but `job_info` is a cpp pb message not a python one.
# Manually converting each and every protobuf field is out of question,
Expand All @@ -2933,8 +2931,7 @@ cdef class OldGcsClient:
make_optional[c_string](c_job_or_submission_id)
with nogil:
check_status(self.inner.get().GetAllJobInfo(
c_optional_job_or_submission_id, c_skip_submission_job_info_field,
c_skip_is_running_tasks_field, timeout_ms, c_job_infos))
c_optional_job_or_submission_id, timeout_ms, c_job_infos))
for c_job_info in c_job_infos:
serialized_job_infos.push_back(c_job_info.SerializeAsString())
result = {}
Expand Down
5 changes: 1 addition & 4 deletions python/ray/dashboard/modules/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ async def get_driver_jobs(
jobs with the job id or submission id.
"""
job_infos = await gcs_aio_client.get_all_job_info(
job_or_submission_id=job_or_submission_id,
skip_submission_job_info_field=True,
skip_is_running_tasks_field=True,
timeout=timeout,
job_or_submission_id=job_or_submission_id, timeout=timeout
)
# Sort jobs from GCS to follow convention of returning only last driver
# of submission job.
Expand Down
9 changes: 2 additions & 7 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,11 @@ cdef extern from "ray/gcs/gcs_client/accessor.h" nogil:
cdef cppclass CJobInfoAccessor "ray::gcs::JobInfoAccessor":
CRayStatus GetAll(
const optional[c_string] &job_or_submission_id,
c_bool skip_submission_job_info_field,
c_bool skip_is_running_tasks_field,
c_vector[CJobTableData] &result,
int64_t timeout_ms)

CRayStatus AsyncGetAll(
const optional[c_string] &job_or_submission_id,
c_bool skip_submission_job_info_field,
c_bool skip_is_running_tasks_field,
const MultiItemPyCallback[CJobTableData] &callback,
int64_t timeout_ms)

Expand Down Expand Up @@ -629,9 +625,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
CRayStatus GetAllNodeInfo(
int64_t timeout_ms, c_vector[CGcsNodeInfo]& result)
CRayStatus GetAllJobInfo(
const optional[c_string] &job_or_submission_id,
c_bool skip_submission_job_info_field, c_bool skip_is_running_tasks_field,
int64_t timeout_ms, c_vector[CJobTableData]& result)
const optional[c_string] &job_or_submission_id, int64_t timeout_ms,
c_vector[CJobTableData]& result)
CRayStatus GetAllResourceUsage(
int64_t timeout_ms, c_string& serialized_reply)
CRayStatus RequestClusterResourceConstraint(
Expand Down
13 changes: 3 additions & 10 deletions python/ray/includes/gcs_client.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,7 @@ cdef class NewGcsClient:
#############################################################

def get_all_job_info(
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
self, job_or_submission_id: Optional[str] = None,
timeout: Optional[float] = None
) -> Dict[JobID, gcs_pb2.JobTableData]:
cdef c_string c_job_or_submission_id
Expand All @@ -451,14 +449,11 @@ cdef class NewGcsClient:
make_optional[c_string](c_job_or_submission_id)
with nogil:
status = self.inner.get().Jobs().GetAll(
c_optional_job_or_submission_id, c_skip_submission_job_info_field,
c_skip_is_running_tasks_field, reply, timeout_ms)
c_optional_job_or_submission_id, reply, timeout_ms)
return raise_or_return((convert_get_all_job_info(status, move(reply))))

def async_get_all_job_info(
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
self, job_or_submission_id: Optional[str] = None,
timeout: Optional[float] = None
) -> Future[Dict[JobID, gcs_pb2.JobTableData]]:
cdef:
Expand All @@ -476,8 +471,6 @@ cdef class NewGcsClient:
check_status_timeout_as_rpc_error(
self.inner.get().Jobs().AsyncGetAll(
c_optional_job_or_submission_id,
c_skip_submission_job_info_field,
c_skip_is_running_tasks_field,
MultiItemPyCallback[CJobTableData](
&convert_get_all_job_info,
assign_and_decrement_fut,
Expand Down
2 changes: 0 additions & 2 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ class MockJobInfoAccessor : public JobInfoAccessor {
MOCK_METHOD(Status,
AsyncGetAll,
(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
const MultiItemCallback<rpc::JobTableData> &callback,
int64_t timeout_ms),
(override));
Expand Down
15 changes: 2 additions & 13 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,8 @@ Status JobInfoAccessor::AsyncSubscribeAll(
done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(/*job_or_submission_id=*/std::nullopt,
/*skip_submission_job_info_field=*/true,
/*skip_is_running_tasks_field=*/true,
callback,
/*timeout_ms=*/-1));
RAY_CHECK_OK(
AsyncGetAll(/*job_or_submission_id=*/std::nullopt, callback, /*timeout_ms=*/-1));
};
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
return client_impl_->GetGcsSubscriber().SubscribeAllJobs(subscribe, done);
Expand All @@ -110,15 +107,11 @@ void JobInfoAccessor::AsyncResubscribe() {

Status JobInfoAccessor::AsyncGetAll(
const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
const MultiItemCallback<rpc::JobTableData> &callback,
int64_t timeout_ms) {
RAY_LOG(DEBUG) << "Getting all job info.";
RAY_CHECK(callback);
rpc::GetAllJobInfoRequest request;
request.set_skip_submission_job_info_field(skip_submission_job_info_field);
request.set_skip_is_running_tasks_field(skip_is_running_tasks_field);
if (job_or_submission_id.has_value()) {
request.set_job_or_submission_id(job_or_submission_id.value());
}
Expand All @@ -133,13 +126,9 @@ Status JobInfoAccessor::AsyncGetAll(
}

Status JobInfoAccessor::GetAll(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
std::vector<rpc::JobTableData> &job_data_list,
int64_t timeout_ms) {
rpc::GetAllJobInfoRequest request;
request.set_skip_submission_job_info_field(skip_submission_job_info_field);
request.set_skip_is_running_tasks_field(skip_is_running_tasks_field);
if (job_or_submission_id.has_value()) {
request.set_job_or_submission_id(job_or_submission_id.value());
}
Expand Down
4 changes: 0 additions & 4 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,6 @@ class JobInfoAccessor {
/// \param callback Callback that will be called after lookup finished.
/// \return Status
virtual Status AsyncGetAll(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
const MultiItemCallback<rpc::JobTableData> &callback,
int64_t timeout_ms);

Expand All @@ -274,8 +272,6 @@ class JobInfoAccessor {
/// \param timeout_ms -1 means infinite.
/// \return Status
virtual Status GetAll(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
std::vector<rpc::JobTableData> &job_data_list,
int64_t timeout_ms);

Expand Down
4 changes: 0 additions & 4 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,17 +504,13 @@ Status PythonGcsClient::GetAllNodeInfo(int64_t timeout_ms,

Status PythonGcsClient::GetAllJobInfo(
const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
int64_t timeout_ms,
std::vector<rpc::JobTableData> &result) {
grpc::ClientContext context;
PrepareContext(context, timeout_ms);

absl::ReaderMutexLock lock(&mutex_);
rpc::GetAllJobInfoRequest request;
request.set_skip_submission_job_info_field(skip_submission_job_info_field);
request.set_skip_is_running_tasks_field(skip_is_running_tasks_field);
if (job_or_submission_id.has_value()) {
request.set_job_or_submission_id(job_or_submission_id.value());
}
Expand Down
2 changes: 0 additions & 2 deletions src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ class RAY_EXPORT PythonGcsClient {
Status PinRuntimeEnvUri(const std::string &uri, int expiration_s, int64_t timeout_ms);
Status GetAllNodeInfo(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &result);
Status GetAllJobInfo(const std::optional<std::string> &job_or_submission_id,
bool skip_submission_job_info_field,
bool skip_is_running_tasks_field,
int64_t timeout_ms,
std::vector<rpc::JobTableData> &result);
Status GetAllResourceUsage(int64_t timeout_ms, std::string &serialized_reply);
Expand Down
2 changes: 0 additions & 2 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ std::vector<std::string> GlobalStateAccessor::GetAllJobInfo(
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Jobs().AsyncGetAll(
/*job_or_submission_id=*/std::nullopt,
skip_submission_job_info_field,
skip_is_running_tasks_field,
TransformForMultiItemCallback<rpc::JobTableData>(job_table_data, promise),
/*timeout_ms=*/-1));
}
Expand Down
Loading

0 comments on commit 6e790d9

Please sign in to comment.