Skip to content

Commit

Permalink
[core] Add metrics for gcs jobs (ray-project#47793)
Browse files Browse the repository at this point in the history
This PR adds metrics for job states within the job manager.

In detail, a gauge stat is sent via the OpenCensus exporter, so that running Ray
jobs can be tracked and alerts can be created later on.

Fault tolerance is not considered because, according to the
[doc](https://docs.ray.io/en/latest/ray-core/fault_tolerance/gcs.html),
state is reconstructed at restart.

As for testing, the best way is to observe the metrics via an OpenCensus backend
(e.g. the Google monitoring dashboard), but that is not easy for open-source
contributors; the alternative is a mock / fake exporter implementation, which I
could not find in the code base.

Signed-off-by: dentiny <[email protected]>
Co-authored-by: Ruiyang Wang <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
  • Loading branch information
2 people authored and ujjawal-khare committed Oct 15, 2024
1 parent f0204bc commit 2a1bd56
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 30 deletions.
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ GcsActorManager::GcsActorManager(
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) {
RAY_CHECK(worker_client_factory_);
RAY_CHECK(destroy_owned_placement_group_if_needed_);
actor_state_counter_.reset(
new CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>());
actor_state_counter_ = std::make_shared<
CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>>();
actor_state_counter_->SetOnChangeCallback(
[this](const std::pair<rpc::ActorTableData::ActorState, std::string> key) mutable {
int64_t num_actors = actor_state_counter_->Get(key);
Expand Down
70 changes: 51 additions & 19 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@
#include "ray/gcs/gcs_server/gcs_job_manager.h"

#include "ray/gcs/pb_util.h"
#include "ray/stats/metric.h"

namespace ray {
namespace gcs {

void GcsJobManager::Initialize(const GcsInitData &gcs_init_data) {
for (auto &pair : gcs_init_data.Jobs()) {
const auto &job_id = pair.first;
const auto &job_table_data = pair.second;
for (const auto &[job_id, job_table_data] : gcs_init_data.Jobs()) {
cached_job_configs_[job_id] =
std::make_shared<rpc::JobConfig>(job_table_data.config());
function_manager_.AddJobReference(job_id);

// Recover [running_job_ids_] from storage.
if (!job_table_data.is_dead()) {
running_job_ids_.insert(job_id);
}
}
}

Expand Down Expand Up @@ -82,28 +86,36 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
auto time = current_sys_time_ms();
mutable_job_table_data.set_start_time(time);
mutable_job_table_data.set_timestamp(time);
JobID job_id = JobID::FromBinary(mutable_job_table_data.job_id());
const JobID job_id = JobID::FromBinary(mutable_job_table_data.job_id());
RAY_LOG(INFO) << "Adding job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();

auto on_done = [this, job_id, mutable_job_table_data, reply, send_reply_callback](
const Status &status) {
auto on_done = [this,
job_id,
job_table_data = mutable_job_table_data,
reply,
send_reply_callback =
std::move(send_reply_callback)](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();
<< ", driver pid = " << job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, mutable_job_table_data, nullptr));
if (mutable_job_table_data.config().has_runtime_env_info()) {
runtime_env_manager_.AddURIReference(
job_id.Hex(), mutable_job_table_data.config().runtime_env_info());
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, /*done=*/nullptr));
if (job_table_data.config().has_runtime_env_info()) {
runtime_env_manager_.AddURIReference(job_id.Hex(),
job_table_data.config().runtime_env_info());
}
function_manager_.AddJobReference(job_id);
RAY_LOG(INFO) << "Finished adding job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();
<< ", driver pid = " << job_table_data.driver_pid();
cached_job_configs_[job_id] =
std::make_shared<rpc::JobConfig>(mutable_job_table_data.config());
std::make_shared<rpc::JobConfig>(job_table_data.config());

// Intentionally not checking return value, since the function could be invoked for
// multiple times and requires idempotency (i.e. due to retry).
running_job_ids_.insert(job_id);
}
WriteDriverJobExportEvent(mutable_job_table_data);
WriteDriverJobExportEvent(job_table_data);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};

Expand All @@ -122,7 +134,8 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
job_table_data.set_timestamp(time);
job_table_data.set_end_time(time);
job_table_data.set_is_dead(true);
auto on_done = [this, job_id, job_table_data, done_callback](const Status &status) {
auto on_done = [this, job_id, job_table_data, done_callback = std::move(done_callback)](
const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
Expand All @@ -133,6 +146,13 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
}
function_manager_.RemoveJobReference(job_id);
WriteDriverJobExportEvent(job_table_data);

// Update running job status.
auto iter = running_job_ids_.find(job_id);
RAY_CHECK(iter != running_job_ids_.end());
running_job_ids_.erase(iter);
++finished_jobs_count_;

done_callback(status);
};

Expand All @@ -147,21 +167,28 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
rpc::SendReplyCallback send_reply_callback) {
const JobID job_id = JobID::FromBinary(request.job_id());

auto send_reply = [send_reply_callback, reply](Status status) {
auto send_reply = [send_reply_callback = std::move(send_reply_callback),
reply](Status status) {
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};

Status status = gcs_table_storage_->JobTable().Get(
job_id,
[this, job_id, send_reply](Status status,
[this, job_id, send_reply](const Status &status,
const std::optional<rpc::JobTableData> &result) {
if (status.ok() && result) {
MarkJobAsFinished(*result, send_reply);
} else {
return;
}

if (!result.has_value()) {
RAY_LOG(ERROR) << "Tried to mark job " << job_id
<< " as finished, but there was no record of it starting!";
send_reply(status);
} else if (!status.ok()) {
RAY_LOG(ERROR) << "Fails to mark job " << job_id << " as finished due to "
<< status;
}
send_reply(status);
});
if (!status.ok()) {
send_reply(status);
Expand Down Expand Up @@ -438,5 +465,10 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
RAY_CHECK_OK(gcs_table_storage_->JobTable().GetAll(on_done));
}

void GcsJobManager::RecordMetrics() {
ray::stats::STATS_running_jobs.Record(running_job_ids_.size());
ray::stats::STATS_finished_jobs.Record(finished_jobs_count_);
}

} // namespace gcs
} // namespace ray
29 changes: 24 additions & 5 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@

#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_function_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
Expand Down Expand Up @@ -88,7 +96,23 @@ class GcsJobManager : public rpc::JobInfoHandler {

void WriteDriverJobExportEvent(rpc::JobTableData job_data) const;

/// Record metrics.
/// For the job manager, (1) a gauge of the running job count and (2) the newly
/// finished jobs (whether they succeeded or failed) are reported periodically.
void RecordMetrics();

private:
void ClearJobInfos(const rpc::JobTableData &job_data);

void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);

// Running Job IDs, used to report metrics.
absl::flat_hash_set<JobID> running_job_ids_;

// Number of finished jobs since start of this GCS Server, used to report metrics.
int64_t finished_jobs_count_ = 0;

std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;

Expand All @@ -104,11 +128,6 @@ class GcsJobManager : public rpc::JobInfoHandler {

/// The cached core worker clients which are used to communicate with workers.
rpc::CoreWorkerClientPool core_worker_clients_;

void ClearJobInfos(const rpc::JobTableData &job_data);

void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);
};

} // namespace gcs
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ void GcsServer::RecordMetrics() const {
gcs_actor_manager_->RecordMetrics();
gcs_placement_group_manager_->RecordMetrics();
gcs_task_manager_->RecordMetrics();
gcs_job_manager_->RecordMetrics();
execute_after(
main_service_,
[this] { RecordMetrics(); },
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_table_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ class GcsTableWithJobId : public GcsTable<Key, Data> {
/// \param key The key that will be written to the table. The job id can be obtained
/// from the key.
/// \param value The value of the key that will be written to the table.
/// \param callback Callback that will be called after write finishes.
/// \return Status
/// \param callback Callback that will be called after write finishes, whether it
/// succeeds or not.
/// \return Status for issuing the asynchronous write operation.
Status Put(const Key &key, const Data &value, const StatusCallback &callback) override;

/// Get all the data of the specified job id from the table asynchronously.
Expand Down
15 changes: 15 additions & 0 deletions src/ray/stats/metric_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ DEFINE_stats(actors,
(),
ray::stats::GAUGE);

/// Job related stats.
/// Both metrics are recorded from GcsJobManager::RecordMetrics().
///
/// running_jobs: gauge fed with the current number of entries in the job
/// manager's set of running job ids.
DEFINE_stats(running_jobs,
"Number of jobs currently running.",
/*tags=*/(),
/*buckets=*/(),
ray::stats::GAUGE);

/// finished_jobs: fed with the job manager's finished-jobs counter, which is
/// incremented in the completion callback of MarkJobAsFinished.
/// NOTE(review): that counter is cumulative and is not reset in the code shown
/// here; confirm that recording a running total suits a COUNT metric.
DEFINE_stats(finished_jobs,
"Number of jobs finished.",
// TODO(hjiang): Consider adding task completion status, for example, failed,
// completed in tags.
/*tags=*/(),
/*buckets=*/(),
ray::stats::COUNT);

/// Logical resource usage reported by raylets.
DEFINE_stats(resources,
// TODO(sang): Support placement_group_reserved_available | used
Expand Down
4 changes: 4 additions & 0 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ DECLARE_stats(tasks);
/// Actor stats, broken down by state.
DECLARE_stats(actors);

/// Job stats.
DECLARE_stats(running_jobs);
DECLARE_stats(finished_jobs);

/// Placement group stats, broken down by state.
DECLARE_stats(placement_groups);

Expand Down
5 changes: 3 additions & 2 deletions src/ray/util/counter_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <list>
#include <utility>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
Expand All @@ -35,7 +36,7 @@
template <typename K>
class CounterMap {
public:
CounterMap(){};
CounterMap() = default;

CounterMap(const CounterMap &other) = delete;

Expand All @@ -45,7 +46,7 @@ class CounterMap {
/// Changes are buffered until `FlushOnChangeCallbacks()` is called to enable
/// batching for performance reasons.
void SetOnChangeCallback(std::function<void(const K &)> on_change) {
on_change_ = on_change;
on_change_ = std::move(on_change);
}

/// Flush any pending on change callbacks.
Expand Down

0 comments on commit 2a1bd56

Please sign in to comment.