Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
brother-jin committed Aug 5, 2024
1 parent fa414f1 commit c730b71
Show file tree
Hide file tree
Showing 18 changed files with 102 additions and 71 deletions.
1 change: 0 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,6 @@ class PKPatternMatchDelCmd : public Cmd {
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }
void DoBinlog() override;

private:
storage::DataType type_ = storage::kAll;
Expand Down
63 changes: 44 additions & 19 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3098,15 +3098,33 @@ void PKPatternMatchDelCmd::DoInitial() {
res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel);
return;
}
max_count_ = storage::BATCH_DELETE_LIMIT;
if (argv_.size() > 2) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}
}

void PKPatternMatchDelCmd::Do() {
int ret = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret);
if (s.ok()) {
res_.AppendInteger(ret);
int64_t count = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_, max_count_);

if(s.ok()) {
res_.AppendInteger(count);
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
if (count >= 0) {
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
}
}
}

Expand All @@ -3118,26 +3136,33 @@ void PKPatternMatchDelCmd::DoUpdateCache() {
if(s_.ok()) {
std::vector<std::string> v;
for (auto key : remove_keys_) {
v.emplace_back(PCacheKeyPrefixK + key);
v.emplace_back(PCacheKeyPrefixL + key);
v.emplace_back(PCacheKeyPrefixZ + key);
v.emplace_back(PCacheKeyPrefixS + key);
v.emplace_back(PCacheKeyPrefixH + key);
if (argv_.size() > 2) {
//only delete the corresponding prefix
switch (type_) {
case storage::kSets:
v.emplace_back(PCacheKeyPrefixS + key);
break;
case storage::kLists:
v.emplace_back(PCacheKeyPrefixL + key);
break;
case storage::kStrings:
v.emplace_back(PCacheKeyPrefixK + key);
break;
case storage::kZSets:
v.emplace_back(PCacheKeyPrefixZ + key);
break;
case storage::kHashes:
v.emplace_back(PCacheKeyPrefixH + key);
break;
default:
break;
}
}
}
db_->cache()->Del(v);
}
}

void PKPatternMatchDelCmd::DoBinlog() {
std::string opt = "del";
for(auto& key: remove_keys_) {
argv_.clear();
argv_.emplace_back(opt);
argv_.emplace_back(key);
Cmd::DoBinlog();
}
}

void DummyCmd::DoInitial() {}

void DummyCmd::Do() {}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePadding, std::move(paddingptr)));

std::unique_ptr<Cmd> pkpatternmatchdelptr =
std::make_unique<PKPatternMatchDelCmd>(kCmdNamePKPatternMatchDel, -2, kCmdFlagsWrite | kCmdFlagsAdmin);
std::make_unique<PKPatternMatchDelCmd>(kCmdNamePKPatternMatchDel, -3, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_table->insert(
std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePKPatternMatchDel, std::move(pkpatternmatchdelptr)));
std::unique_ptr<Cmd> dummyptr = std::make_unique<DummyCmd>(kCmdDummy, 0, kCmdFlagsWrite);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ class Storage {

// Traverses the database of the specified type, removing the Key that matches
// the pattern
Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);

// Iterate over a collection of elements
// return next_key that the user need to use as the start_key argument
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class Redis {
virtual Status GetProperty(const std::string& property, uint64_t* out) = 0;
virtual Status ScanKeyNum(KeyInfo* key_info) = 0;
virtual Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) = 0;
virtual Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) = 0;

// Keys Commands
virtual Status Expire(const Slice& key, int32_t ttl) = 0;
Expand All @@ -110,7 +109,7 @@ class Redis {
virtual Status Expireat(const Slice& key, int32_t timestamp) = 0;
virtual Status Persist(const Slice& key) = 0;
virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;
Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);
virtual Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) = 0;
Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Status RedisHashes::ScanKeys(const std::string& pattern, std::vector<std::string
return Status::OK();
}

Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisHashes::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -154,27 +154,28 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret)

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (!parsed_hashes_meta_value.IsStale() && (parsed_hashes_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_hashes_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>( batch.Count());
total_delete += static_cast<int64_t>( batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -186,7 +187,7 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_hashes.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RedisHashes : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Hashes Commands
Status HDel(const Slice& key, const std::vector<std::string>& fields, int32_t* ret);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status RedisLists::ScanKeys(const std::string& pattern, std::vector<std::string>
return Status::OK();
}

Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisLists::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,27 +161,28 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_lists_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -193,7 +194,7 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RedisLists : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Lists commands;
Status LIndex(const Slice& key, int64_t index, std::string* element);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ rocksdb::Status RedisSets::ScanKeys(const std::string& pattern, std::vector<std:
return rocksdb::Status::OK();
}

rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
rocksdb::Status RedisSets::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,27 +161,28 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
rocksdb::Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (!parsed_sets_meta_value.IsStale() && (parsed_sets_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_sets_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -193,7 +194,7 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_sets.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RedisSets : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Setes Commands
Status SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret);
Expand Down
11 changes: 6 additions & 5 deletions src/storage/src/redis_streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ Status RedisStreams::ScanKeys(const std::string& pattern, std::vector<std::strin
return Status::OK();
}

Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisStreams::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -452,15 +452,15 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
StreamMetaValue stream_meta_value;
Expand All @@ -469,11 +469,12 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
stream_meta_value.InitMetaValue();
batch.Put(handles_[0], key, stream_meta_value.value());
remove_keys->push_back(key.data());
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
Expand All @@ -485,7 +486,7 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret)
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class RedisStreams : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* keyinfo) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;
Status PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
std::vector<std::string>* keys, std::string* next_key);
Status PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit,
Expand Down
Loading

0 comments on commit c730b71

Please sign in to comment.