Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS] Optimize GetAllJobInfo API for performance #47530

Merged
merged 12 commits into from
Sep 11, 2024

Conversation

liuxsh9
Copy link
Contributor

@liuxsh9 liuxsh9 commented Sep 6, 2024

Why are these changes needed?

The GetAllJobInfo API is frequently called by job, log management, and dashboard data collection interfaces. Currently, the API retrieves three fields: 1) job table data, 2) is_running_tasks, and 3) JobInfo. The latter two fields are costly, but they appear to be unused (based on my knowledge and search). To improve performance, we added two optional parameters to control whether to retrieve those two fields (defaulting to not). This optimization reduces the API response time from 1s to approximately 0.05s when there are hundreds of jobs in the cluster. If the latter two fields are indeed unnecessary, consider removing the collection operations altogether.

image

Related issue number

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: liuxsh9 <[email protected]>
Copy link
Contributor

@rynewang rynewang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM! One issue though: the field is used by places like:

get_total_num_running_jobs_to_report

@routes.get("/api/jobs/")

/api/jobs/{job_or_submission_id}

get_job_info

And those APIs may be consumed by external code. To keep backwards compatibility can you add args to the web APIs with default fetch=True? Then in call sites like

job_infos = await self.client.get_job_info(timeout=DEFAULT_RPC_TIMEOUT)
we can set fetch=False.

Especially, we have a @PublicAPI , @jjyao can you help evaluate if this PublicAPI contained such fields, and shall we change its semantics for this better perf?

@@ -86,7 +87,8 @@ def __init__(self, inner, loop, executor):

def _function_to_async(self, func):
async def wrapper(*args, **kwargs):
return await self.loop.run_in_executor(self.executor, func, *args, **kwargs)
partial_func = partial(func, *args, **kwargs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Let's avoid bundling up unrelated changes

Copy link
Contributor Author

@liuxsh9 liuxsh9 Sep 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discovered that run_in_executor doesn't support **kwargs ref: asyncio.loop.run_in_executor. The implicit params passing was exposing this issue. For now, we've changed to explicit params passing. So have reverted this change.
Anyway, should we adjust it to accommodate run_in_executor's limitations?

@@ -2907,18 +2907,23 @@ cdef class OldGcsClient:
return result

@_auto_reconnect
def get_all_job_info(self, timeout=None) -> Dict[JobID, JobTableData]:
def get_all_job_info(self, timeout=None, **kwargs) -> Dict[JobID, JobTableData]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if we can make the fields just defined here, instead of **kwargs:

def get_all_job_info(self, query_job_info_field, c_query_is_running_tasks_field, timeout=None) -> Dict[JobID, JobTableData]:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Let's err on the side of properly codifying params

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, have updated to use explicit field definitions instead.

@@ -433,20 +433,28 @@ cdef class NewGcsClient:
#############################################################

def get_all_job_info(
self, timeout: Optional[float] = None
self, timeout: Optional[float] = None, **kwargs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, explicit args

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

reply->mutable_job_info_list(i)->set_is_running_tasks(false);
core_worker_clients_.Disconnect(worker_id);
if (request.has_query_is_running_tasks_field() &&
request.query_is_running_tasks_field()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can omit the has_ check. if it's not set, query_is_running_tasks_field() defaults to false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion!

@liuxsh9
Copy link
Contributor Author

liuxsh9 commented Sep 7, 2024

Generally LGTM! One issue though: the field is used by places like:

get_total_num_running_jobs_to_report

@routes.get("/api/jobs/")

/api/jobs/{job_or_submission_id}

get_job_info

And those APIs may be consumed by external code. To keep backwards compatibility can you add args to the web APIs with default fetch=True? Then in call sites like

job_infos = await self.client.get_job_info(timeout=DEFAULT_RPC_TIMEOUT)

we can set fetch=False.

Sorry, I am a little confused. Do you mean we're still fetching all 3 fields by default, and then setting query_x_field=False in individual APIs that definitely don't need them?

@jjyao
Copy link
Collaborator

jjyao commented Sep 7, 2024

Sorry, I am a little confused. Do you mean we're still fetching all 3 fields by default, and then setting query_x_field=False in individual APIs that definitely don't need them?

Yes.

@liuxsh9
Copy link
Contributor Author

liuxsh9 commented Sep 9, 2024

Updated the default to retrieving all fields, like before. To accommodate proto3's boolean fields defaulting to false, we've changed the params to skip_x_field. So now we can set skip_x_field=True when method doesn't need the field. Additionally, we've optimized functions like get_driver_jobs to use GetAllJobInfo with selective field retrieval.

@jjyao jjyao added the go add ONLY when ready to merge, run all tests label Sep 9, 2024
Signed-off-by: liuxsh9 <[email protected]>
Copy link
Collaborator

@jjyao jjyao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lg, thanks!

python/ray/_private/gcs_aio_client.py Outdated Show resolved Hide resolved
Comment on lines 86 to 87
/*skip_job_info_field=*/false,
/*skip_is_running_tasks_field=*/false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can those be True as well If they are not used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes!

Comment on lines 71 to 72
/*skip_job_info_field=*/false,
/*skip_is_running_tasks_field=*/false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can those be True? Are those two fields used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlobalStateAccessor::GetAllJobInfo() is invoked by multiple functions, including

def job_table(self):

which explicitly sets two params to True as it doesn't utilize those fields.

And

may be called by external methods, so we're leaving it unchanged and defaulting to False.

To accommodate these varying callers, have added explicit skip params to GetAllJobInfo().

@jjyao
Copy link
Collaborator

jjyao commented Sep 10, 2024

Failed tests are probably related.

Copy link
Collaborator

@jjyao jjyao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!

The remaining comments are about changing them to keyword arguments.

python/ray/_private/gcs_aio_client.py Show resolved Hide resolved
@@ -181,7 +181,7 @@ def job_table(self):
"""
self._check_connected()

job_table = self.global_state_accessor.get_job_table()
job_table = self.global_state_accessor.get_job_table(True, True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using keyword arguments otherwise it's hard to know what these two True are

def get_all_job_info(self, job_or_submission_id: str = None,
timeout=None) -> Dict[JobID, JobTableData]:
def get_all_job_info(
self, job_or_submission_id: str = None, skip_submission_job_info_field=False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, job_or_submission_id: str = None, skip_submission_job_info_field=False,
self, *, job_or_submission_id: str = None, skip_submission_job_info_field=False,

@@ -48,10 +48,16 @@ cdef class GlobalStateAccessor:
with nogil:
self.inner.get().Disconnect()

def get_job_table(self):
def get_job_table(
self, skip_submission_job_info_field=False, skip_is_running_tasks_field=False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, skip_submission_job_info_field=False, skip_is_running_tasks_field=False
self, *, skip_submission_job_info_field=False, skip_is_running_tasks_field=False

reply->mutable_job_info_list(i)->set_is_running_tasks(false);
core_worker_clients_.Disconnect(worker_id);
(*num_processed_jobs)++;
;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be deleted

@@ -434,11 +434,15 @@ cdef class NewGcsClient:

def get_all_job_info(
self, job_or_submission_id: Optional[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, job_or_submission_id: Optional[str] = None,
self, *, job_or_submission_id: Optional[str] = None,

@@ -447,17 +451,22 @@ cdef class NewGcsClient:
make_optional[c_string](c_job_or_submission_id)
with nogil:
status = self.inner.get().Jobs().GetAll(
c_optional_job_or_submission_id, reply, timeout_ms)
c_optional_job_or_submission_id, c_skip_submission_job_info_field,
c_skip_is_running_tasks_field, reply, timeout_ms)
return raise_or_return((convert_get_all_job_info(status, move(reply))))

def async_get_all_job_info(
self, job_or_submission_id: Optional[str] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self, job_or_submission_id: Optional[str] = None,
self, *, job_or_submission_id: Optional[str] = None,

@liuxsh9
Copy link
Contributor Author

liuxsh9 commented Sep 11, 2024

Updates applied, kindly review at your convenience. @jjyao

@jjyao jjyao merged commit 432dbce into ray-project:master Sep 11, 2024
5 checks passed
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants