Skip to content

Commit

Permalink
feat:thread purge
Browse files Browse the repository at this point in the history
  • Loading branch information
brother-jin committed Jun 5, 2024
1 parent 1705916 commit a6ae2be
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 37 deletions.
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# This parameter is used to control whether to separate fast and slow commands.
# When slow-cmd-pool is set to yes, fast and slow commands are separated.
# When set to no, they are not separated.
slow-cmd-pool : no

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
2 changes: 0 additions & 2 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ class PikaClientProcessor {
int Start();
void Stop();
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
std::vector<std::unique_ptr<net::BGThread>> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slotmigrate_;
}
bool slow_cmd_pool() {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -574,6 +578,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlowCmdPool(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no");
slow_cmd_pool_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
Expand Down Expand Up @@ -844,6 +853,7 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;

std::string compression_;
std::string compression_per_level_;
Expand Down
9 changes: 7 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class PikaServer : public pstd::noncopyable {
bool force_full_sync();
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
Expand Down Expand Up @@ -168,7 +169,6 @@ class PikaServer : public pstd::noncopyable {
void FinishMetaSync();
bool MetaSyncDone();
void ResetMetaSyncStatus();
void SetLoopDBStateMachine(bool need_loop);
int GetMetaSyncTimestamp();
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
Expand All @@ -179,7 +179,7 @@ class PikaServer : public pstd::noncopyable {
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
Expand Down Expand Up @@ -642,6 +642,11 @@ class PikaServer : public pstd::noncopyable {
* acl
*/
std::unique_ptr<::Acl> acl_ = nullptr;

/*
* fast and slow thread pools
*/
bool slow_cmd_thread_pool_flag_;
};

#endif
2 changes: 1 addition & 1 deletion src/net/src/net_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Thread::~Thread() = default;
void* Thread::RunThread(void* arg) {
auto thread = reinterpret_cast<Thread*>(arg);
if (!(thread->thread_name().empty())) {
SetThreadName(pthread_self(), thread->thread_name());
SetThreadName(pthread_self(), "pika");
}
thread->ThreadMain();
return nullptr;
Expand Down
20 changes: 20 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) {
elements += 2;
EncodeString(&config_body, "slow-cmd-pool");
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
Expand Down Expand Up @@ -2129,6 +2135,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"requirepass",
"masterauth",
"slotmigrate",
"slow-cmd-pool",
"slotmigrate-thread-num",
"thread-migrate-keys-num",
"userpass",
Expand Down Expand Up @@ -2288,6 +2295,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slow_cmd_pool") {
bool SlowCmdPool;
if (value == "yes") {
SlowCmdPool = true;
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool);
} else if (value == "no") {
SlowCmdPool = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n");
return;
}
g_pika_conf->SetSlowCmdPool(SlowCmdPool);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
18 changes: 0 additions & 18 deletions src/pika_client_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) {
pool_ = std::make_unique<net::ThreadPool>(worker_num, max_queue_size, name_prefix + "Pool");
for (size_t i = 0; i < worker_num; ++i) {
bg_threads_.push_back(std::make_unique<net::BGThread>(max_queue_size));
bg_threads_.back()->set_thread_name(name_prefix + "BgThread");
}
}

PikaClientProcessor::~PikaClientProcessor() {
Expand All @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() {
if (res != net::kSuccess) {
return res;
}
for (auto& bg_thread : bg_threads_) {
res = bg_thread->StartThread();
if (res != net::kSuccess) {
return res;
}
}
return res;
}

void PikaClientProcessor::Stop() {
pool_->stop_thread_pool();
for (auto & bg_thread : bg_threads_) {
bg_thread->StopThread();
}
}

void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); }

void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
std::size_t index = std::hash<std::string>{}(hash_str) % bg_threads_.size();
bg_threads_[index]->Schedule(func, arg);
}

size_t PikaClientProcessor::ThreadPoolCurQueueSize() {
size_t cur_size = 0;
if (pool_) {
Expand Down
13 changes: 9 additions & 4 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ int PikaConf::Load() {
GetConfStr("slotmigrate", &smgrt);
slotmigrate_.store(smgrt == "yes" ? true : false);

// slow cmd thread pool
std::string slowcmdpool;
GetConfStr("slow-cmd-pool", &slowcmdpool);
slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
if (binlog_writer_num <= 0 || binlog_writer_num > 24) {
Expand Down Expand Up @@ -154,11 +159,11 @@ int PikaConf::Load() {
}

GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_);
if (slow_cmd_thread_pool_size_ <= 0) {
slow_cmd_thread_pool_size_ = 12;
if (slow_cmd_thread_pool_size_ < 0) {
slow_cmd_thread_pool_size_ = 8;
}
if (slow_cmd_thread_pool_size_ > 100) {
slow_cmd_thread_pool_size_ = 100;
if (slow_cmd_thread_pool_size_ > 50) {
slow_cmd_thread_pool_size_ = 50;
}

std::string slow_cmd_list;
Expand Down
27 changes: 17 additions & 10 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void DoPurgeDir(void* arg) {

PikaServer::PikaServer()
: exit_(false),
slow_cmd_thread_pool_flag_(false),
last_check_compact_time_({0, 0}),
last_check_resume_time_({0, 0}),
repl_state_(PIKA_REPL_NO_CONNECT),
Expand Down Expand Up @@ -100,6 +101,7 @@ PikaServer::PikaServer()
}

acl_ = std::make_unique<::Acl>();
SetSlowCmdThreadPoolFlag(g_pika_conf->slow_cmd_pool());
}

PikaServer::~PikaServer() {
Expand Down Expand Up @@ -166,12 +168,6 @@ void PikaServer::Start() {
LOG(FATAL) << "Start PikaClientProcessor Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_slow_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
ret = pika_dispatch_thread_->StartThread();
if (ret != net::kSuccess) {
dbs_.clear();
Expand Down Expand Up @@ -205,6 +201,21 @@ void PikaServer::Start() {
LOG(INFO) << "Goodbye...";
}

void PikaServer::SetSlowCmdThreadPoolFlag(bool flag) {
slow_cmd_thread_pool_flag_ = flag;
int ret = 0;
if (flag) {
ret = pika_slow_cmd_thread_pool_->start_thread_pool();
if (ret != net::kSuccess) {
dbs_.clear();
LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret
<< (ret == net::kCreateThreadError ? ": create thread error " : ": other error");
}
} else {
pika_slow_cmd_thread_pool_->stop_thread_pool();
}
}

void PikaServer::Exit() {
g_pika_server->DisableCompact();
exit_mutex_.unlock();
Expand Down Expand Up @@ -714,10 +725,6 @@ void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_
pika_client_processor_->SchedulePool(func, arg);
}

void PikaServer::ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
pika_client_processor_->ScheduleBgThreads(func, arg, hash_str);
}

size_t PikaServer::ClientProcessorThreadPoolCurQueueSize() {
if (!pika_client_processor_) {
return 0;
Expand Down

0 comments on commit a6ae2be

Please sign in to comment.