Skip to content

Commit

Permalink
[core] Add thread check to job mgr callback (ray-project#48005)
Browse files Browse the repository at this point in the history
This PR followup for comment
ray-project#47793 (comment),
and adds a thread checking to GCS job manager callback to make sure no
concurrent access for data members.

Signed-off-by: dentiny <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
dentiny authored and ujjawal-khare committed Oct 15, 2024
1 parent 93b4a6c commit 2394e58
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 0 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ ray_cc_library(
"@boost//:bimap",
"@com_github_grpc_grpc//src/proto/grpc/health/v1:health_proto",
"@com_google_absl//absl/container:btree",
"//src/ray/util:thread_checker",
],
)

Expand Down
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
reply,
send_reply_callback =
std::move(send_reply_callback)](const Status &status) {
RAY_CHECK(thread_checker_.IsOnSameThread());

if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << job_table_data.driver_pid();
Expand Down Expand Up @@ -136,6 +138,8 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
job_table_data.set_is_dead(true);
auto on_done = [this, job_id, job_table_data, done_callback = std::move(done_callback)](
const Status &status) {
RAY_CHECK(thread_checker_.IsOnSameThread());

if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
Expand Down Expand Up @@ -176,6 +180,8 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
job_id,
[this, job_id, send_reply](const Status &status,
const std::optional<rpc::JobTableData> &result) {
RAY_CHECK(thread_checker_.IsOnSameThread());

if (status.ok() && result) {
MarkJobAsFinished(*result, send_reply);
return;
Expand Down Expand Up @@ -266,6 +272,8 @@ void GcsJobManager::HandleGetAllJobInfo(rpc::GetAllJobInfoRequest request,
};
auto on_done = [this, filter_ok, request, reply, send_reply_callback, limit](
const absl::flat_hash_map<JobID, JobTableData> &&result) {
RAY_CHECK(thread_checker_.IsOnSameThread());

// Internal KV keys for jobs that were submitted via the Ray Job API.
std::vector<std::string> job_api_data_keys;

Expand Down Expand Up @@ -447,6 +455,8 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
<< "Node failed, mark all jobs from this node as finished";

auto on_done = [this, node_id](const absl::flat_hash_map<JobID, JobTableData> &result) {
RAY_CHECK(thread_checker_.IsOnSameThread());

// If job is not dead and from driver in current node, then mark it as finished
for (auto &data : result) {
if (!data.second.is_dead() &&
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
#include "ray/util/event.h"
#include "ray/util/thread_checker.h"

namespace ray {
namespace gcs {
Expand Down Expand Up @@ -107,6 +108,10 @@ class GcsJobManager : public rpc::JobInfoHandler {
void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);

// Used to validate invariants for threading; for example, all callbacks are executed on
// the same thread.
ThreadChecker thread_checker_;

// Running Job IDs, used to report metrics.
absl::flat_hash_set<JobID> running_job_ids_;

Expand Down

0 comments on commit 2394e58

Please sign in to comment.