Skip to content

Commit

Permalink
feat: floyd supports "Within the same Redis database, a single key na…
Browse files Browse the repository at this point in the history
…me can only have one type of data structure" (#2609)

* floyd supports one key for one data structure
* Modified the criteria for determining multiple keys
  • Loading branch information
Mixficsol authored May 20, 2024
1 parent 3e15569 commit e38e255
Show file tree
Hide file tree
Showing 76 changed files with 3,364 additions and 3,057 deletions.
10 changes: 3 additions & 7 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ class CompactCmd : public Cmd {
private:
void DoInitial() override;
void Clear() override {
struct_type_.clear();
compact_dbs_.clear();
}
std::string struct_type_;
std::set<std::string> compact_dbs_;
};

Expand All @@ -127,12 +125,10 @@ class CompactRangeCmd : public Cmd {
private:
void DoInitial() override;
void Clear() override {
struct_type_.clear();
compact_dbs_.clear();
start_key_.clear();
end_key_.clear();
}
std::string struct_type_;
std::set<std::string> compact_dbs_;
std::string start_key_;
std::string end_key_;
Expand Down Expand Up @@ -424,9 +420,9 @@ class ScandbCmd : public Cmd {
Cmd* Clone() override { return new ScandbCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
void DoInitial() override;
void Clear() override { type_ = storage::kAll; }
void Clear() override { type_ = storage::DataType::kAll; }
};

class SlowlogCmd : public Cmd {
Expand Down Expand Up @@ -473,7 +469,7 @@ class PKPatternMatchDelCmd : public Cmd {
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
std::string pattern_;
void DoInitial() override;
};
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<

rocksdb::Status Init(uint32_t cache_num, cache::CacheConfig *cache_cfg);
rocksdb::Status Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg = nullptr);
std::map<storage::DataType, int64_t> TTL(std::string &key, std::map<storage::DataType, rocksdb::Status>* type_status);
int64_t TTL(std::string &key);
void ResetConfig(cache::CacheConfig *cache_cfg);
void Destroy(void);
void SetCacheStatus(int status);
Expand Down
12 changes: 11 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ const std::string kCmdNameTtl = "ttl";
const std::string kCmdNamePttl = "pttl";
const std::string kCmdNamePersist = "persist";
const std::string kCmdNameType = "type";
const std::string kCmdNamePType = "ptype";
const std::string kCmdNameScan = "scan";
const std::string kCmdNameScanx = "scanx";
const std::string kCmdNamePKSetexAt = "pksetexat";
Expand Down Expand Up @@ -248,6 +247,12 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";

/*
* If a type holds a key, a new data structure
* that uses the key will use this error
*/
constexpr const char* ErrTypeMessage = "Invalid argument: WRONGTYPE";

using PikaCmdArgsType = net::RedisCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

Expand Down Expand Up @@ -326,6 +331,7 @@ class CmdRes {
kInvalidTransaction,
kTxnQueued,
kTxnAbort,
kMultiKey
};

CmdRes() = default;
Expand Down Expand Up @@ -420,6 +426,10 @@ class CmdRes {
result.append(message_);
result.append(kNewLine);
break;
case kMultiKey:
result = "-WRONGTYPE Operation against a key holding the wrong kind of value";
result.append(kNewLine);
break;
default:
break;
}
Expand Down
8 changes: 0 additions & 8 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,6 @@ const std::string kDBSyncModule = "document";

const std::string kBgsaveInfoFile = "info";

// prefix of pika cache
const std::string PCacheKeyPrefixK = "K";
const std::string PCacheKeyPrefixH = "H";
const std::string PCacheKeyPrefixS = "S";
const std::string PCacheKeyPrefixZ = "Z";
const std::string PCacheKeyPrefixL = "L";


/*
* cache status
*/
Expand Down
22 changes: 1 addition & 21 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,26 +732,6 @@ class TypeCmd : public Cmd {
rocksdb::Status s_;
};

class PTypeCmd : public Cmd {
public:
PTypeCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::KEYSPACE)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PTypeCmd(*this); }

private:
std::string key_;
void DoInitial() override;
rocksdb::Status s_;
};

class ScanCmd : public Cmd {
public:
ScanCmd(const std::string& name, int arity, uint32_t flag)
Expand Down Expand Up @@ -865,7 +845,7 @@ class PKRScanRangeCmd : public Cmd {
Cmd* Clone() override { return new PKRScanRangeCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
storage::DataType type_ = storage::DataType::kAll;
std::string key_start_;
std::string key_end_;
std::string pattern_ = "*";
Expand Down
1 change: 1 addition & 0 deletions include/pika_migrate_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "pika_client_conn.h"
#include "pika_db.h"
#include "storage/storage.h"
#include "storage/src/base_data_key_format.h"
#include "strings.h"

void WriteDelKeyToBinlog(const std::string& key, const std::shared_ptr<DB>& db);
Expand Down
11 changes: 1 addition & 10 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,12 @@ extern std::unique_ptr<PikaConf> g_pika_conf;

enum TaskType {
kCompactAll,
kCompactStrings,
kCompactHashes,
kCompactSets,
kCompactZSets,
kCompactList,
kResetReplState,
kPurgeLog,
kStartKeyScan,
kStopKeyScan,
kBgSave,
kCompactRangeStrings,
kCompactRangeHashes,
kCompactRangeSets,
kCompactRangeZSets,
kCompactRangeList,
kCompactRangeAll,
};

struct TaskArg {
Expand Down
2 changes: 2 additions & 0 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "net/include/net_cli.h"
#include "net/include/net_thread.h"
#include "storage/storage.h"
#include "storage/src/base_data_key_format.h"
#include "strings.h"

const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
Expand Down Expand Up @@ -56,6 +57,7 @@ class PikaMigrate {
int ParseSKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseHKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseLKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
int ParseMKey(const std::string& key, std::string& wbuf_str, const std::shared_ptr<DB>& db);
bool SetTTL(const std::string& key, std::string& wbuf_str, int64_t ttl);
};

Expand Down
8 changes: 4 additions & 4 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ class RedisCache {
Status HExists(std::string& key, std::string &field);
Status HIncrby(std::string& key, std::string &field, int64_t value);
Status HIncrbyfloat(std::string& key, std::string &field, double value);
Status HLen(std::string& key, uint64_t *len);
Status HLen(const std::string& key, uint64_t *len);
Status HStrlen(std::string& key, std::string &field, uint64_t *len);

// List Commands
Status LIndex(std::string& key, int64_t index, std::string *element);
Status LInsert(std::string& key, storage::BeforeOrAfter &before_or_after,
std::string &pivot, std::string &value);
Status LLen(std::string& key, uint64_t *len);
Status LLen(const std::string& key, uint64_t *len);
Status LPop(std::string& key, std::string *element);
Status LPush(std::string& key, std::vector<std::string> &values);
Status LPushx(std::string& key, std::vector<std::string> &values);
Expand All @@ -108,15 +108,15 @@ class RedisCache {

// Set Commands
Status SAdd(std::string& key, std::vector<std::string> &members);
Status SCard(std::string& key, uint64_t *len);
Status SCard(const std::string& key, uint64_t *len);
Status SIsmember(std::string& key, std::string& member);
Status SMembers(std::string& key, std::vector<std::string> *members);
Status SRem(std::string& key, std::vector<std::string> &members);
Status SRandmember(std::string& key, int64_t count, std::vector<std::string> *members);

// Zset Commands
Status ZAdd(std::string& key, std::vector<storage::ScoreMember> &score_members);
Status ZCard(std::string& key, uint64_t *len);
Status ZCard(const std::string& key, uint64_t *len);
Status ZCount(std::string& key, std::string &min, std::string &max, uint64_t *len);
Status ZIncrby(std::string& key, std::string& member, double increment);
Status ZRange(std::string& key,
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ Status RedisCache::HIncrbyfloat(std::string& key, std::string &field, double val
return Status::OK();
}

Status RedisCache::HLen(std::string& key, uint64_t *len) {
Status RedisCache::HLen(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Status RedisCache::LInsert(std::string& key, storage::BeforeOrAfter &before_or_a
return Status::OK();
}

Status RedisCache::LLen(std::string& key, uint64_t *len) {
Status RedisCache::LLen(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Status RedisCache::SAdd(std::string& key, std::vector<std::string> &members) {
return Status::OK();
}

Status RedisCache::SCard(std::string& key, uint64_t *len) {
Status RedisCache::SCard(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Status RedisCache::ZAdd(std::string& key, std::vector<storage::ScoreMember> &sco
return Status::OK();
}

Status RedisCache::ZCard(std::string& key, uint64_t *len) {
Status RedisCache::ZCard(const std::string& key, uint64_t *len) {
robj *kobj = createObject(OBJ_STRING, sdsnewlen(key.data(), key.size()));
DEFER {
DecrObjectsRefCount(kobj);
Expand Down
73 changes: 15 additions & 58 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,12 +344,8 @@ void CompactCmd::DoInitial() {
}

if (argv_.size() == 1) {
struct_type_ = "all";
compact_dbs_ = g_pika_server->GetAllDBName();
} else if (argv_.size() == 2) {
struct_type_ = argv_[1];
compact_dbs_ = g_pika_server->GetAllDBName();
} else if (argv_.size() == 3) {
std::vector<std::string> dbs;
pstd::StringSplit(argv_[1], COMMA, dbs);
for (const auto& db : dbs) {
Expand All @@ -360,27 +356,16 @@ void CompactCmd::DoInitial() {
compact_dbs_.insert(db);
}
}
struct_type_ = argv_[2];
}
}

/*
* Because meta-CF stores the meta information of all data structures,
* the compact operation can only operate on all data types without
* specifying data types
*/
void CompactCmd::Do() {
if (strcasecmp(struct_type_.data(), "all") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
} else if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactStrings});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactHashes});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactSets});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactZSets});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactList});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactAll});
LogCommand();
res_.SetRes(CmdRes::kOk);
}
Expand All @@ -406,26 +391,12 @@ void CompactRangeCmd::DoInitial() {
compact_dbs_.insert(db);
}
}
struct_type_ = argv_[2];
start_key_ = argv_[3];
end_key_ = argv_[4];
start_key_ = argv_[2];
end_key_ = argv_[3];
}

void CompactRangeCmd::Do() {
if (strcasecmp(struct_type_.data(), "string") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeStrings, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "hash") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeHashes, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "set") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "zset") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeZSets, {start_key_, end_key_}});
} else if (strcasecmp(struct_type_.data(), "list") == 0) {
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeList, {start_key_, end_key_}});
} else {
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
g_pika_server->DoSameThingSpecificDB(compact_dbs_, {TaskType::kCompactRangeAll, {start_key_, end_key_}});
LogCommand();
res_.SetRes(CmdRes::kOk);
}
Expand Down Expand Up @@ -2962,18 +2933,18 @@ void ScandbCmd::DoInitial() {
return;
}
if (argv_.size() == 1) {
type_ = storage::kAll;
type_ = storage::DataType::kAll;
} else {
if (strcasecmp(argv_[1].data(), "string") == 0) {
type_ = storage::kStrings;
type_ = storage::DataType::kStrings;
} else if (strcasecmp(argv_[1].data(), "hash") == 0) {
type_ = storage::kHashes;
type_ = storage::DataType::kHashes;
} else if (strcasecmp(argv_[1].data(), "set") == 0) {
type_ = storage::kSets;
type_ = storage::DataType::kSets;
} else if (strcasecmp(argv_[1].data(), "zset") == 0) {
type_ = storage::kZSets;
type_ = storage::DataType::kZSets;
} else if (strcasecmp(argv_[1].data(), "list") == 0) {
type_ = storage::kLists;
type_ = storage::DataType::kLists;
} else {
res_.SetRes(CmdRes::kInvalidDbType);
}
Expand Down Expand Up @@ -3055,20 +3026,6 @@ void PKPatternMatchDelCmd::DoInitial() {
return;
}
pattern_ = argv_[1];
if (strcasecmp(argv_[2].data(), "set") == 0) {
type_ = storage::kSets;
} else if (strcasecmp(argv_[2].data(), "list") == 0) {
type_ = storage::kLists;
} else if (strcasecmp(argv_[2].data(), "string") == 0) {
type_ = storage::kStrings;
} else if (strcasecmp(argv_[2].data(), "zset") == 0) {
type_ = storage::kZSets;
} else if (strcasecmp(argv_[2].data(), "hash") == 0) {
type_ = storage::kHashes;
} else {
res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel);
return;
}
}

void PKPatternMatchDelCmd::Do() {
Expand Down
Loading

0 comments on commit e38e255

Please sign in to comment.