Skip to content

Commit

Permalink
feat:add lastsave cmd (OpenAtomFoundation#2167)
Browse files Browse the repository at this point in the history
* feat:unstable branch

* feat:handle merge

* feat:add lastsave cmd

* feat:update lastsave cmd

* feat:update lastsave cmd

* feat:fixed lastsave cmd

* feat:fixed server_test.go

* feat:fixed server_test.go

* feat:fixed server_test.go

* feat:fixed server_test.go

* feat:update modifies

* feat:delete redundant files

* feat:fixed server_test

* feat:update code

* feat:update server_test

* feat:update ut
  • Loading branch information
hero-heng authored Dec 14, 2023
1 parent 8f2d16d commit acea406
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 23 deletions.
12 changes: 12 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,18 @@ class TimeCmd : public Cmd {
void DoInitial() override;
};

class LastsaveCmd : public Cmd {
public:
LastsaveCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new LastsaveCmd(*this); }

private:
void DoInitial() override;
};

class DelbackupCmd : public Cmd {
public:
DelbackupCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down
1 change: 1 addition & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const std::string kCmdNameCommand = "command";
const std::string kCmdNameDiskRecovery = "diskrecovery";
const std::string kCmdNameClearReplicationID = "clearreplicationid";
const std::string kCmdNameDisableWal = "disablewal";
const std::string kCmdNameLastSave = "lastsave";
const std::string kCmdNameCache = "cache";
const std::string kCmdNameClearCache = "clearcache";

Expand Down
12 changes: 12 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ class PikaServer : public pstd::noncopyable {
void ClearHitRatio(void);
void ProcessCronTask();
double HitRatio();

/*
* lastsave used
*/
int64_t GetLastSave() const {return lastsave_;}
void UpdateLastSave(int64_t lastsave) {lastsave_ = lastsave;}
private:
/*
* TimingTask use
Expand All @@ -577,6 +583,7 @@ class PikaServer : public pstd::noncopyable {
void AutoKeepAliveRSync();
void AutoUpdateNetworkMetric();
void PrintThreadPoolQueueStatus();
int64_t GetLastSaveTime(const std::string& dump_dir);

std::string host_;
int port_ = 0;
Expand Down Expand Up @@ -701,6 +708,11 @@ class PikaServer : public pstd::noncopyable {
*/
std::shared_mutex mu_;
std::shared_mutex cache_info_rwlock_;

/*
* lastsave used
*/
int64_t lastsave_ = 0;
};

#endif
11 changes: 11 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2610,6 +2610,17 @@ void TimeCmd::Do(std::shared_ptr<Slot> slot) {
}
}

void LastsaveCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameLastSave);
return;
}
}

void LastsaveCmd::Do(std::shared_ptr<Slot> slot) {
res_.AppendInteger(g_pika_server->GetLastSave());
}

void DelbackupCmd::DoInitial() {
if (argv_.size() != 1) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDelbackup);
Expand Down
3 changes: 3 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
extern std::unique_ptr<PikaCmdTableManager> g_pika_cmd_table_manager;


void InitCmdTable(CmdTable* cmd_table) {
// Admin
////Slaveof
Expand Down Expand Up @@ -108,6 +109,8 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameCache, std::move(cacheptr)));
std::unique_ptr<Cmd> clearcacheptr = std::make_unique<ClearCacheCmd>(kCmdNameClearCache, 1, kCmdFlagsAdmin | kCmdFlagsWrite);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameClearCache, std::move(clearcacheptr)));
std::unique_ptr<Cmd> lastsaveptr = std::make_unique<LastsaveCmd>(kCmdNameLastSave, 1, kCmdFlagsAdmin | kCmdFlagsRead);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameLastSave, std::move(lastsaveptr)));

#ifdef WITH_COMMAND_DOCS
std::unique_ptr<Cmd> commandptr = std::make_unique<CommandCmd>(kCmdNameCommand, -1, kCmdFlagsRead | kCmdFlagsAdmin);
Expand Down
53 changes: 36 additions & 17 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ PikaServer::PikaServer()
pika_client_processor_ = std::make_unique<PikaClientProcessor>(g_pika_conf->thread_pool_size(), 100000);
instant_ = std::make_unique<Instant>();
exit_mutex_.lock();
int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path());
UpdateLastSave(lastsave);
}

PikaServer::~PikaServer() {
Expand Down Expand Up @@ -548,7 +550,7 @@ void PikaServer::SlotSetSmallCompactionDurationThreshold(uint32_t small_compacti
}

bool PikaServer::GetDBSlotBinlogOffset(const std::string& db_name, uint32_t slot_id,
BinlogOffset* const boffset) {
BinlogOffset* const boffset) {
std::shared_ptr<SyncMasterSlot> slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!slot) {
Expand Down Expand Up @@ -1233,7 +1235,7 @@ void PikaServer::ResetLastSecQuerynum() {
}

void PikaServer::UpdateQueryNumAndExecCountDB(const std::string& db_name, const std::string& command,
bool is_write) {
bool is_write) {
std::string cmd(command);
statistic_.server_stat.qps.querynum++;
statistic_.server_stat.exec_count_db[pstd::StringToUpper(cmd)]++;
Expand Down Expand Up @@ -1537,13 +1539,13 @@ void PikaServer::AutoUpdateNetworkMetric() {
}

void PikaServer::PrintThreadPoolQueueStatus() {
// Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size.
size_t cur_size = ClientProcessorThreadPoolCurQueueSize();
size_t max_size = ClientProcessorThreadPoolMaxQueueSize();
size_t thread_hold = (max_size / 100) * QUEUE_SIZE_THRESHOLD_PERCENTAGE;
if (cur_size > thread_hold) {
LOG(INFO) << "The current queue size of the Pika Server's client thread processor thread pool: " << cur_size;
}
// Print the current queue size if it exceeds QUEUE_SIZE_THRESHOLD_PERCENTAGE/100 of the maximum queue size.
size_t cur_size = ClientProcessorThreadPoolCurQueueSize();
size_t max_size = ClientProcessorThreadPoolMaxQueueSize();
size_t thread_hold = (max_size / 100) * QUEUE_SIZE_THRESHOLD_PERCENTAGE;
if (cur_size > thread_hold) {
LOG(INFO) << "The current queue size of the Pika Server's client thread processor thread pool: " << cur_size;
}
}

void PikaServer::InitStorageOptions() {
Expand Down Expand Up @@ -1600,14 +1602,14 @@ void PikaServer::InitStorageOptions() {
}

storage_options_.options.rate_limiter =
std::shared_ptr<rocksdb::RateLimiter>(
rocksdb::NewGenericRateLimiter(
g_pika_conf->rate_limiter_bandwidth(),
g_pika_conf->rate_limiter_refill_period_us(),
static_cast<int32_t>(g_pika_conf->rate_limiter_fairness()),
rocksdb::RateLimiter::Mode::kWritesOnly,
g_pika_conf->rate_limiter_auto_tuned()
));
std::shared_ptr<rocksdb::RateLimiter>(
rocksdb::NewGenericRateLimiter(
g_pika_conf->rate_limiter_bandwidth(),
g_pika_conf->rate_limiter_refill_period_us(),
static_cast<int32_t>(g_pika_conf->rate_limiter_fairness()),
rocksdb::RateLimiter::Mode::kWritesOnly,
g_pika_conf->rate_limiter_auto_tuned()
));

// For Storage small compaction
storage_options_.statistics_max_size = g_pika_conf->max_cache_statistic_keys();
Expand Down Expand Up @@ -1778,6 +1780,23 @@ void PikaServer::Bgslotscleanup(std::vector<int> cleanupSlots, const std::shared
bgslots_cleanup_thread_.StartThread();
bgslots_cleanup_thread_.Schedule(&DoBgslotscleanup, static_cast<void*>(this));
}
int64_t PikaServer::GetLastSaveTime(const std::string& dir_path) {
std::vector<std::string> dump_dir;
// Dump file is not exist
if (!pstd::FileExists(dir_path)) {
LOG(INFO) << "Dump file is not exist,path: " << dir_path;
return 0;
}
if (pstd::GetChildren(dir_path, dump_dir) != 0) {
return 0;
}
std::string dump_file = dir_path + dump_dir[0];
struct stat fileStat;
if (stat(dump_file.c_str(), &fileStat) == 0) {
return static_cast<int64_t>(fileStat.st_mtime);
}
return 0;
}

void DoBgslotscleanup(void* arg) {
auto p = static_cast<PikaServer*>(arg);
Expand Down
3 changes: 2 additions & 1 deletion src/pika_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "include/pika_slot.h"
#include "include/pika_command.h"

#include "pstd/include/mutex_impl.h"
#include "pstd/include/pstd_hash.h"
Expand Down Expand Up @@ -389,7 +390,6 @@ bool Slot::RunBgsaveEngine() {
return false;
}
LOG(INFO) << slot_name_ << " create new backup finished.";

return true;
}

Expand Down Expand Up @@ -458,6 +458,7 @@ void Slot::ClearBgsave() {
void Slot::FinishBgsave() {
std::lock_guard l(bgsave_protector_);
bgsave_info_.bgsaving = false;
g_pika_server->UpdateLastSave(time(nullptr));
}

bool Slot::FlushDB() {
Expand Down
23 changes: 18 additions & 5 deletions tests/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,24 @@ var _ = Describe("Server", func() {
// Expect(info.Val()).To(ContainSubstring(`memory`))
//})
//
//It("should LastSave", func() {
// lastSave := client.LastSave(ctx)
// Expect(lastSave.Err()).NotTo(HaveOccurred())
// Expect(lastSave.Val()).NotTo(Equal(0))
//})
It("should LastSave", func() {
lastSave := client.LastSave(ctx)
Expect(lastSave.Err()).NotTo(HaveOccurred())
// Expect(lastSave.Val()).To(Equal(int64(0)))

bgSaveTime1 := time.Now().Unix()
bgSave, err := client.BgSave(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(bgSave).To(ContainSubstring("Background saving started"))
time.Sleep(1 * time.Second)
bgSaveTime2 := time.Now().Unix()

lastSave = client.LastSave(ctx)
Expect(lastSave.Err()).NotTo(HaveOccurred())
Expect(lastSave.Val()).To(BeNumerically(">=", bgSaveTime1))
Expect(lastSave.Val()).To(BeNumerically("<=", bgSaveTime2))

})

//It("should Save", func() {
//
Expand Down
Empty file modified tests/integration/start_master_and_slave.sh
100644 → 100755
Empty file.

0 comments on commit acea406

Please sign in to comment.