-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
reimplement the ‘monitor' command by delete its thread #1551
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
# include <sys/statfs.h> | ||
#endif | ||
#include <memory> | ||
#include <set> | ||
|
||
#include "net/include/bg_thread.h" | ||
#include "net/include/net_pubsub.h" | ||
|
@@ -30,7 +31,6 @@ | |
#include "include/pika_conf.h" | ||
#include "include/pika_define.h" | ||
#include "include/pika_dispatch_thread.h" | ||
#include "include/pika_monitor_thread.h" | ||
#include "include/pika_repl_client.h" | ||
#include "include/pika_repl_server.h" | ||
#include "include/pika_rsync_service.h" | ||
|
@@ -267,7 +267,7 @@ class PikaServer { | |
/* | ||
* Monitor used | ||
*/ | ||
bool HasMonitorClients(); | ||
bool HasMonitorClients() const; | ||
void AddMonitorMessage(const std::string& monitor_message); | ||
void AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr); | ||
|
||
|
@@ -411,7 +411,8 @@ class PikaServer { | |
/* | ||
* Monitor used | ||
*/ | ||
std::unique_ptr<PikaMonitorThread> pika_monitor_thread_; | ||
mutable pstd::Mutex monitor_mutex_protector_; | ||
std::set<std::weak_ptr<PikaClientConn>, std::owner_less<std::weak_ptr<PikaClientConn>>> pika_monitor_clients_; | ||
|
||
/* | ||
* Rsync used | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch removes the It's difficult to provide more detailed feedback without seeing the entirety of the codebase and how this change fits into it. However, some suggestions for further improvement could include verifying that the changes have been thoroughly tested, and possibly implementing move semantics for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch adds a Overall, the changes seem reasonable and should not introduce any bugs. However, it would be useful to see the A minor improvement suggestion would be to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch includes adding a set of monitor clients to PikaServer class, replacing the old implementation using PikaMonitorThread. The new implementation uses a mutex to protect the access to the set, and the set contains weak pointers to PikaClientConn objects instead of shared pointers. No bug risk is apparent from this patch. A suggestion for improvement is to use a shared_mutex instead of a Mutex for monitor_mutex_protector_, as multiple read accesses can be made safely and concurrently. |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1988,12 +1988,9 @@ void MonitorCmd::Do(std::shared_ptr<Partition> partition) { | |
LOG(WARNING) << name_ << " weak ptr is empty"; | ||
return; | ||
} | ||
std::shared_ptr<net::NetConn> conn = | ||
std::dynamic_pointer_cast<PikaClientConn>(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd()); | ||
assert(conn.get() == conn_repl.get()); | ||
g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn)); | ||
g_pika_server->AddMonitorMessage("OK"); | ||
return; // Monitor thread will return "OK" | ||
|
||
g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn_repl)); | ||
res_.SetRes(CmdRes::kOk); | ||
} | ||
|
||
void DbsizeCmd::DoInitial() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that a block of code has been removed in the Also, the current implementation assumes that the cast Regarding the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that the patch removed some code related to moving a network connection out of a server thread, replacing it with directly adding a client to the monitor and setting a response to "OK". As long as it is intended to remove that specific functionality, there shouldn't be any bug risks. However, without knowing the context of the application, it's hard to suggest any improvements. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch you provided removes some code that moves a connection object from one thread to another and replaces it with a simpler approach of adding the connection directly to the monitor client list. This seems like a reasonable simplification if it meets the requirements of the application. One potential improvement could be to add error handling for the case where For the |
||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,7 +79,6 @@ PikaServer::PikaServer() | |
LOG(INFO) << "Worker queue limit is " << worker_queue_limit; | ||
pika_dispatch_thread_ = | ||
std::make_unique<PikaDispatchThread>(ips, port_, worker_num_, 3000, worker_queue_limit, g_pika_conf->max_conn_rbuf_size()); | ||
pika_monitor_thread_ = std::make_unique<PikaMonitorThread>(); | ||
pika_rsync_service_ = std::make_unique<PikaRsyncService>(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync); | ||
pika_pubsub_thread_ = std::make_unique<net::PubSubThread>(); | ||
pika_auxiliary_thread_ = std::make_unique<PikaAuxiliaryThread>(); | ||
|
@@ -1057,13 +1056,10 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) { | |
key_scan_thread_.Schedule(func, arg); | ||
} | ||
|
||
void PikaServer::ClientKillAll() { | ||
pika_dispatch_thread_->ClientKillAll(); | ||
pika_monitor_thread_->ThreadClientKill(); | ||
} | ||
void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); } | ||
|
||
int PikaServer::ClientKill(const std::string& ip_port) { | ||
if (pika_dispatch_thread_->ClientKill(ip_port) || pika_monitor_thread_->ThreadClientKill(ip_port)) { | ||
if (pika_dispatch_thread_->ClientKill(ip_port)) { | ||
return 1; | ||
} | ||
return 0; | ||
|
@@ -1072,18 +1068,44 @@ int PikaServer::ClientKill(const std::string& ip_port) { | |
int64_t PikaServer::ClientList(std::vector<ClientInfo>* clients) { | ||
int64_t clients_num = 0; | ||
clients_num += pika_dispatch_thread_->ThreadClientList(clients); | ||
clients_num += pika_monitor_thread_->ThreadClientList(clients); | ||
return clients_num; | ||
} | ||
|
||
bool PikaServer::HasMonitorClients() { return pika_monitor_thread_->HasMonitorClients(); } | ||
bool PikaServer::HasMonitorClients() const { | ||
std::unique_lock lock(monitor_mutex_protector_); | ||
return !pika_monitor_clients_.empty(); | ||
} | ||
|
||
void PikaServer::AddMonitorMessage(const std::string& monitor_message) { | ||
pika_monitor_thread_->AddMonitorMessage(monitor_message); | ||
const std::string msg = "+" + monitor_message + "\r\n"; | ||
|
||
std::vector<std::shared_ptr<PikaClientConn>> clients; | ||
|
||
std::unique_lock lock(monitor_mutex_protector_); | ||
clients.reserve(pika_monitor_clients_.size()); | ||
for (auto it = pika_monitor_clients_.begin(); it != pika_monitor_clients_.end();) { | ||
auto cli = (*it).lock(); | ||
if (cli) { | ||
clients.push_back(std::move(cli)); | ||
++it; | ||
} else { | ||
it = pika_monitor_clients_.erase(it); | ||
} | ||
} | ||
|
||
lock.unlock(); // SendReply without lock | ||
|
||
for (const auto& cli : clients) { | ||
cli->WriteResp(msg); | ||
cli->SendReply(); | ||
} | ||
} | ||
|
||
void PikaServer::AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr) { | ||
pika_monitor_thread_->AddMonitorClient(client_ptr); | ||
if (client_ptr) { | ||
std::unique_lock lock(monitor_mutex_protector_); | ||
pika_monitor_clients_.insert(client_ptr); | ||
} | ||
} | ||
|
||
void PikaServer::SlowlogTrim() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch removes the Improvement suggestions:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code patch removes the This change could potentially reduce complexity and improve thread safety. However, it is impossible to assess bug risk without knowledge of the broader system context and testing. One improvement suggestion would be to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code review suggests the following:
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is better to use RWLock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make no difference.