diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.cc b/src/ray/gcs/gcs_server/gcs_kv_manager.cc index 268ddbe40972..70cd9fd857e2 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.cc @@ -169,99 +169,6 @@ void RedisInternalKV::Keys(const std::string &ns, })); } -void MemoryInternalKV::Get(const std::string &ns, - const std::string &key, - std::function)> callback) { - absl::ReaderMutexLock lock(&mu_); - auto true_prefix = MakeKey(ns, key); - auto it = map_.find(true_prefix); - auto val = it == map_.end() ? std::nullopt : std::make_optional(it->second); - if (callback != nullptr) { - io_context_.post(std::bind(std::move(callback), std::move(val)), - "MemoryInternalKV.Get"); - } -} - -void MemoryInternalKV::Put(const std::string &ns, - const std::string &key, - const std::string &value, - bool overwrite, - std::function callback) { - absl::WriterMutexLock _(&mu_); - auto true_key = MakeKey(ns, key); - auto it = map_.find(true_key); - bool inserted = false; - if (it != map_.end()) { - if (overwrite) { - it->second = value; - } - } else { - map_[true_key] = value; - inserted = true; - } - if (callback != nullptr) { - io_context_.post(std::bind(std::move(callback), inserted), "MemoryInternalKV.Put"); - } -} - -void MemoryInternalKV::Del(const std::string &ns, - const std::string &key, - bool del_by_prefix, - std::function callback) { - absl::WriterMutexLock _(&mu_); - auto true_key = MakeKey(ns, key); - auto it = map_.lower_bound(true_key); - int64_t del_num = 0; - while (it != map_.end()) { - if (!del_by_prefix) { - if (it->first == true_key) { - map_.erase(it); - ++del_num; - } - break; - } - - if (absl::StartsWith(it->first, true_key)) { - it = map_.erase(it); - ++del_num; - } else { - break; - } - } - - if (callback != nullptr) { - io_context_.post(std::bind(std::move(callback), del_num), "MemoryInternalKV.Del"); - } -} - -void MemoryInternalKV::Exists(const std::string &ns, - const std::string &key, - std::function callback) { - absl::ReaderMutexLock lock(&mu_); - auto true_key = MakeKey(ns, key); - bool existed = map_.find(true_key) != map_.end(); - if (callback != nullptr) { - io_context_.post(std::bind(std::move(callback), existed), "MemoryInternalKV.Exists"); - } -} - -void MemoryInternalKV::Keys(const std::string &ns, - const std::string &prefix, - std::function)> callback) { - absl::ReaderMutexLock lock(&mu_); - std::vector keys; - auto true_prefix = MakeKey(ns, prefix); - auto iter = map_.lower_bound(true_prefix); - while (iter != map_.end() && absl::StartsWith(iter->first, true_prefix)) { - keys.emplace_back(ExtractKey(iter->first)); - iter++; - } - if (callback != nullptr) { - io_context_.post(std::bind(std::move(callback), std::move(keys)), - "MemoryInternalKV.Keys"); - } -} - void GcsInternalKVManager::HandleInternalKVGet( const rpc::InternalKVGetRequest &request, rpc::InternalKVGetReply *reply, diff --git a/src/ray/gcs/gcs_server/gcs_kv_manager.h b/src/ray/gcs/gcs_server/gcs_kv_manager.h index 8570ac9aafdd..6d2a5374a240 100644 --- a/src/ray/gcs/gcs_server/gcs_kv_manager.h +++ b/src/ray/gcs/gcs_server/gcs_kv_manager.h @@ -130,38 +130,6 @@ class RedisInternalKV : public InternalKVInterface { boost::asio::io_service::work work_; }; -class MemoryInternalKV : public InternalKVInterface { - public: - MemoryInternalKV(instrumented_io_context &io_context) : io_context_(io_context) {} - void Get(const std::string &ns, - const std::string &key, - std::function)> callback) override; - - void Put(const std::string &ns, - const std::string &key, - const std::string &value, - bool overwrite, - std::function callback) override; - - void Del(const std::string &ns, - const std::string &key, - bool del_by_prefix, - std::function callback) override; - - void Exists(const std::string &ns, - const std::string &key, - std::function callback) override; - - void Keys(const std::string &ns, - const std::string &prefix, - std::function)> callback) override; - - private: - instrumented_io_context &io_context_; - absl::Mutex mu_; - absl::btree_map map_ GUARDED_BY(mu_); -}; - /// This implementation class of `InternalKVHandler`. class GcsInternalKVManager : public rpc::InternalKVHandler { public: diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 20acafd4eceb..a146d614b41a 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -28,6 +28,7 @@ #include "ray/gcs/gcs_server/gcs_resource_report_poller.h" #include "ray/gcs/gcs_server/gcs_worker_manager.h" #include "ray/gcs/gcs_server/stats_handler_impl.h" +#include "ray/gcs/gcs_server/store_client_kv.h" #include "ray/pubsub/publisher.h" namespace ray { @@ -446,7 +447,8 @@ void GcsServer::InitKVManager() { if (storage_type_ == "redis") { instance = std::make_unique(GetRedisClientOptions()); } else if (storage_type_ == "memory") { - instance = std::make_unique(main_service_); + instance = std::make_unique( + std::make_unique(main_service_)); } kv_manager_ = std::make_unique(std::move(instance)); diff --git a/src/ray/gcs/gcs_server/store_client_kv.cc b/src/ray/gcs/gcs_server/store_client_kv.cc index 92bd81e9a792..e83922afbed0 100644 --- a/src/ray/gcs/gcs_server/store_client_kv.cc +++ b/src/ray/gcs/gcs_server/store_client_kv.cc @@ -26,6 +26,9 @@ void StoreClientInternalKV::Get( const std::string &table_name, const std::string &key, std::function)> callback) { + if (!callback) { + callback = [](auto) {}; + } RAY_CHECK_OK(delegate_->AsyncGet( table_name, key, [callback = std::move(callback)](auto status, auto result) { callback(result.has_value() ? std::optional(result.value()) @@ -38,6 +41,9 @@ void StoreClientInternalKV::Put(const std::string &table_name, const std::string &value, bool overwrite, std::function callback) { + if (!callback) { + callback = [](auto) {}; + } RAY_CHECK_OK(delegate_->AsyncPut(table_name, key, value, overwrite, callback)); } @@ -45,6 +51,9 @@ void StoreClientInternalKV::Del(const std::string &table_name, const std::string &key, bool del_by_prefix, std::function callback) { + if (!callback) { + callback = [](auto) {}; + } if (!del_by_prefix) { RAY_CHECK_OK(delegate_->AsyncDelete( table_name, key, [callback = std::move(callback)](bool deleted) { @@ -66,12 +75,18 @@ void StoreClientInternalKV::Del(const std::string &table_name, void StoreClientInternalKV::Exists(const std::string &table_name, const std::string &key, std::function callback) { + if (!callback) { + callback = [](auto) {}; + } RAY_CHECK_OK(delegate_->AsyncExists(table_name, key, std::move(callback))); } void StoreClientInternalKV::Keys(const std::string &table_name, const std::string &prefix, std::function)> callback) { + if (!callback) { + callback = [](auto) {}; + } RAY_CHECK_OK(delegate_->AsyncGetKeys(table_name, prefix, std::move(callback))); } diff --git a/src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc index 3f7e9d174b4d..9c4d69120a6d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_kv_manager_test.cc @@ -35,14 +35,12 @@ class GcsKVManagerTest : public ::testing::TestWithParam { "127.0.0.1", ray::TEST_REDIS_SERVER_PORTS.front(), "", false); if (GetParam() == "redis") { kv_instance = std::make_unique(redis_client_options); - } else if (GetParam() == "memory") { - kv_instance = std::make_unique(io_service); } else if (GetParam() == "redis_client") { auto client = std::make_shared(redis_client_options); RAY_CHECK_OK(client->Connect(io_service)); kv_instance = std::make_unique( std::make_unique(client)); - } else if (GetParam() == "memory_client") { + } else if (GetParam() == "memory") { kv_instance = std::make_unique( std::make_unique(io_service)); } @@ -107,10 +105,9 @@ TEST_P(GcsKVManagerTest, TestInternalKV) { } } -INSTANTIATE_TEST_SUITE_P( - GcsKVManagerTestFixture, - GcsKVManagerTest, - ::testing::Values("redis", "memory", "redis_client", "memory_client")); +INSTANTIATE_TEST_SUITE_P(GcsKVManagerTestFixture, + GcsKVManagerTest, + ::testing::Values("redis", "redis_client", "memory")); int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv);