Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS][Storage unification 3/n] Deprecate MemoryInternalKV, use StoreClientInternalKV<MemoryStoreClient> instead #24211

Merged
merged 5 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 0 additions & 93 deletions src/ray/gcs/gcs_server/gcs_kv_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,99 +169,6 @@ void RedisInternalKV::Keys(const std::string &ns,
}));
}

void MemoryInternalKV::Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) {
absl::ReaderMutexLock lock(&mu_);
auto true_prefix = MakeKey(ns, key);
auto it = map_.find(true_prefix);
auto val = it == map_.end() ? std::nullopt : std::make_optional(it->second);
if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), std::move(val)),
"MemoryInternalKV.Get");
}
}

void MemoryInternalKV::Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) {
absl::WriterMutexLock _(&mu_);
auto true_key = MakeKey(ns, key);
auto it = map_.find(true_key);
bool inserted = false;
if (it != map_.end()) {
if (overwrite) {
it->second = value;
}
} else {
map_[true_key] = value;
inserted = true;
}
if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), inserted), "MemoryInternalKV.Put");
}
}

void MemoryInternalKV::Del(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback) {
absl::WriterMutexLock _(&mu_);
auto true_key = MakeKey(ns, key);
auto it = map_.lower_bound(true_key);
int64_t del_num = 0;
while (it != map_.end()) {
if (!del_by_prefix) {
if (it->first == true_key) {
map_.erase(it);
++del_num;
}
break;
}

if (absl::StartsWith(it->first, true_key)) {
it = map_.erase(it);
++del_num;
} else {
break;
}
}

if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), del_num), "MemoryInternalKV.Del");
}
}

void MemoryInternalKV::Exists(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback) {
absl::ReaderMutexLock lock(&mu_);
auto true_key = MakeKey(ns, key);
bool existed = map_.find(true_key) != map_.end();
if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), existed), "MemoryInternalKV.Exists");
}
}

void MemoryInternalKV::Keys(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) {
absl::ReaderMutexLock lock(&mu_);
std::vector<std::string> keys;
auto true_prefix = MakeKey(ns, prefix);
auto iter = map_.lower_bound(true_prefix);
while (iter != map_.end() && absl::StartsWith(iter->first, true_prefix)) {
keys.emplace_back(ExtractKey(iter->first));
iter++;
}
if (callback != nullptr) {
io_context_.post(std::bind(std::move(callback), std::move(keys)),
"MemoryInternalKV.Keys");
}
}

void GcsInternalKVManager::HandleInternalKVGet(
const rpc::InternalKVGetRequest &request,
rpc::InternalKVGetReply *reply,
Expand Down
32 changes: 0 additions & 32 deletions src/ray/gcs/gcs_server/gcs_kv_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,38 +130,6 @@ class RedisInternalKV : public InternalKVInterface {
boost::asio::io_service::work work_;
};

class MemoryInternalKV : public InternalKVInterface {
public:
MemoryInternalKV(instrumented_io_context &io_context) : io_context_(io_context) {}
void Get(const std::string &ns,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) override;

void Put(const std::string &ns,
const std::string &key,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) override;

void Del(const std::string &ns,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback) override;

void Exists(const std::string &ns,
const std::string &key,
std::function<void(bool)> callback) override;

void Keys(const std::string &ns,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) override;

private:
instrumented_io_context &io_context_;
absl::Mutex mu_;
absl::btree_map<std::string, std::string> map_ GUARDED_BY(mu_);
};

/// This implementation class of `InternalKVHandler`.
class GcsInternalKVManager : public rpc::InternalKVHandler {
public:
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
#include "ray/gcs/gcs_server/store_client_kv.h"
#include "ray/pubsub/publisher.h"

namespace ray {
Expand Down Expand Up @@ -446,7 +447,8 @@ void GcsServer::InitKVManager() {
if (storage_type_ == "redis") {
instance = std::make_unique<RedisInternalKV>(GetRedisClientOptions());
} else if (storage_type_ == "memory") {
instance = std::make_unique<MemoryInternalKV>(main_service_);
instance = std::make_unique<StoreClientInternalKV>(
std::make_unique<InMemoryStoreClient>(main_service_));
}
Comment on lines 447 to 452
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use store clients here? Also for redis.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think this will mostly impact Ant. cc @raulchen to see if we should turn it on by default.


kv_manager_ = std::make_unique<GcsInternalKVManager>(std::move(instance));
Expand Down
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_server/store_client_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ void StoreClientInternalKV::Get(
const std::string &table_name,
const std::string &key,
std::function<void(std::optional<std::string>)> callback) {
if (!callback) {
callback = [](auto) {};
}
RAY_CHECK_OK(delegate_->AsyncGet(
table_name, key, [callback = std::move(callback)](auto status, auto result) {
callback(result.has_value() ? std::optional<std::string>(result.value())
Expand All @@ -38,13 +41,19 @@ void StoreClientInternalKV::Put(const std::string &table_name,
const std::string &value,
bool overwrite,
std::function<void(bool)> callback) {
if (!callback) {
callback = [](auto) {};
}
RAY_CHECK_OK(delegate_->AsyncPut(table_name, key, value, overwrite, callback));
}

void StoreClientInternalKV::Del(const std::string &table_name,
const std::string &key,
bool del_by_prefix,
std::function<void(int64_t)> callback) {
if (!callback) {
callback = [](auto) {};
}
if (!del_by_prefix) {
RAY_CHECK_OK(delegate_->AsyncDelete(
table_name, key, [callback = std::move(callback)](bool deleted) {
Expand All @@ -66,12 +75,18 @@ void StoreClientInternalKV::Del(const std::string &table_name,
void StoreClientInternalKV::Exists(const std::string &table_name,
const std::string &key,
std::function<void(bool)> callback) {
if (!callback) {
callback = [](auto) {};
}
RAY_CHECK_OK(delegate_->AsyncExists(table_name, key, std::move(callback)));
}

void StoreClientInternalKV::Keys(const std::string &table_name,
const std::string &prefix,
std::function<void(std::vector<std::string>)> callback) {
if (!callback) {
callback = [](auto) {};
}
RAY_CHECK_OK(delegate_->AsyncGetKeys(table_name, prefix, std::move(callback)));
}

Expand Down
11 changes: 4 additions & 7 deletions src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ class GcsKVManagerTest : public ::testing::TestWithParam<std::string> {
"127.0.0.1", ray::TEST_REDIS_SERVER_PORTS.front(), "", false);
if (GetParam() == "redis") {
kv_instance = std::make_unique<ray::gcs::RedisInternalKV>(redis_client_options);
} else if (GetParam() == "memory") {
kv_instance = std::make_unique<ray::gcs::MemoryInternalKV>(io_service);
} else if (GetParam() == "redis_client") {
auto client = std::make_shared<ray::gcs::RedisClient>(redis_client_options);
RAY_CHECK_OK(client->Connect(io_service));
kv_instance = std::make_unique<ray::gcs::StoreClientInternalKV>(
std::make_unique<ray::gcs::RedisStoreClient>(client));
} else if (GetParam() == "memory_client") {
} else if (GetParam() == "memory") {
kv_instance = std::make_unique<ray::gcs::StoreClientInternalKV>(
std::make_unique<ray::gcs::InMemoryStoreClient>(io_service));
}
Expand Down Expand Up @@ -107,10 +105,9 @@ TEST_P(GcsKVManagerTest, TestInternalKV) {
}
}

INSTANTIATE_TEST_SUITE_P(
GcsKVManagerTestFixture,
GcsKVManagerTest,
::testing::Values("redis", "memory", "redis_client", "memory_client"));
INSTANTIATE_TEST_SUITE_P(GcsKVManagerTestFixture,
GcsKVManagerTest,
::testing::Values("redis", "redis_client", "memory"));

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
Expand Down