diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index fa4d0ca2ee5e..bcf2e8b25640 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -96,8 +96,6 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request, reply, send_reply_callback = std::move(send_reply_callback)](const Status &status) { - RAY_CHECK(thread_checker_.IsOnSameThread()); - if (!status.ok()) { RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id << ", driver pid = " << job_table_data.driver_pid(); @@ -117,7 +115,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request, // multiple times and requires idempotency (i.e. due to retry). running_job_ids_.insert(job_id); } - WriteDriverJobExportEvent(mutable_job_table_data); + WriteDriverJobExportEvent(job_table_data); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; @@ -138,8 +136,6 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, job_table_data.set_is_dead(true); auto on_done = [this, job_id, job_table_data, done_callback = std::move(done_callback)]( const Status &status) { - RAY_CHECK(thread_checker_.IsOnSameThread()); - if (!status.ok()) { RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; } else { @@ -150,6 +146,13 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, } function_manager_.RemoveJobReference(job_id); WriteDriverJobExportEvent(job_table_data); + + // Update running job status. + auto iter = running_job_ids_.find(job_id); + RAY_CHECK(iter != running_job_ids_.end()); + running_job_ids_.erase(iter); + ++finished_jobs_count_; + done_callback(status); }; diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index 887d7635522c..b74b24f3e1d1 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -96,16 +96,17 @@ class GcsJobManager : public rpc::JobInfoHandler { void WriteDriverJobExportEvent(rpc::JobTableData job_data) const; + /// Record metrics. + /// For job manager, (1) running jobs count gauge and (2) new finished jobs (whether + /// succeed or fail) will be reported periodically. + void RecordMetrics(); + private: void ClearJobInfos(const rpc::JobTableData &job_data); void MarkJobAsFinished(rpc::JobTableData job_table_data, std::function done_callback); - // Used to validate invariants for threading; for example, all callbacks are executed on - // the same thread. - ThreadChecker thread_checker_; - // Running Job IDs, used to report metrics. absl::flat_hash_set running_job_ids_;