Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate metric items into raylet #4602

Merged
merged 28 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8ae184b
refine tables.
Apr 8, 2019
ed8a40a
Add AvailableResource and TotalResource metrics.
jovany-wang Apr 11, 2019
7755698
Add CurrentDriver metric.
jovany-wang Apr 15, 2019
cd2dce7
Add actor stats.
jovany-wang Apr 15, 2019
a45b779
Add Object Stats metric in raylet
jovany-wang Apr 16, 2019
0edf061
Add LineageStats metric.
jovany-wang Apr 16, 2019
41ee5d4
Add task dependencies manager metrics.
jovany-wang Apr 16, 2019
2a74e6d
Add SchedulingQueueStats metric.
jovany-wang Apr 18, 2019
d269725
Add ReconstructionPolicy metrics.
jovany-wang Apr 18, 2019
c87027e
Refine
jovany-wang Apr 18, 2019
e07a539
Refine
jovany-wang Apr 18, 2019
07f8aa9
Refine
jovany-wang Apr 18, 2019
1dc7e65
Update src/ray/gcs/redis_context.cc
raulchen Apr 18, 2019
fd94ce5
Update src/ray/gcs/redis_context.cc
raulchen Apr 18, 2019
25642d9
Update src/ray/stats/metric_defs.h
raulchen Apr 18, 2019
7e982d6
Address comments.
jovany-wang Apr 19, 2019
5668f6a
run format.sh
jovany-wang Apr 19, 2019
4ddc7f1
Fix CI
jovany-wang Apr 19, 2019
4b0f055
Update src/ray/stats/metric_defs.h
raulchen Apr 22, 2019
fe86988
Update src/ray/stats/metric_defs.h
raulchen Apr 22, 2019
e05aaa0
Update src/ray/stats/metric_defs.h
raulchen Apr 22, 2019
82610de
Update src/ray/stats/metric_defs.h
raulchen Apr 22, 2019
569cf10
Update src/ray/stats/metric_defs.h
raulchen Apr 22, 2019
ecfe1af
Address comments.
jovany-wang Apr 22, 2019
b73ca64
Fix linting,
jovany-wang Apr 23, 2019
9edf32b
Address comment
jovany-wang Apr 23, 2019
7c5a9e8
Fix lint
jovany-wang Apr 23, 2019
77dcccc
Fix CI
jovany-wang Apr 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/ray/gcs/redis_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

#include <sstream>

#include "ray/stats/stats.h"
#include "ray/util/util.h"

extern "C" {
#include "ray/thirdparty/hiredis/adapters/ae.h"
#include "ray/thirdparty/hiredis/async.h"
Expand All @@ -18,13 +21,21 @@ namespace {
/// A helper function to call the callback and delete it from the callback
/// manager if necessary.
void ProcessCallback(int64_t callback_index, const std::string &data) {
if (callback_index >= 0) {
bool delete_callback =
ray::gcs::RedisCallbackManager::instance().get(callback_index)(data);
// Delete the callback if necessary.
if (delete_callback) {
ray::gcs::RedisCallbackManager::instance().remove(callback_index);
}
RAY_CHECK(callback_index >= 0) << "The callback index must be greater than 0, "
<< "but it actually is " << callback_index;
auto callback_item = ray::gcs::RedisCallbackManager::instance().get(callback_index);
if (!callback_item.is_subscription) {
// Record the redis latency for non-subscription redis operations.
auto end_time = current_sys_time_us();
ray::stats::RedisLatency().Record(end_time - callback_item.start_time);
}
// Invoke the callback.
if (callback_item.callback != nullptr) {
callback_item.callback(data);
}
if (!callback_item.is_subscription) {
// Delete the callback if it's not a subscription callback.
ray::gcs::RedisCallbackManager::instance().remove(callback_index);
}
}

Expand Down Expand Up @@ -104,18 +115,20 @@ void SubscribeRedisCallback(void *c, void *r, void *privdata) {
ProcessCallback(callback_index, data);
}

int64_t RedisCallbackManager::add(const RedisCallback &function) {
callbacks_.emplace(num_callbacks_, function);
int64_t RedisCallbackManager::add(const RedisCallback &function, bool is_subscription) {
auto start_time = current_sys_time_us();
callback_items_.emplace(num_callbacks_,
CallbackItem(function, is_subscription, start_time));
return num_callbacks_++;
}

RedisCallback &RedisCallbackManager::get(int64_t callback_index) {
RAY_CHECK(callbacks_.find(callback_index) != callbacks_.end());
return callbacks_[callback_index];
RedisCallbackManager::CallbackItem &RedisCallbackManager::get(int64_t callback_index) {
RAY_CHECK(callback_items_.find(callback_index) != callback_items_.end());
return callback_items_[callback_index];
}

void RedisCallbackManager::remove(int64_t callback_index) {
callbacks_.erase(callback_index);
callback_items_.erase(callback_index);
}

#define REDIS_CHECK_ERROR(CONTEXT, REPLY) \
Expand Down Expand Up @@ -217,8 +230,7 @@ Status RedisContext::RunAsync(const std::string &command, const UniqueID &id,
const uint8_t *data, int64_t length,
const TablePrefix prefix, const TablePubsub pubsub_channel,
RedisCallback redisCallback, int log_length) {
int64_t callback_index =
redisCallback != nullptr ? RedisCallbackManager::instance().add(redisCallback) : -1;
int64_t callback_index = RedisCallbackManager::instance().add(redisCallback, false);
if (length > 0) {
if (log_length >= 0) {
std::string redis_command = command + " %d %d %b %b %d";
Expand Down Expand Up @@ -278,7 +290,7 @@ Status RedisContext::SubscribeAsync(const ClientID &client_id,
RAY_CHECK(pubsub_channel != TablePubsub::NO_PUBLISH)
<< "Client requested subscribe on a table that does not support pubsub";

int64_t callback_index = RedisCallbackManager::instance().add(redisCallback);
int64_t callback_index = RedisCallbackManager::instance().add(redisCallback, true);
RAY_CHECK(out_callback_index != nullptr);
*out_callback_index = callback_index;
int status = 0;
Expand Down
26 changes: 20 additions & 6 deletions src/ray/gcs/redis_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ namespace ray {

namespace gcs {
/// Every callback should take in the data (as a string) returned by the Redis
/// operation and return a bool indicating whether the callback should be
/// deleted once called.
using RedisCallback = std::function<bool(const std::string &)>;
/// operation.
using RedisCallback = std::function<void(const std::string &)>;

class RedisCallbackManager {
public:
Expand All @@ -30,9 +29,24 @@ class RedisCallbackManager {
return instance;
}

int64_t add(const RedisCallback &function);
struct CallbackItem {
CallbackItem() = default;

RedisCallback &get(int64_t callback_index);
CallbackItem(const RedisCallback &callback, bool is_subscription,
int64_t start_time) {
this->callback = callback;
this->is_subscription = is_subscription;
this->start_time = start_time;
}

RedisCallback callback;
bool is_subscription;
int64_t start_time;
};

int64_t add(const RedisCallback &function, bool is_subscription);

CallbackItem &get(int64_t callback_index);

/// Remove a callback.
void remove(int64_t callback_index);
Expand All @@ -43,7 +57,7 @@ class RedisCallbackManager {
~RedisCallbackManager() {}

int64_t num_callbacks_ = 0;
std::unordered_map<int64_t, RedisCallback> callbacks_;
std::unordered_map<int64_t, CallbackItem> callback_items_;
};

class RedisContext {
Expand Down
10 changes: 1 addition & 9 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ Status Log<ID, Data>::Append(const JobID &job_id, const ID &id,
if (done != nullptr) {
(done)(client_, id, *dataT);
}
return true;
};
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
Expand All @@ -73,7 +72,6 @@ Status Log<ID, Data>::AppendAt(const JobID &job_id, const ID &id,
(failure)(client_, id, *dataT);
}
}
return true;
};
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
Expand Down Expand Up @@ -101,7 +99,6 @@ Status Log<ID, Data>::Lookup(const JobID &job_id, const ID &id, const Callback &
}
lookup(client_, id, results);
}
return true;
};
std::vector<uint8_t> nil;
return GetRedisContext(id)->RunAsync("RAY.TABLE_LOOKUP", id, nil.data(), nil.size(),
Expand Down Expand Up @@ -153,10 +150,8 @@ Status Log<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id,
subscribe(client_, id, root->notification_mode(), results);
}
}
// We do not delete the callback after calling it since there may be
// more subscription messages.
return false;
};

subscribe_callback_index_ = 1;
for (auto &context : shard_contexts_) {
RAY_RETURN_NOT_OK(context->SubscribeAsync(client_id, pubsub_channel_, callback,
Expand Down Expand Up @@ -229,7 +224,6 @@ Status Table<ID, Data>::Add(const JobID &job_id, const ID &id,
if (done != nullptr) {
(done)(client_, id, *dataT);
}
return true;
};
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
Expand Down Expand Up @@ -295,7 +289,6 @@ Status Set<ID, Data>::Add(const JobID &job_id, const ID &id,
if (done != nullptr) {
(done)(client_, id, *dataT);
}
return true;
};
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
Expand All @@ -313,7 +306,6 @@ Status Set<ID, Data>::Remove(const JobID &job_id, const ID &id,
if (done != nullptr) {
(done)(client_, id, *dataT);
}
return true;
};
flatbuffers::FlatBufferBuilder fbb;
fbb.ForceDefaults(true);
Expand Down
21 changes: 21 additions & 0 deletions src/ray/object_manager/connection_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,25 @@ std::string ConnectionPool::DebugString() const {
return result.str();
}

void ConnectionPool::RecordMetrics() const {
stats::ConnectionPoolStats().Record(
message_send_connections_.size(),
{{stats::ValueTypeKey, "num_message_send_connections"}});
stats::ConnectionPoolStats().Record(
transfer_send_connections_.size(),
{{stats::ValueTypeKey, "num_transfer_send_connections"}});
stats::ConnectionPoolStats().Record(
available_transfer_send_connections_.size(),
{{stats::ValueTypeKey, "num_avail_message_send_connections"}});
stats::ConnectionPoolStats().Record(
available_transfer_send_connections_.size(),
{{stats::ValueTypeKey, "num_avail_transfer_send_connections"}});
stats::ConnectionPoolStats().Record(
message_receive_connections_.size(),
{{stats::ValueTypeKey, "num_message_receive_connections"}});
stats::ConnectionPoolStats().Record(
transfer_receive_connections_.size(),
{{stats::ValueTypeKey, "num_transfer_receive_connections"}});
}

} // namespace ray
4 changes: 4 additions & 0 deletions src/ray/object_manager/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "ray/object_manager/format/object_manager_generated.h"
#include "ray/object_manager/object_directory.h"
#include "ray/object_manager/object_manager_client_connection.h"
#include "ray/stats/stats.h"

namespace asio = boost::asio;

Expand Down Expand Up @@ -95,6 +96,9 @@ class ConnectionPool {
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;

/// This object cannot be copied for thread-safety.
RAY_DISALLOW_COPY_AND_ASSIGN(ConnectionPool);

Expand Down
16 changes: 16 additions & 0 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "ray/object_manager/object_manager.h"
#include "ray/common/common_protocol.h"
#include "ray/stats/stats.h"
#include "ray/util/util.h"

namespace asio = boost::asio;
Expand Down Expand Up @@ -966,4 +967,19 @@ std::string ObjectManager::DebugString() const {
return result.str();
}

void ObjectManager::RecordMetrics() const {
stats::ObjectManagerStats().Record(local_objects_.size(),
{{stats::ValueTypeKey, "num_local_objects"}});
stats::ObjectManagerStats().Record(active_wait_requests_.size(),
{{stats::ValueTypeKey, "num_active_wait_requests"}});
stats::ObjectManagerStats().Record(
unfulfilled_push_requests_.size(),
{{stats::ValueTypeKey, "num_unfulfilled_push_requests"}});
stats::ObjectManagerStats().Record(pull_requests_.size(),
{{stats::ValueTypeKey, "num_pull_requests"}});
stats::ObjectManagerStats().Record(profile_events_.size(),
{{stats::ValueTypeKey, "num_profile_events"}});
connection_pool_.RecordMetrics();
}

} // namespace ray
3 changes: 3 additions & 0 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class ObjectManager : public ObjectManagerInterface {
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;

private:
friend class TestObjectManager;

Expand Down
12 changes: 12 additions & 0 deletions src/ray/raylet/lineage_cache.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "lineage_cache.h"
#include "ray/stats/stats.h"

#include <sstream>

Expand Down Expand Up @@ -471,6 +472,17 @@ std::string LineageCache::DebugString() const {
return result.str();
}

void LineageCache::RecordMetrics() const {
stats::LineageCacheStats().Record(committed_tasks_.size(),
{{stats::ValueTypeKey, "num_committed_tasks"}});
stats::LineageCacheStats().Record(lineage_.GetChildrenSize(),
{{stats::ValueTypeKey, "num_children"}});
stats::LineageCacheStats().Record(subscribed_tasks_.size(),
{{stats::ValueTypeKey, "num_subscribed_tasks"}});
stats::LineageCacheStats().Record(lineage_.GetEntries().size(),
{{stats::ValueTypeKey, "num_lineages"}});
}

} // namespace raylet

} // namespace ray
3 changes: 3 additions & 0 deletions src/ray/raylet/lineage_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ class LineageCache {
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;

private:
FRIEND_TEST(LineageCacheTest, BarReturnsZeroOnNull);
/// Flush a task that is in UNCOMMITTED_READY state.
Expand Down
Loading