Skip to content

Commit

Permalink
Implement a unified key-value iterator for Kvrocks
Browse files Browse the repository at this point in the history
Currently, we need to iterate over all keys in the database in several places, such as
cluster migration and kvrocks2redis, but we don't have an iterator for this purpose.
Implementing this separately in each place is very error-prone, since Kvrocks may add
a new column family in the future, and we must be careful to iterate over all keys in all column families.
This would be a maintenance burden, so we want to implement a dedicated iterator for iterating over keys.

```C++

DBIter iter(storage, read_option);
for (iter.Seek(); iter.Valid(); iter.Next()) {
    if (iter.Type() == kRedisString || iter.Type() == kRedisJSON) {
        // the string/json type didn't have subkeys
        continue;
    }

    auto subkey_iter = iter.GetSubKeyIterator();
    for (subkey_iter.Seek(); subkey_iter.Valid(); subkey_iter.Next()) {
        // handle its subkey and value here
    }
}

```

When using this iterator, it iterates over the metadata column family first and checks each key's type;
if the type is not string or JSON, it then iterates over the corresponding column family to get the subkeys.
That is, if we have a key foo of type hash, the iterator will visit foo and then foo:field1, foo:field2, and so on.

This solution brings the following benefits:

- The code looks more intuitive
- The iterator can be reused when we only want to iterate over keys
  • Loading branch information
git-hulk committed Jan 11, 2024
1 parent ee41959 commit bfa1d0d
Show file tree
Hide file tree
Showing 4 changed files with 540 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ struct StringInStatusOr<T, std::enable_if_t<sizeof(T) < sizeof(std::string)>> :
StringInStatusOr(StringInStatusOr<U>&& v) : BaseType(new std::string(*std::move(v))) {} // NOLINT
template <typename U, typename std::enable_if_t<!StringInStatusOr<U>::inplace, int> = 0>
StringInStatusOr(StringInStatusOr<U>&& v) // NOLINT
: BaseType((typename StringInStatusOr<U>::BaseType &&)(std::move(v))) {}
: BaseType((typename StringInStatusOr<U>::BaseType&&)(std::move(v))) {}

StringInStatusOr(const StringInStatusOr& v) = delete;

Expand Down
162 changes: 162 additions & 0 deletions src/storage/iterator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "iterator.h"

#include <cluster/redis_slot.h>

#include "db_util.h"

namespace engine {
// Creates an iterator over the metadata column family of `storage`.
// `slot` is stored as-is; the rest of the class treats -1 as "no slot filter".
// The iterator is not positioned until Seek() is called.
DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot)
    : storage_(storage),
      read_options_(std::move(read_options)),
      slot_(slot),
      metadata_cf_handle_(storage_->GetCFHandle(kMetadataColumnFamilyName)) {
  // The right-hand side is a temporary, so it binds as an rvalue without an explicit std::move.
  metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_));
}

void DBIterator::Next() {
metadata_iter_->Next();
nextUntilValid();
}

// Moves the metadata iterator forward (starting from its current position) until it
// points at an entry whose metadata decodes successfully and is not expired, caching
// the decoded metadata in metadata_.
//
// When slot_ != -1 only keys of that slot are wanted, so the iterator is released as
// soon as a key of another slot is reached. The slot is re-checked for every candidate
// key: skipping an expired entry may move the cursor onto a key of a different slot,
// which must not be exposed to the caller.
void DBIterator::nextUntilValid() {
  // The iterator may already have been released by Reset()
  if (!metadata_iter_) return;

  while (metadata_iter_->Valid()) {
    if (slot_ != -1) {
      auto [_, user_key] = ExtractNamespaceKey(metadata_iter_->key(), storage_->IsSlotIdEncoded());
      // Release the iterator if the slot id doesn't match
      if (GetSlotIdFromKey(user_key.ToString()) != slot_) {
        Reset();
        return;
      }
    }

    Metadata metadata(kRedisNone, false);
    // Skip the metadata if it cannot be decoded or it's expired
    if (metadata.Decode(metadata_iter_->value()).ok() && !metadata.Expired()) {
      metadata_ = metadata;
      break;
    }
    metadata_iter_->Next();
  }
}

// True only while the underlying metadata iterator exists and points at an entry.
bool DBIterator::Valid() const {
  if (!metadata_iter_) return false;
  return metadata_iter_->Valid();
}

// Returns the raw key of the current metadata entry, or an empty Slice when not Valid().
Slice DBIterator::Key() const {
  if (!Valid()) return Slice();
  return metadata_iter_->key();
}

std::tuple<Slice, Slice> DBIterator::UserKey() const {
if (!Valid()) {
return {};
}
return ExtractNamespaceKey(metadata_iter_->key(), slot_ != -1);
}

// Returns the raw value of the current metadata entry, or an empty Slice when not Valid().
Slice DBIterator::Value() const {
  if (!Valid()) return Slice();
  return metadata_iter_->value();
}

// Returns the type recorded in the cached metadata of the current key,
// or kRedisNone when the iterator is not Valid().
RedisType DBIterator::Type() const {
  if (!Valid()) return kRedisNone;
  return metadata_.Type();
}

// Releases the underlying metadata iterator; Valid() returns false afterwards.
void DBIterator::Reset() {
  // unique_ptr::reset() on an empty pointer does nothing, so no prior check is needed.
  metadata_iter_.reset();
}

// Positions the iterator at the first live metadata entry at or after `target`.
// When a slot was given at construction, `target` is appended to the slot key prefix
// of the default namespace, and the iterator is released if the storage does not
// encode slot ids (per-slot iteration is impossible in that case).
//
// Does nothing if the underlying iterator has already been released by Reset().
void DBIterator::Seek(const std::string& target) {
  // Do NOT call metadata_iter_->Reset() here: on a rocksdb::Iterator that name resolves
  // to Cleanable::Reset(), which runs the iterator's cleanup callbacks while it is still
  // in use. Seeking again is enough to reposition it.
  if (!metadata_iter_) return;

  // Iterate with the slot id but storage didn't enable slot id encoding
  if (slot_ != -1 && !storage_->IsSlotIdEncoded()) {
    Reset();
    return;
  }
  std::string prefix = target;
  if (slot_ != -1) {
    prefix = ComposeSlotKeyPrefix(kDefaultNamespace, slot_) + target;
  }

  // TODO(@git-hulk): add slot id support for seek
  metadata_iter_->Seek(prefix);
  nextUntilValid();
}

std::unique_ptr<SubKeyIterator> DBIterator::GetSubKeyIterator() const {
if (!Valid()) {
return nullptr;
}

// The string/json type doesn't have sub keys
RedisType type = metadata_.Type();
if (type == kRedisNone || type == kRedisString || type == kRedisJson) {
return nullptr;
}

auto prefix = InternalKey(Key(), "", metadata_.version, storage_->IsSlotIdEncoded()).Encode();
return std::make_unique<SubKeyIterator>(storage_, read_options_, type, prefix);
}

// Creates an iterator over the entries whose raw key starts with `prefix`.
// Stream keys are read from the stream column family; every other type is read
// from the subkey column family. The iterator is not positioned until Seek() is called.
SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix)
    : storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) {
  const bool is_stream = type_ == kRedisStream;
  cf_handle_ = is_stream ? storage_->GetCFHandle(kStreamColumnFamilyName)
                         : storage_->GetCFHandle(kSubkeyColumnFamilyName);
  // The right-hand side is a temporary, so it binds as an rvalue without an explicit std::move.
  iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, cf_handle_));
}

// Advances to the next subkey. The iterator is released once the cursor leaves the
// key range that starts with prefix_.
// Calling this on an iterator that is not Valid() is a no-op: the underlying
// iterator may already have been released by Reset(), and advancing a
// rocksdb iterator that is not Valid() is not allowed.
void SubKeyIterator::Next() {
  if (!Valid()) return;

  iter_->Next();

  if (!Valid()) return;

  if (!iter_->key().starts_with(prefix_)) {
    Reset();
  }
}

// True only while the underlying iterator exists and points at an entry.
bool SubKeyIterator::Valid() const {
  if (!iter_) return false;
  return iter_->Valid();
}

// Returns the raw key of the current entry, or an empty Slice when not Valid().
Slice SubKeyIterator::Key() const {
  if (!Valid()) return Slice();
  return iter_->key();
}

// Returns the subkey part parsed out of the current raw key,
// or an empty Slice when the iterator is not Valid().
Slice SubKeyIterator::UserKey() const {
  if (!Valid()) {
    return {};
  }

  // Parse the raw key, honouring the storage's slot-id encoding setting.
  const InternalKey parsed_key(iter_->key(), storage_->IsSlotIdEncoded());
  Slice sub_key = parsed_key.GetSubKey();
  return sub_key;
}

// Returns the raw value of the current entry, or an empty Slice when not Valid().
Slice SubKeyIterator::Value() const {
  if (!Valid()) return Slice();
  return iter_->value();
}

// Positions the iterator at the first entry whose raw key starts with prefix_.
// The iterator is released if no such entry exists at the seek position.
//
// Does nothing if the underlying iterator has already been released by Reset().
void SubKeyIterator::Seek() {
  // Do NOT call iter_->Reset() here: on a rocksdb::Iterator that name resolves to
  // Cleanable::Reset(), which runs the iterator's cleanup callbacks while it is still
  // in use. Seeking again is enough to reposition it.
  if (!iter_) return;

  iter_->Seek(prefix_);

  if (!iter_->Valid()) return;
  // For the subkey iterator, it MUST contain the prefix key itself
  if (!iter_->key().starts_with(prefix_)) {
    Reset();
  }
}

// Releases the underlying iterator; Valid() returns false afterwards.
void SubKeyIterator::Reset() {
  // unique_ptr::reset() on an empty pointer does nothing, so no prior check is needed.
  iter_.reset();
}

} // namespace engine
82 changes: 82 additions & 0 deletions src/storage/iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#pragma once

#include <rocksdb/iterator.h>
#include <rocksdb/options.h>

#include "storage.h"

namespace engine {

// Iterates the entries of one column family whose raw key starts with a given prefix.
// The column family is chosen from the Redis type at construction (the stream column
// family for streams, the subkey column family otherwise). Instances are normally
// obtained through DBIterator::GetSubKeyIterator().
class SubKeyIterator {
public:
// `prefix` is the raw key prefix shared by all entries to visit.
// The iterator is not positioned until Seek() is called.
explicit SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix);
~SubKeyIterator() = default;
// true while the iterator exists and points at an entry
bool Valid() const;
// position at the first entry starting with the prefix
void Seek();
// advance to the next entry; the iterator is released once the prefix no longer matches
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the user key without prefix
Slice UserKey() const;
// return the raw value in rocksdb, or an empty Slice when not Valid()
Slice Value() const;
// release the underlying rocksdb iterator; Valid() is false afterwards
void Reset();

private:
Storage *storage_;
rocksdb::ReadOptions read_options_;
// Redis type of the owning key; selects the column family in the constructor
RedisType type_;
// raw key prefix every visited entry must start with
std::string prefix_;
// underlying rocksdb iterator; null after Reset()
std::unique_ptr<rocksdb::Iterator> iter_;
rocksdb::ColumnFamilyHandle *cf_handle_ = nullptr;
};

// Iterates the keys stored in the metadata column family, skipping entries whose
// metadata cannot be decoded or is expired. For keys that have subkeys,
// GetSubKeyIterator() returns an iterator over them.
class DBIterator {
public:
// `slot` restricts iteration to a single slot; the default -1 means no slot filter.
// The iterator is not positioned until Seek() is called.
explicit DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot = -1);
~DBIterator() = default;

// true while the iterator exists and points at an entry
bool Valid() const;
// position at the first live key at or after `target`
void Seek(const std::string &target = "");
// advance to the next live key
void Next();
// return the raw key in rocksdb
Slice Key() const;
// return the namespace and user key without prefix
std::tuple<Slice, Slice> UserKey() const;
// return the raw metadata value in rocksdb, or an empty Slice when not Valid()
Slice Value() const;
// return the Redis type of the current key, or kRedisNone when not Valid()
RedisType Type() const;
// release the underlying rocksdb iterator; Valid() is false afterwards
void Reset();
// return an iterator over the subkeys of the current key, or nullptr when the
// iterator is not Valid() or the key's type is none/string/json
std::unique_ptr<SubKeyIterator> GetSubKeyIterator() const;

private:
// skip forward from the current position until a decodable, non-expired entry is found
void nextUntilValid();

Storage *storage_;
rocksdb::ReadOptions read_options_;
// slot to iterate, or -1 for no slot filter
int slot_ = -1;
// decoded metadata of the current key, refreshed by nextUntilValid()
Metadata metadata_ = Metadata(kRedisNone, false);

rocksdb::ColumnFamilyHandle *metadata_cf_handle_ = nullptr;
// underlying rocksdb iterator over the metadata column family; null after Reset()
std::unique_ptr<rocksdb::Iterator> metadata_iter_;
// NOTE(review): not referenced by the iterator.cc implementation shown with this header — confirm whether it is still needed
std::unique_ptr<SubKeyIterator> subkey_iter_;
};

} // namespace engine
Loading

0 comments on commit bfa1d0d

Please sign in to comment.