From d7e34de64aed887aa77936aeef29721589f2b735 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Wed, 10 Jan 2024 21:57:59 +0800 Subject: [PATCH] Implement an unify key-value iterator for Kvrocks Currently, we need to iterate all keys in the database in different places like the cluster migration and kvrocks2redis, but don't have an iterator for this purpose. It's very error-prone to implement this in different places since Kvrocks may add a new column family in the future, and we must be careful to iterate all keys in all column families. This would be a burden for maintenance, So we want to implement an iterator for iterating keys. ```C++ DBIter iter(storage, read_option); for (iter.Seek(); iter.Valid(); iter.Next()) { if (iter.Type() == kRedisString || iter.Type() == kRedisJSON) { // the string/json type didn't have subkeys continue; } auto subkey_iter = iter.GetSubKeyIterator(); for (subkey_iter.Seek(); subkey_iter.Valid(); subkey_iter.Next()) { // handle its subkey and value here } } ``` When using this iterator, it will iterate the metadata column family first and check its type, if it's not a string or json, then it will iterate the corresponding column family to get subkeys. That said, if we have a key foo with type hash, then the iterator will iterate foo and foo:field1, foo:field2, and so on. This solution can bring those benefits: - The codes look more intutive - Can reuse this iterator if we want to iterate keys only --- src/common/status.h | 2 +- src/storage/iterator.cc | 162 ++++++++++++++++++ src/storage/iterator.h | 82 +++++++++ tests/cppunit/iterator_test.cc | 295 +++++++++++++++++++++++++++++++++ 4 files changed, 540 insertions(+), 1 deletion(-) create mode 100644 src/storage/iterator.cc create mode 100644 src/storage/iterator.h create mode 100644 tests/cppunit/iterator_test.cc diff --git a/src/common/status.h b/src/common/status.h index 37eae9d8281..a03dd46420c 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -168,7 +168,7 @@ struct StringInStatusOr> : StringInStatusOr(StringInStatusOr&& v) : BaseType(new std::string(*std::move(v))) {} // NOLINT template ::inplace, int> = 0> StringInStatusOr(StringInStatusOr&& v) // NOLINT - : BaseType((typename StringInStatusOr::BaseType &&)(std::move(v))) {} + : BaseType((typename StringInStatusOr::BaseType&&)(std::move(v))) {} StringInStatusOr(const StringInStatusOr& v) = delete; diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc new file mode 100644 index 00000000000..0dc8a6289e0 --- /dev/null +++ b/src/storage/iterator.cc @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "iterator.h" + +#include + +#include "db_util.h" + +namespace engine { +DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot) + : storage_(storage), read_options_(std::move(read_options)), slot_(slot) { + metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName); + metadata_iter_ = std::move(util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_))); +} + +void DBIterator::Next() { + metadata_iter_->Next(); + nextUntilValid(); +} + +void DBIterator::nextUntilValid() { + // slot_ != -1 means we would like to iterate all keys in the slot + // so we can skip the afterwards keys if the slot id doesn't match + if (slot_ != -1 && metadata_iter_->Valid()) { + auto [_, user_key] = ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded()); + // Release the iterator if the slot id doesn't match + if (GetSlotIdFromKey(user_key.ToString()) != slot_) { + Reset(); + return; + } + } + + while (metadata_iter_->Valid()) { + Metadata metadata(kRedisNone, false); + // Skip the metadata if it's expired + if (metadata.Decode(metadata_iter_->value()).ok() && !metadata.Expired()) { + metadata_ = metadata; + break; + } + metadata_iter_->Next(); + } +} + +bool DBIterator::Valid() const { return metadata_iter_ && metadata_iter_->Valid(); } + +Slice DBIterator::Key() const { return Valid() ? metadata_iter_->key() : Slice(); } + +std::tuple DBIterator::UserKey() const { + if (!Valid()) { + return {}; + } + return ExtractNamespaceKey(metadata_iter_->key(), slot_ != -1); +} + +Slice DBIterator::Value() const { return Valid() ? metadata_iter_->value() : Slice(); } + +RedisType DBIterator::Type() const { return Valid() ? metadata_.Type() : kRedisNone; } + +void DBIterator::Reset() { + if (metadata_iter_) metadata_iter_.reset(); +} + +void DBIterator::Seek(const std::string& target) { + metadata_iter_->Reset(); + + // Iterate with the slot id but storage didn't enable slot id encoding + if (slot_ != -1 && !storage_->IsSlotIdEncoded()) { + Reset(); + return; + } + std::string prefix = target; + if (slot_ != -1) { + // Use the slot id as the prefix if it's specified + prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot_) + target; + } + + metadata_iter_->Seek(prefix); + nextUntilValid(); +} + +std::unique_ptr DBIterator::GetSubKeyIterator() const { + if (!Valid()) { + return nullptr; + } + + // The string/json type doesn't have sub keys + RedisType type = metadata_.Type(); + if (type == kRedisNone || type == kRedisString || type == kRedisJson) { + return nullptr; + } + + auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode(); + return std::make_unique(storage_, read_options_, type, prefix); +} + +SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix) + : storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) { + if (type_ == kRedisStream) { + cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName); + } else { + cf_handle_ = storage_->GetCFHandle(kSubkeyColumnFamilyName); + } + iter_ = std::move(util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_))); +} + +void SubKeyIterator::Next() { + iter_->Next(); + + if (!Valid()) return; + + if (!iter_->key().starts_with(prefix_)) { + Reset(); + } +} + +bool SubKeyIterator::Valid() const { return iter_ && iter_->Valid(); } + +Slice SubKeyIterator::Key() const { return Valid() ? iter_->key() : Slice(); } + +Slice SubKeyIterator::UserKey() const { + if (!Valid()) return {}; + + const InternalKey internal_key(iter_->key(), storage_->IsSlotIdEncoded()); + return internal_key.GetSubKey(); +} + +Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); } + +void SubKeyIterator::Seek() { + iter_->Reset(); + iter_->Seek(prefix_); + + if (!iter_->Valid()) return; + // For the subkey iterator, it MUST contain the prefix key itself + if (!iter_->key().starts_with(prefix_)) { + Reset(); + } +} + +void SubKeyIterator::Reset() { + if (iter_) iter_.reset(); +} + +} // namespace engine diff --git a/src/storage/iterator.h b/src/storage/iterator.h new file mode 100644 index 00000000000..40b93bc3799 --- /dev/null +++ b/src/storage/iterator.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#pragma once + +#include +#include + +#include "storage.h" + +namespace engine { + +class SubKeyIterator { + public: + explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix); + ~SubKeyIterator() = default; + bool Valid() const; + void Seek(); + void Next(); + // return the raw key in rocksdb + Slice Key() const; + // return the user key without prefix + Slice UserKey() const; + Slice Value() const; + void Reset(); + + private: + Storage *storage_; + rocksdb::ReadOptions read_options_; + RedisType type_; + std::string prefix_; + std::unique_ptr iter_; + rocksdb::ColumnFamilyHandle *cf_handle_ = nullptr; +}; + +class DBIterator { + public: + explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1); + ~DBIterator() = default; + + bool Valid() const; + void Seek(const std::string &target = ""); + void Next(); + // return the raw key in rocksdb + Slice Key() const; + // return the namespace and user key without prefix + std::tuple UserKey() const; + Slice Value() const; + RedisType Type() const; + void Reset(); + std::unique_ptr GetSubKeyIterator() const; + + private: + void nextUntilValid(); + + Storage *storage_; + rocksdb::ReadOptions read_options_; + int slot_ = -1; + Metadata metadata_ = Metadata(kRedisNone, false); + + rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = nullptr; + std::unique_ptr metadata_iter_; + std::unique_ptr subkey_iter_; +}; + +} // namespace engine diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc new file mode 100644 index 00000000000..70a7b39e91d --- /dev/null +++ b/tests/cppunit/iterator_test.cc @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_base.h" +#include "types/redis_string.h" + +class IteratorTest : public TestBase { + protected: + explicit IteratorTest() {} + ~IteratorTest() override = default; + + void SetUp() override { + { // string + redis::String string(storage_, "test_ns0"); + string.Set("a", "1"); + string.Set("b", "2"); + string.Set("c", "3"); + // Make sure the key "c" is expired + auto s = string.Expire("c", 1); + ASSERT_TRUE(s.ok()); + string.Set("d", "4"); + } + + { // hash + uint64_t ret = 0; + redis::Hash hash(storage_, "test_ns1"); + hash.MSet("hash-1", {{"f0", "v0"}, {"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}}, false, &ret); + } + + { // set + uint64_t ret = 0; + redis::Set set(storage_, "test_ns2"); + set.Add("set-1", {"e0", "e1", "e2"}, &ret); + } + + { // sorted set + uint64_t ret = 0; + redis::ZSet zset(storage_, "test_ns3"); + auto mscores = std::vector{{"z0", 0}, {"z1", 1}, {"z2", 2}}; + zset.Add("zset-1", ZAddFlags(), &mscores, &ret); + } + + { // list + uint64_t ret = 0; + redis::List list(storage_, "test_ns4"); + list.Push("list-1", {"l0", "l1", "l2"}, false, &ret); + } + + { // stream + redis::Stream stream(storage_, "test_ns5"); + redis::StreamEntryID ret; + redis::StreamAddOptions options; + options.next_id_strategy = std::make_unique(); + stream.Add("stream-1", options, {"x0"}, &ret); + stream.Add("stream-1", options, {"x1"}, &ret); + stream.Add("stream-1", options, {"x2"}, &ret); + // TODO(@git-hulk): add stream group after it's finished + } + + { // bitmap + redis::Bitmap bitmap(storage_, "test_ns6"); + bool ret = false; + bitmap.SetBit("bitmap-1", 0, true, &ret); + bitmap.SetBit("bitmap-1", 8 * 1024, true, &ret); + bitmap.SetBit("bitmap-1", 2 * 8 * 1024, true, &ret); + } + } +}; + +TEST_F(IteratorTest, AllKeys) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + std::vector live_keys = {"a", "b", "d", "hash-1", "set-1", "zset-1", "list-1", "stream-1", "bitmap-1"}; + std::reverse(live_keys.begin(), live_keys.end()); + for (iter.Seek(); iter.Valid(); iter.Next()) { + ASSERT_TRUE(!live_keys.empty()); + auto [_, user_key] = iter.UserKey(); + ASSERT_EQ(live_keys.back(), user_key.ToString()); + live_keys.pop_back(); + } + ASSERT_TRUE(live_keys.empty()); +} + +TEST_F(IteratorTest, BasicString) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + + std::vector expected_keys = {"a", "b", "d"}; + std::reverse(expected_keys.begin(), expected_keys.end()); + auto prefix = ComposeNamespaceKey("test_ns0", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + if (expected_keys.empty()) { + FAIL() << "Unexpected key: " << iter.Key().ToString(); + } + ASSERT_EQ(kRedisString, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns0", ns.ToString()); + ASSERT_EQ(expected_keys.back(), key.ToString()); + expected_keys.pop_back(); + // Make sure there is no subkey iterator + ASSERT_TRUE(!iter.GetSubKeyIterator()); + } + // Make sure all keys are iterated except the expired one: "c" + ASSERT_TRUE(expected_keys.empty()); +} + +TEST_F(IteratorTest, BasicHash) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns1", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisHash, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns1", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_keys = {"f3", "f2", "f1", "f0"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_keys.empty()) { + FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString(); + } + ASSERT_EQ(expected_keys.back(), subkey_iter->UserKey().ToString()); + expected_keys.pop_back(); + } + ASSERT_TRUE(expected_keys.empty()); + } +} + +TEST_F(IteratorTest, BasicSet) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns2", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisSet, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns2", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_keys = {"e2", "e1", "e0"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_keys.empty()) { + FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString(); + } + ASSERT_EQ(expected_keys.back(), subkey_iter->UserKey().ToString()); + expected_keys.pop_back(); + } + ASSERT_TRUE(expected_keys.empty()); + } +} + +TEST_F(IteratorTest, BasicZSet) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns3", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisZSet, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns3", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_members = {"z2", "z1", "z0"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_members.empty()) { + FAIL() << "Unexpected key: " << subkey_iter->UserKey().ToString(); + } + ASSERT_EQ(expected_members.back(), subkey_iter->UserKey().ToString()); + expected_members.pop_back(); + } + ASSERT_TRUE(expected_members.empty()); + } +} + +TEST_F(IteratorTest, BasicList) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns4", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisList, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns4", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_values = {"l2", "l1", "l0"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_values.empty()) { + FAIL() << "Unexpected value: " << subkey_iter->Value().ToString(); + } + ASSERT_EQ(expected_values.back(), subkey_iter->Value().ToString()); + expected_values.pop_back(); + } + ASSERT_TRUE(expected_values.empty()); + } +} + +TEST_F(IteratorTest, BasicStream) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns5", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisStream, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns5", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_values = {"x2", "x1", "x0"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_values.empty()) { + FAIL() << "Unexpected value: " << subkey_iter->Value().ToString(); + } + std::vector elems; + auto s = redis::DecodeRawStreamEntryValue(subkey_iter->Value().ToString(), &elems); + ASSERT_TRUE(s.IsOK() && !elems.empty()); + ASSERT_EQ(expected_values.back(), elems[0]); + expected_values.pop_back(); + } + ASSERT_TRUE(expected_values.empty()); + } +} + +TEST_F(IteratorTest, BasicBitmap) { + engine::DBIterator iter(storage_, rocksdb::ReadOptions()); + auto prefix = ComposeNamespaceKey("test_ns6", "", storage_->IsSlotIdEncoded()); + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + ASSERT_EQ(kRedisBitmap, iter.Type()); + auto [ns, key] = iter.UserKey(); + ASSERT_EQ("test_ns6", ns.ToString()); + + auto subkey_iter = iter.GetSubKeyIterator(); + ASSERT_TRUE(!subkey_iter); + std::vector expected_values = {"\x1", "\x1", "\x1"}; + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + if (expected_values.empty()) { + FAIL() << "Unexpected value: " << subkey_iter->Value().ToString(); + } + ASSERT_EQ(expected_values.back(), subkey_iter->Value().ToString()); + expected_values.pop_back(); + } + ASSERT_TRUE(expected_values.empty()); + } +} + +class SlotIteratorTest : public TestBase { + protected: + explicit SlotIteratorTest() {} + ~SlotIteratorTest() override = default; + void SetUp() override { storage_->GetConfig()->slot_id_encoded = true; } +}; + +TEST_F(SlotIteratorTest, LiveKeys) { + redis::String string(storage_, kDefaultNamespace); + std::vector keys = {"{x}a", "{x}b", "{y}c", "{y}d", "{x}e"}; + for (const auto &key : keys) { + string.Set(key, "1"); + } + + std::set same_slot_keys; + auto slot_id = GetSlotIdFromKey(keys[0]); + for (const auto &key : keys) { + if (GetSlotIdFromKey(key) == slot_id) { + same_slot_keys.insert(key); + } + } + engine::DBIterator iter(storage_, rocksdb::ReadOptions(), slot_id); + int count = 0; + for (iter.Seek(); iter.Valid(); iter.Next()) { + auto [_, user_key] = iter.UserKey(); + ASSERT_EQ(slot_id, GetSlotIdFromKey(user_key.ToString())); + count++; + } + ASSERT_EQ(count, same_slot_keys.size()); +}