Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:thread purge #2697

Merged
merged 8 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ thread-num : 1
# are dedicated to handling user requests.
thread-pool-size : 12

# This parameter is used to control whether to separate fast and slow commands.
# When slow-cmd-pool is set to yes, fast and slow commands are separated.
# When set to no, they are not separated.
slow-cmd-pool : no

# Size of the low level thread pool, The threads within this pool
# are dedicated to handling slow user requests.
slow-cmd-thread-pool-size : 1
Expand Down
2 changes: 0 additions & 2 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ class PikaClientProcessor {
int Start();
void Stop();
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();
size_t ThreadPoolMaxQueueSize();

private:
std::unique_ptr<net::ThreadPool> pool_;
std::vector<std::unique_ptr<net::BGThread>> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
10 changes: 10 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return slotmigrate_;
}
bool slow_cmd_pool() {
std::shared_lock l(rwlock_);
return slow_cmd_pool_;
}
std::string server_id() {
std::shared_lock l(rwlock_);
return server_id_;
Expand Down Expand Up @@ -574,6 +578,11 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlowCmdPool(const bool value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slow-cmd-pool", value ? "yes" : "no");
slow_cmd_pool_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
Expand Down Expand Up @@ -844,6 +853,7 @@ class PikaConf : public pstd::BaseConf {
std::string bgsave_path_;
std::string bgsave_prefix_;
std::string pidfile_;
std::atomic<bool> slow_cmd_pool_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原子变量不用加锁保护,不要使用原子变量

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原子变量不用加锁保护,不要使用原子变量

done


std::string compression_;
std::string compression_per_level_;
Expand Down
3 changes: 3 additions & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class PikaReplBgWorker {
void QueueClear();
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
Expand Down
9 changes: 7 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class PikaServer : public pstd::noncopyable {
bool force_full_sync();
void SetForceFullSync(bool v);
void SetDispatchQueueLimit(int queue_limit);
void SetSlowCmdThreadPoolFlag(bool flag);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Ensure thread safety when toggling slow_cmd_thread_pool_flag_. The current implementation lacks mutex locks or atomic operations, which could lead to race conditions.

  • src/pika_server.cc: PikaServer::SetSlowCmdThreadPoolFlag
Analysis chain

Ensure thread safety when toggling slow_cmd_thread_pool_flag_.


Ensure thread safety around slow_cmd_thread_pool_flag_ usage by checking for mutex locks or atomic operations.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for potential race conditions related to `slow_cmd_thread_pool_flag_`.

# Test: Search for occurrences of `slow_cmd_thread_pool_flag_` being accessed.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/

Length of output: 1088



Script:

#!/bin/bash
# Description: Check for thread safety mechanisms around `slow_cmd_thread_pool_flag_` usage.

# Test: Search for mutex locks or atomic operations related to `slow_cmd_thread_pool_flag_`.
rg --type cpp --context 5 'slow_cmd_thread_pool_flag_' src/ | grep -E 'std::mutex|std::lock_guard|std::atomic'

Length of output: 110

storage::StorageOptions storage_options();
std::unique_ptr<PikaDispatchThread>& pika_dispatch_thread() {
return pika_dispatch_thread_;
Expand Down Expand Up @@ -168,7 +169,6 @@ class PikaServer : public pstd::noncopyable {
void FinishMetaSync();
bool MetaSyncDone();
void ResetMetaSyncStatus();
void SetLoopDBStateMachine(bool need_loop);
int GetMetaSyncTimestamp();
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
Expand All @@ -179,7 +179,7 @@ class PikaServer : public pstd::noncopyable {
* PikaClientProcessor Process Task
*/
void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd);
void ScheduleClientBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str);

// for info debug
size_t ClientProcessorThreadPoolCurQueueSize();
size_t ClientProcessorThreadPoolMaxQueueSize();
Expand Down Expand Up @@ -642,6 +642,11 @@ class PikaServer : public pstd::noncopyable {
* acl
*/
std::unique_ptr<::Acl> acl_ = nullptr;

/*
* fast and slow thread pools
*/
bool slow_cmd_thread_pool_flag_;
};

#endif
1 change: 1 addition & 0 deletions src/net/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class BackendThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Properly overriding the set_thread_name method in BackendThread allows for dynamic and specific thread naming, which is crucial for debugging. Consider encapsulating thread naming logic within the Thread class to avoid redundancy across derived classes.

- void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy.

Committable suggestion was skipped due to low confidence.

pstd::Status Write(int fd, const std::string& msg);
pstd::Status Close(int fd);
// Try to connect fd noblock, if return EINPROGRESS or EAGAIN or EWOULDBLOCK
Expand Down
1 change: 1 addition & 0 deletions src/net/include/client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class ClientThread : public Thread {
*/
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensuring that ClientThread can have its thread name set dynamically aids in monitoring and debugging. As with BackendThread, consider centralizing thread naming logic to reduce redundancy.

- void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
+ // This method is now handled in the base class to avoid redundancy.
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
// This method is now handled in the base class to avoid redundancy.

pstd::Status Write(const std::string& ip, int port, const std::string& msg);
pstd::Status Close(const std::string& ip, int port);

Expand Down
2 changes: 1 addition & 1 deletion src/net/include/net_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Thread : public pstd::noncopyable {

std::string thread_name() const { return thread_name_; }

void set_thread_name(const std::string& name) { thread_name_ = name; }
virtual void set_thread_name(const std::string& name) { thread_name_ = name; }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding synchronization to set_thread_name.

To ensure thread safety when multiple threads might be setting the thread name simultaneously, consider using a mutex:

+ std::lock_guard<std::mutex> lock(name_mutex_);
  thread_name_ = name;

Add a private member std::mutex name_mutex_; to the Thread class to support this.

Committable suggestion was skipped due to low confidence.


protected:
std::atomic_bool should_stop_;
Expand Down
2 changes: 2 additions & 0 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ class ServerThread : public Thread {
// Move into server thread
virtual void MoveConnIn(std::shared_ptr<NetConn> conn, const NotifyType& type) = 0;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

virtual void KillAllConns() = 0;
virtual bool KillConn(const std::string& ip_port) = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ int BackendThread::StartThread() {
if (res) {
return res;
}
set_thread_name("BackendThread");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the server thread, setting the thread name to "BackendThread" is hardcoded. Consider using a dynamic naming strategy here as well to improve monitoring and debugging capabilities.

- set_thread_name("BackendThread");
+ set_thread_name("BackendThread-" + std::to_string(port_));
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
set_thread_name("BackendThread");
set_thread_name("BackendThread-" + std::to_string(port_));


return Thread::StartThread();
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/client_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ int ClientThread::StartThread() {
if (res) {
return res;
}
set_thread_name("ClientThread");

return Thread::StartThread();
}

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timerTaskThread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });

timerTaskThread_.set_thread_name("TimerTaskThread");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the thread name before starting the thread is a good practice.

Consider allowing dynamic thread naming for greater flexibility, especially useful in environments with multiple timer tasks.

timerTaskThread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
1 change: 1 addition & 0 deletions src/net/src/holy_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ int HolyThread::StartThread() {
if (ret) {
return ret;
}
set_thread_name("HolyThread");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HolyThread是其他类的父类,你在这里设置。会不会其他类设置的名字被这个替代了。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HolyThread是其他类的父类,你在这里设置。会不会其他类设置的名字被这个替代了。

已修复

return ServerThread::StartThread();
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class HolyThread : public ServerThread {

int StopThread() override;

void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

void set_keepalive_timeout(int timeout) override { keepalive_timeout_ = timeout; }

int conn_num() const override;
Expand Down
2 changes: 1 addition & 1 deletion src/net/src/net_thread_name.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ inline bool SetThreadName(pthread_t id, const std::string& name) {
#else
inline bool SetThreadName(pthread_t id, const std::string& name) {
// printf ("no pthread_setname\n");
return false;
return pthread_setname_np(name.c_str()) == 0;
}
#endif
} // namespace net
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ int TimerTaskThread::StartThread() {
// if there is no timer task registered, no need of start the thread
return -1;
}
set_thread_name("TimerTask");
LOG(INFO) << "TimerTaskThread Starting...";
return Thread::StartThread();
}
Expand Down
1 change: 1 addition & 0 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class TimerTaskThread : public Thread {
~TimerTaskThread() override;
int StartThread() override;
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
Expand Down
4 changes: 3 additions & 1 deletion src/net/src/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <sys/time.h>

#include <string>
#include <utility>

namespace net {
Expand All @@ -24,7 +25,8 @@ int ThreadPool::Worker::start() {
return -1;
} else {
start_.store(true);
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "Worker");
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);
Comment on lines +28 to +29
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider improving portability of thread ID conversion.

The current method of casting pthread_t to unsigned long may not be portable across different platforms where pthread_t is not an integer type. Consider using a portable method to obtain a string representation of pthread_t:

- std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
+ std::string thread_id_str = std::to_string(std::hash<std::thread::id>()(std::thread::id(thread_id_)));

This uses the standard library's hash function to convert the thread ID to a string in a more portable manner.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::string thread_id_str = std::to_string(reinterpret_cast<unsigned long>(thread_id_));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);
std::string thread_id_str = std::to_string(std::hash<std::thread::id>()(std::thread::id(thread_id_)));
SetThreadName(thread_id_, thread_pool_->thread_pool_name() + "_Worker_" + thread_id_str);

}
}
return 0;
Expand Down
20 changes: 20 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slow-cmd-pool", 1)) {
elements += 2;
EncodeString(&config_body, "slow-cmd-pool");
EncodeString(&config_body, g_pika_conf->slow_cmd_pool() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
Expand Down Expand Up @@ -2129,6 +2135,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"requirepass",
"masterauth",
"slotmigrate",
"slow-cmd-pool",
"slotmigrate-thread-num",
"thread-migrate-keys-num",
"userpass",
Expand Down Expand Up @@ -2288,6 +2295,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slow_cmd_pool") {
bool SlowCmdPool;
if (value == "yes") {
SlowCmdPool = true;
g_pika_server->SetSlowCmdThreadPoolFlag(SlowCmdPool);
} else if (value == "no") {
SlowCmdPool = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slow-cmd-pool'\r\n");
return;
}
g_pika_conf->SetSlowCmdPool(SlowCmdPool);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你schedule判断是否把请求塞给慢队列的时候,会读取g_pika_conf->slow_cmd_pool,setslowcmdthreadpoolflag中会等待慢队列为空然后才会stop线程池,那么这里就应该先改pika conf,保证不再有新的请求塞到队列里,然后你在等待队列都消费完之后返回。

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setslowcmdthreadpoolflag

done

res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
18 changes: 0 additions & 18 deletions src/pika_client_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@

PikaClientProcessor::PikaClientProcessor(size_t worker_num, size_t max_queue_size, const std::string& name_prefix) {
pool_ = std::make_unique<net::ThreadPool>(worker_num, max_queue_size, name_prefix + "Pool");
for (size_t i = 0; i < worker_num; ++i) {
bg_threads_.push_back(std::make_unique<net::BGThread>(max_queue_size));
bg_threads_.back()->set_thread_name(name_prefix + "BgThread");
}
}

PikaClientProcessor::~PikaClientProcessor() {
Expand All @@ -24,29 +20,15 @@ int PikaClientProcessor::Start() {
if (res != net::kSuccess) {
return res;
}
for (auto& bg_thread : bg_threads_) {
res = bg_thread->StartThread();
if (res != net::kSuccess) {
return res;
}
}
return res;
}

void PikaClientProcessor::Stop() {
pool_->stop_thread_pool();
for (auto & bg_thread : bg_threads_) {
bg_thread->StopThread();
}
}

void PikaClientProcessor::SchedulePool(net::TaskFunc func, void* arg) { pool_->Schedule(func, arg); }

void PikaClientProcessor::ScheduleBgThreads(net::TaskFunc func, void* arg, const std::string& hash_str) {
std::size_t index = std::hash<std::string>{}(hash_str) % bg_threads_.size();
bg_threads_[index]->Schedule(func, arg);
}

size_t PikaClientProcessor::ThreadPoolCurQueueSize() {
size_t cur_size = 0;
if (pool_) {
Expand Down
13 changes: 9 additions & 4 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ int PikaConf::Load() {
GetConfStr("slotmigrate", &smgrt);
slotmigrate_.store(smgrt == "yes" ? true : false);

// slow cmd thread pool
std::string slowcmdpool;
GetConfStr("slow-cmd-pool", &slowcmdpool);
slow_cmd_pool_.store(slowcmdpool == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
if (binlog_writer_num <= 0 || binlog_writer_num > 24) {
Expand Down Expand Up @@ -154,11 +159,11 @@ int PikaConf::Load() {
}

GetConfInt("slow-cmd-thread-pool-size", &slow_cmd_thread_pool_size_);
if (slow_cmd_thread_pool_size_ <= 0) {
slow_cmd_thread_pool_size_ = 12;
if (slow_cmd_thread_pool_size_ < 0) {
slow_cmd_thread_pool_size_ = 8;
}
if (slow_cmd_thread_pool_size_ > 100) {
slow_cmd_thread_pool_size_ = 100;
if (slow_cmd_thread_pool_size_ > 50) {
slow_cmd_thread_pool_size_ = 50;
}

std::string slow_cmd_list;
Expand Down
10 changes: 8 additions & 2 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
write_binlog_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_binlog_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string binlog_worker_name = "ReplBinlogWorker" + std::to_string(i);
new_binlog_worker->SetThreadName(binlog_worker_name);
write_binlog_workers_.emplace_back(std::move(new_binlog_worker));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
write_db_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
auto new_db_worker = std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE);
std::string db_worker_name = "ReplWriteDBWorker" + std::to_string(i);
new_db_worker->SetThreadName(db_worker_name);
write_db_workers_.emplace_back(std::move(new_db_worker));
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplServer::PikaReplServer(const std::set<std::string>& ips, int port, int cron_interval) {
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000);
server_tp_ = std::make_unique<net::ThreadPool>(PIKA_REPL_SERVER_TP_SIZE, 100000, "PikaReplServer");
pika_repl_server_thread_ = std::make_unique<PikaReplServerThread>(ips, port, cron_interval);
pika_repl_server_thread_->set_thread_name("PikaReplServer");
}
Expand All @@ -27,6 +27,7 @@ PikaReplServer::~PikaReplServer() {
}

int PikaReplServer::Start() {
pika_repl_server_thread_->set_thread_name("PikaReplServer");
int res = pika_repl_server_thread_->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res
Expand Down
Loading
Loading