Skip to content

Commit

Permalink
gcs job metrics
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Sep 23, 2024
1 parent 18b2d94 commit 9e1f214
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,8 @@ GcsActorManager::GcsActorManager(
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) {
RAY_CHECK(worker_client_factory_);
RAY_CHECK(destroy_owned_placement_group_if_needed_);
actor_state_counter_.reset(
new CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>());
actor_state_counter_ = std::make_shared<
CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>>();
actor_state_counter_->SetOnChangeCallback(
[this](const std::pair<rpc::ActorTableData::ActorState, std::string> key) mutable {
int64_t num_actors = actor_state_counter_->Get(key);
Expand Down
23 changes: 21 additions & 2 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
#include "ray/gcs/gcs_server/gcs_job_manager.h"

#include "ray/gcs/pb_util.h"
#include "ray/stats/metric.h"

namespace ray {
namespace gcs {

namespace {
// Job state string used for metrics upload.
constexpr const char *const kRunningJobState = "RUNNING";
constexpr const char *const kFinishedJobState = "FINISHED";
} // namespace

void GcsJobManager::Initialize(const GcsInitData &gcs_init_data) {
for (auto &pair : gcs_init_data.Jobs()) {
const auto &job_id = pair.first;
Expand Down Expand Up @@ -82,7 +89,7 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
auto time = current_sys_time_ms();
mutable_job_table_data.set_start_time(time);
mutable_job_table_data.set_timestamp(time);
JobID job_id = JobID::FromBinary(mutable_job_table_data.job_id());
const JobID job_id = JobID::FromBinary(mutable_job_table_data.job_id());
RAY_LOG(INFO) << "Adding job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();

Expand All @@ -92,7 +99,8 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, mutable_job_table_data, nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishJob(job_id, mutable_job_table_data, /*done=*/nullptr));
if (mutable_job_table_data.config().has_runtime_env_info()) {
runtime_env_manager_.AddURIReference(
job_id.Hex(), mutable_job_table_data.config().runtime_env_info());
Expand All @@ -111,6 +119,11 @@ void GcsJobManager::HandleAddJob(rpc::AddJobRequest request,
gcs_table_storage_->JobTable().Put(job_id, mutable_job_table_data, on_done);
if (!status.ok()) {
on_done(status);
} else {
const bool insert_suc = running_job_ids_.insert(job_id).second;
RAY_CHECK(insert_suc) << job_id.Hex() << " already inserted.";
ray::stats::STATS_jobs.Record(running_job_ids_.size(),
{{"State", kRunningJobState}, {"JobId", job_id.Hex()}});
}
}

Expand Down Expand Up @@ -139,6 +152,12 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
Status status = gcs_table_storage_->JobTable().Put(job_id, job_table_data, on_done);
if (!status.ok()) {
on_done(status);
} else {
auto iter = running_job_ids_.find(job_id);
RAY_CHECK(iter != running_job_ids_.end());
running_job_ids_.erase(iter);
ray::stats::STATS_jobs.Record(
running_job_ids_.size(), {{"State", kFinishedJobState}, {"JobId", job_id.Hex()}});
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/gcs/gcs_server/gcs_function_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
Expand Down Expand Up @@ -89,6 +96,14 @@ class GcsJobManager : public rpc::JobInfoHandler {
void WriteDriverJobExportEvent(rpc::JobTableData job_data) const;

private:
void ClearJobInfos(const rpc::JobTableData &job_data);

void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);

// Job ids, which are running.
absl::flat_hash_set<JobID> running_job_ids_;

std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;

Expand All @@ -104,11 +119,6 @@ class GcsJobManager : public rpc::JobInfoHandler {

/// The cached core worker clients which are used to communicate with workers.
rpc::CoreWorkerClientPool core_worker_clients_;

void ClearJobInfos(const rpc::JobTableData &job_data);

void MarkJobAsFinished(rpc::JobTableData job_table_data,
std::function<void(Status)> done_callback);
};

} // namespace gcs
Expand Down
9 changes: 9 additions & 0 deletions src/ray/stats/metric_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ DEFINE_stats(actors,
(),
ray::stats::GAUGE);

/// Track job by state, including RUNNING, FINISHED.
DEFINE_stats(jobs,
"Current number of jobs currently in a particular state.",
// State: latest state for the particular job.
// JobId: ID in hex format for this job.
("State", "JobId"),
/*buckets=*/(),
ray::stats::GAUGE);

/// Logical resource usage reported by raylets.
DEFINE_stats(resources,
// TODO(sang): Support placement_group_reserved_available | used
Expand Down
3 changes: 3 additions & 0 deletions src/ray/stats/metric_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ DECLARE_stats(tasks);
/// Actor stats, broken down by state.
DECLARE_stats(actors);

/// Job stats, broken down by state.
DECLARE_stats(jobs);

/// Placement group stats, broken down by state.
DECLARE_stats(placement_groups);

Expand Down
5 changes: 3 additions & 2 deletions src/ray/util/counter_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <list>
#include <utility>

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
Expand All @@ -35,7 +36,7 @@
template <typename K>
class CounterMap {
public:
CounterMap(){};
CounterMap() = default;

CounterMap(const CounterMap &other) = delete;

Expand All @@ -45,7 +46,7 @@ class CounterMap {
/// Changes are buffered until `FlushOnChangeCallbacks()` is called to enable
/// batching for performance reasons.
void SetOnChangeCallback(std::function<void(const K &)> on_change) {
on_change_ = on_change;
on_change_ = std::move(on_change);
}

/// Flush any pending on change callbacks.
Expand Down

0 comments on commit 9e1f214

Please sign in to comment.