diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 7943bc6319..d0460bdcf6 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -3098,15 +3098,33 @@ void PKPatternMatchDelCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel); return; } + max_count_ = storage::BATCH_DELETE_LIMIT; + if (argv_.size() > 2) { + if (pstd::string2int(argv_[2].data(), argv_[2].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) { + res_.SetRes(CmdRes::kInvalidInt); + return; + } + } } void PKPatternMatchDelCmd::Do() { - int ret = 0; - rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret); - if (s.ok()) { - res_.AppendInteger(ret); + int64_t count = 0; + rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_, max_count_); + + if(s.ok()) { + res_.AppendInteger(count); + s_ = rocksdb::Status::OK(); + for (const auto& key : remove_keys_) { + RemSlotKey(key, db_); + } } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); + if (count >= 0) { + s_ = rocksdb::Status::OK(); + for (const auto& key : remove_keys_) { + RemSlotKey(key, db_); + } + } } } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 84b2132255..540bac5880 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -988,7 +988,7 @@ class Storage { // Traverses the database of the specified type, removing the Key that matches // the pattern - Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret); + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count); // Iterate over a collection of elements // return next_key that the user need to use as the start_key argument diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 9f29a6c3d3..a048c97987 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -110,7 +110,7 @@ class Redis { virtual Status Expireat(const Slice& key, int32_t timestamp) = 0; virtual Status Persist(const Slice& key) = 0; virtual Status TTL(const Slice& key, int64_t* timestamp) = 0; - Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count); + virtual Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) = 0; Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold); diff --git a/src/storage/src/redis_strings.h b/src/storage/src/redis_strings.h index 2cb0bdb13f..51535d102e 100644 --- a/src/storage/src/redis_strings.h +++ b/src/storage/src/redis_strings.h @@ -9,7 +9,7 @@ #include #include #include - +#include "" #include "src/redis.h" namespace storage { @@ -27,7 +27,7 @@ class RedisStrings : public Redis { Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; - + Status PKPatternMatchDelWithRemoveKeys(DataType::kStrings, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; // Strings Commands Status Append(const Slice& key, const Slice& value, int32_t* ret); Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index f348dc64be..9840c77129 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1137,26 +1137,28 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start, return s; } -Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) { +Status Storage::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, + std::vector* remove_keys, const int64_t& max_count) { Status s; + int64_t tmp_ret = 0; switch (data_type) { case DataType::kStrings: - s = strings_db_->PKPatternMatchDel(pattern, ret); + s = strings_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; case DataType::kHashes: - s = hashes_db_->PKPatternMatchDel(pattern, ret); + s = hashes_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; case DataType::kLists: - s = lists_db_->PKPatternMatchDel(pattern, ret); + s = lists_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; case DataType::kZSets: - s = zsets_db_->PKPatternMatchDel(pattern, ret); + s = zsets_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; case DataType::kSets: - s = sets_db_->PKPatternMatchDel(pattern, ret); + s = sets_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; case DataType::kStreams: - s = streams_db_->PKPatternMatchDel(pattern, ret); + s = streams_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, &tmp_ret, remove_keys, max_count - *ret); break; default: s = Status::Corruption("Unsupported data type");