Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the RocksDB WriteBatchWithIndex to implement the read-your-own-writes in transaction #1287

Merged
merged 13 commits into from
Mar 4, 2023
Merged
3 changes: 1 addition & 2 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ SlotImport::SlotImport(Server *svr)
import_status_(kImportNone),
import_fd_(-1) {
std::lock_guard<std::mutex> guard(mutex_);
// Let db_ and metadata_cf_handle_ be nullptr, then get them in real time while use them.
// Let metadata_cf_handle_ be nullptr, then get them in real time while use them.
// See comments in SlotMigrate::SlotMigrate for detailed reason.
db_ = nullptr;
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
metadata_cf_handle_ = nullptr;
}

Expand Down
11 changes: 6 additions & 5 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static std::map<RedisType, std::string> type_to_cmd = {

SlotMigrate::SlotMigrate(Server *svr, int migration_speed, int pipeline_size_limit, int seq_gap)
: Database(svr->storage_, kDefaultNamespace), svr_(svr) {
// Let db_ and metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer,
// Let metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer,
// because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
// [Situation]:
// 1. Start an empty slave server.
Expand All @@ -59,7 +59,6 @@ SlotMigrate::SlotMigrate(Server *svr, int migration_speed, int pipeline_size_lim
// in all functions used in migration process.
// [Note]:
// This problem may exist in all functions of Database called in slot migration process.
db_ = nullptr;
metadata_cf_handle_ = nullptr;

if (migration_speed >= 0) {
Expand Down Expand Up @@ -294,7 +293,7 @@ Status SlotMigrate::SendSnapshot() {
read_options.snapshot = slot_snapshot_;
storage_->SetReadOptions(read_options);
rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(Engine::kMetadataColumnFamilyName);
std::unique_ptr<rocksdb::Iterator> iter(storage_->GetDB()->NewIterator(read_options, cf_handle));
auto iter = DBUtil::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Construct key prefix to iterate the keys belong to the target slot
std::string prefix;
Expand Down Expand Up @@ -646,7 +645,8 @@ Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata
rocksdb::ReadOptions read_options;
read_options.snapshot = slot_snapshot_;
storage_->SetReadOptions(read_options);
std::unique_ptr<rocksdb::Iterator> iter(storage_->GetDB()->NewIterator(read_options));
// Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
auto iter = DBUtil::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

// Construct key prefix to iterate values of the complex type user key
std::string slot_key, prefix_subkey;
Expand Down Expand Up @@ -747,7 +747,8 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada
rocksdb::ReadOptions read_options;
read_options.snapshot = slot_snapshot_;
storage_->SetReadOptions(read_options);
std::unique_ptr<rocksdb::Iterator> iter(
// Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
auto iter = DBUtil::UniqueIterator(
storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(Engine::kStreamColumnFamilyName)));

std::string ns_key;
Expand Down
10 changes: 8 additions & 2 deletions src/commands/cmd_txn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "commander.h"
#include "error_constants.h"
#include "server/redis_connection.h"
#include "server/server.h"

namespace Redis {

Expand Down Expand Up @@ -68,13 +69,18 @@ class CommandExec : public Commander {
return Status::OK();
}

auto storage = svr->storage_;
// Reply multi length first
conn->Reply(Redis::MultiLen(conn->GetMultiExecCommands()->size()));
// Execute multi-exec commands
conn->SetInExec();
conn->ExecuteCommands(conn->GetMultiExecCommands());
auto s = storage->BeginTxn();
if (s.IsOK()) {
conn->ExecuteCommands(conn->GetMultiExecCommands());
s = storage->CommitTxn();
}
conn->ResetMultiExec();
return Status::OK();
return s;
}
};

Expand Down
8 changes: 5 additions & 3 deletions src/common/db_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ struct UniqueIterator : std::unique_ptr<rocksdb::Iterator> {
using base_type = std::unique_ptr<rocksdb::Iterator>;

explicit UniqueIterator(rocksdb::Iterator* iter) : base_type(iter) {}
UniqueIterator(rocksdb::DB* db, const rocksdb::ReadOptions& options, rocksdb::ColumnFamilyHandle* column_family)
: base_type(db->NewIterator(options, column_family)) {}
UniqueIterator(rocksdb::DB* db, const rocksdb::ReadOptions& options) : base_type(db->NewIterator(options)) {}
UniqueIterator(Engine::Storage* storage, const rocksdb::ReadOptions& options,
rocksdb::ColumnFamilyHandle* column_family)
: base_type(storage->NewIterator(options, column_family)) {}
UniqueIterator(Engine::Storage* storage, const rocksdb::ReadOptions& options)
: base_type(storage->NewIterator(options)) {}
};

} // namespace DBUtil
51 changes: 51 additions & 0 deletions src/common/ptr_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include <memory>

#pragma once

enum class ObserverOrUnique : char { Observer, Unique };

template <typename T, typename D = std::default_delete<T>>
struct ObserverOrUniquePtr : private D {
explicit ObserverOrUniquePtr(T* ptr, ObserverOrUnique own) : ptr_(ptr), own_(own) {}

ObserverOrUniquePtr(ObserverOrUniquePtr&& p) : ptr_(p.ptr_), own_(p.own_) { p.Release(); }
ObserverOrUniquePtr(const ObserverOrUniquePtr&) = delete;
ObserverOrUniquePtr& operator=(const ObserverOrUniquePtr&) = delete;

~ObserverOrUniquePtr() {
if (own_ == ObserverOrUnique::Unique) {
(*this)(ptr_);
}
}

T* operator->() const { return ptr_; }
T* Get() const { return ptr_; }
T* Release() {
own_ = ObserverOrUnique::Observer;
return ptr_;
}

private:
T* ptr_;
ObserverOrUnique own_;
};
2 changes: 1 addition & 1 deletion src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ Status Server::ScriptExists(const std::string &sha) {
Status Server::ScriptGet(const std::string &sha, std::string *body) {
std::string func_name = Engine::kLuaFunctionPrefix + sha;
auto cf = storage_->GetCFHandle(Engine::kPropagateColumnFamilyName);
auto s = storage_->GetDB()->Get(rocksdb::ReadOptions(), cf, func_name, body);
auto s = storage_->Get(rocksdb::ReadOptions(), cf, func_name, body);
if (!s.ok()) {
if (s.IsNotFound()) return {Status::NotFound};

Expand Down
4 changes: 2 additions & 2 deletions src/stats/disk_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ rocksdb::Status Disk::GetApproximateSizes(const Metadata &metadata, const Slice
InternalKey(ns_key, subkeyright, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(&next_version_prefix_key);
auto key_range = rocksdb::Range(prefix_key, next_version_prefix_key);
uint64_t tmp_size = 0;
rocksdb::Status s = db_->GetApproximateSizes(option_, column_family, &key_range, 1, &tmp_size);
rocksdb::Status s = storage_->GetDB()->GetApproximateSizes(option_, column_family, &key_range, 1, &tmp_size);
if (!s.ok()) return s;
*key_size += tmp_size;
return rocksdb::Status::OK();
Expand Down Expand Up @@ -79,7 +79,7 @@ rocksdb::Status Disk::GetKeySize(const Slice &user_key, RedisType type, uint64_t
rocksdb::Status Disk::GetStringSize(const Slice &ns_key, uint64_t *key_size) {
auto limit = ns_key.ToString() + static_cast<char>(0);
auto key_range = rocksdb::Range(Slice(ns_key), Slice(limit));
return db_->GetApproximateSizes(option_, metadata_cf_handle_, &key_range, 1, key_size);
return storage_->GetDB()->GetApproximateSizes(option_, metadata_cf_handle_, &key_range, 1, key_size);
}

rocksdb::Status Disk::GetHashSize(const Slice &ns_key, uint64_t *key_size) {
Expand Down
57 changes: 28 additions & 29 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@

namespace Redis {

Database::Database(Engine::Storage *storage, std::string ns)
: storage_(storage), db_(storage->GetDB()), namespace_(std::move(ns)) {
Database::Database(Engine::Storage *storage, std::string ns) : storage_(storage), namespace_(std::move(ns)) {
metadata_cf_handle_ = storage->GetCFHandle("metadata");
}

Expand Down Expand Up @@ -63,10 +62,10 @@ rocksdb::Status Database::GetMetadata(RedisType type, const Slice &ns_key, Metad
}

rocksdb::Status Database::GetRawMetadata(const Slice &ns_key, std::string *bytes) {
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
return db_->Get(read_options, metadata_cf_handle_, ns_key, bytes);
return storage_->Get(read_options, metadata_cf_handle_, ns_key, bytes);
}

rocksdb::Status Database::GetRawMetadataByUserKey(const Slice &user_key, std::string *bytes) {
Expand All @@ -82,7 +81,7 @@ rocksdb::Status Database::Expire(const Slice &user_key, int timestamp) {
std::string value;
Metadata metadata(kRedisNone, false);
LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
metadata.Decode(value);
if (metadata.Expired()) {
Expand All @@ -97,11 +96,11 @@ rocksdb::Status Database::Expire(const Slice &user_key, int timestamp) {
memcpy(buf, value.data(), value.size());
// +1 to skip the flags
EncodeFixed32(buf + 1, (uint32_t)timestamp);
rocksdb::WriteBatch batch;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone, {std::to_string(kRedisCmdExpire)});
batch.PutLogData(log_data.Encode());
batch.Put(metadata_cf_handle_, ns_key, Slice(buf, value.size()));
s = storage_->Write(storage_->DefaultWriteOptions(), &batch);
batch->PutLogData(log_data.Encode());
batch->Put(metadata_cf_handle_, ns_key, Slice(buf, value.size()));
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
delete[] buf;
return s;
}
Expand All @@ -112,7 +111,7 @@ rocksdb::Status Database::Del(const Slice &user_key) {

std::string value;
LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = db_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
metadata.Decode(value);
Expand All @@ -124,15 +123,15 @@ rocksdb::Status Database::Del(const Slice &user_key) {

rocksdb::Status Database::Exists(const std::vector<Slice> &keys, int *ret) {
*ret = 0;
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();

rocksdb::Status s;
std::string ns_key, value;
for (const auto &key : keys) {
AppendNamespacePrefix(key, &ns_key);
s = db_->Get(read_options, metadata_cf_handle_, ns_key, &value);
s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value);
if (s.ok()) {
Metadata metadata(kRedisNone, false);
metadata.Decode(value);
Expand All @@ -147,11 +146,11 @@ rocksdb::Status Database::TTL(const Slice &user_key, int *ttl) {
AppendNamespacePrefix(user_key, &ns_key);

*ttl = -2; // ttl is -2 when the key does not exist or expired
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string value;
rocksdb::Status s = db_->Get(read_options, metadata_cf_handle_, ns_key, &value);
rocksdb::Status s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
Expand All @@ -178,11 +177,11 @@ void Database::Keys(const std::string &prefix, std::vector<std::string> *keys, K
}

uint64_t ttl_sum = 0;
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
storage_->SetReadOptions(read_options);
auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
auto iter = DBUtil::UniqueIterator(storage_, read_options, metadata_cf_handle_);

while (true) {
ns_prefix.empty() ? iter->SeekToFirst() : iter->Seek(ns_prefix);
Expand Down Expand Up @@ -232,11 +231,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
uint16_t slot_id = 0, slot_start = 0;
std::string ns_prefix, ns_cursor, ns, user_key, value, index_key;

LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
storage_->SetReadOptions(read_options);
auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
auto iter = DBUtil::UniqueIterator(storage_, read_options, metadata_cf_handle_);

AppendNamespacePrefix(cursor, &ns_cursor);
if (storage_->IsSlotIdEncoded()) {
Expand Down Expand Up @@ -356,11 +355,11 @@ rocksdb::Status Database::FlushDB() {
}

rocksdb::Status Database::FlushAll() {
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
storage_->SetReadOptions(read_options);
auto iter = DBUtil::UniqueIterator(db_, read_options, metadata_cf_handle_);
auto iter = DBUtil::UniqueIterator(storage_, read_options, metadata_cf_handle_);
iter->SeekToFirst();
if (!iter->Valid()) {
return rocksdb::Status::OK();
Expand All @@ -384,11 +383,11 @@ rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *
std::string ns_key;
AppendNamespacePrefix(user_key, &ns_key);

LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string value;
rocksdb::Status s = db_->Get(read_options, metadata_cf_handle_, ns_key, &value);
rocksdb::Status s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
Expand Down Expand Up @@ -433,11 +432,11 @@ rocksdb::Status Database::Type(const Slice &user_key, RedisType *type) {
AppendNamespacePrefix(user_key, &ns_key);

*type = kRedisNone;
LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
std::string value;
rocksdb::Status s = db_->Get(read_options, metadata_cf_handle_, ns_key, &value);
rocksdb::Status s = storage_->Get(read_options, metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

Metadata metadata(kRedisNone, false);
Expand Down Expand Up @@ -466,11 +465,11 @@ rocksdb::Status Database::FindKeyRangeWithPrefix(const std::string &prefix, cons
begin->clear();
end->clear();

LatestSnapShot ss(storage_->GetDB());
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
storage_->SetReadOptions(read_options);
auto iter = DBUtil::UniqueIterator(storage_->GetDB(), read_options, cf_handle);
auto iter = DBUtil::UniqueIterator(storage_, read_options, cf_handle);
iter->Seek(prefix);
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
return rocksdb::Status::NotFound();
Expand Down Expand Up @@ -525,7 +524,7 @@ rocksdb::Status Database::GetSlotKeysInfo(int slot, std::map<int, uint64_t> *slo
rocksdb::ReadOptions read_options;
read_options.snapshot = snapshot;
storage_->SetReadOptions(read_options);
auto iter = db_->NewIterator(read_options, metadata_cf_handle_);
auto iter = DBUtil::UniqueIterator(storage_, read_options, metadata_cf_handle_);
bool end = false;
for (int i = 0; i < HASH_SLOTS_SIZE; i++) {
std::string prefix;
Expand Down Expand Up @@ -568,11 +567,11 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
rocksdb::Status s = GetMetadata(type, ns_key, &metadata);
if (!s.ok()) return s;

LatestSnapShot ss(db_);
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
storage_->SetReadOptions(read_options);
auto iter = DBUtil::UniqueIterator(db_, read_options);
auto iter = DBUtil::UniqueIterator(storage_, read_options);
std::string match_prefix_key;
if (!subkey_prefix.empty()) {
InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode(&match_prefix_key);
Expand Down
Loading