Skip to content

Commit

Permalink
refactor: implement monitor without thread (#1551)
Browse files Browse the repository at this point in the history
  • Loading branch information
loveyacper authored May 26, 2023
1 parent 27ae606 commit cf0300d
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 256 deletions.
48 changes: 0 additions & 48 deletions include/pika_monitor_thread.h

This file was deleted.

7 changes: 4 additions & 3 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# include <sys/statfs.h>
#endif
#include <memory>
#include <set>

#include "net/include/bg_thread.h"
#include "net/include/net_pubsub.h"
Expand All @@ -30,7 +31,6 @@
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_dispatch_thread.h"
#include "include/pika_monitor_thread.h"
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
Expand Down Expand Up @@ -267,7 +267,7 @@ class PikaServer {
/*
* Monitor used
*/
bool HasMonitorClients();
bool HasMonitorClients() const;
void AddMonitorMessage(const std::string& monitor_message);
void AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr);

Expand Down Expand Up @@ -411,7 +411,8 @@ class PikaServer {
/*
* Monitor used
*/
std::unique_ptr<PikaMonitorThread> pika_monitor_thread_;
mutable pstd::Mutex monitor_mutex_protector_;
std::set<std::weak_ptr<PikaClientConn>, std::owner_less<std::weak_ptr<PikaClientConn>>> pika_monitor_clients_;

/*
* Rsync used
Expand Down
9 changes: 3 additions & 6 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1988,12 +1988,9 @@ void MonitorCmd::Do(std::shared_ptr<Partition> partition) {
LOG(WARNING) << name_ << " weak ptr is empty";
return;
}
std::shared_ptr<net::NetConn> conn =
std::dynamic_pointer_cast<PikaClientConn>(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd());
assert(conn.get() == conn_repl.get());
g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn));
g_pika_server->AddMonitorMessage("OK");
return; // Monitor thread will return "OK"

g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn_repl));
res_.SetRes(CmdRes::kOk);
}

void DbsizeCmd::DoInitial() {
Expand Down
189 changes: 0 additions & 189 deletions src/pika_monitor_thread.cc

This file was deleted.

42 changes: 32 additions & 10 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ PikaServer::PikaServer()
LOG(INFO) << "Worker queue limit is " << worker_queue_limit;
pika_dispatch_thread_ =
std::make_unique<PikaDispatchThread>(ips, port_, worker_num_, 3000, worker_queue_limit, g_pika_conf->max_conn_rbuf_size());
pika_monitor_thread_ = std::make_unique<PikaMonitorThread>();
pika_rsync_service_ = std::make_unique<PikaRsyncService>(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync);
pika_pubsub_thread_ = std::make_unique<net::PubSubThread>();
pika_auxiliary_thread_ = std::make_unique<PikaAuxiliaryThread>();
Expand Down Expand Up @@ -1057,13 +1056,10 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) {
key_scan_thread_.Schedule(func, arg);
}

void PikaServer::ClientKillAll() {
pika_dispatch_thread_->ClientKillAll();
pika_monitor_thread_->ThreadClientKill();
}
void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); }

int PikaServer::ClientKill(const std::string& ip_port) {
if (pika_dispatch_thread_->ClientKill(ip_port) || pika_monitor_thread_->ThreadClientKill(ip_port)) {
if (pika_dispatch_thread_->ClientKill(ip_port)) {
return 1;
}
return 0;
Expand All @@ -1072,18 +1068,44 @@ int PikaServer::ClientKill(const std::string& ip_port) {
int64_t PikaServer::ClientList(std::vector<ClientInfo>* clients) {
int64_t clients_num = 0;
clients_num += pika_dispatch_thread_->ThreadClientList(clients);
clients_num += pika_monitor_thread_->ThreadClientList(clients);
return clients_num;
}

bool PikaServer::HasMonitorClients() { return pika_monitor_thread_->HasMonitorClients(); }
bool PikaServer::HasMonitorClients() const {
std::unique_lock lock(monitor_mutex_protector_);
return !pika_monitor_clients_.empty();
}

void PikaServer::AddMonitorMessage(const std::string& monitor_message) {
pika_monitor_thread_->AddMonitorMessage(monitor_message);
const std::string msg = "+" + monitor_message + "\r\n";

std::vector<std::shared_ptr<PikaClientConn>> clients;

std::unique_lock lock(monitor_mutex_protector_);
clients.reserve(pika_monitor_clients_.size());
for (auto it = pika_monitor_clients_.begin(); it != pika_monitor_clients_.end();) {
auto cli = (*it).lock();
if (cli) {
clients.push_back(std::move(cli));
++it;
} else {
it = pika_monitor_clients_.erase(it);
}
}

lock.unlock(); // SendReply without lock

for (const auto& cli : clients) {
cli->WriteResp(msg);
cli->SendReply();
}
}

void PikaServer::AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr) {
pika_monitor_thread_->AddMonitorClient(client_ptr);
if (client_ptr) {
std::unique_lock lock(monitor_mutex_protector_);
pika_monitor_clients_.insert(client_ptr);
}
}

void PikaServer::SlowlogTrim() {
Expand Down

0 comments on commit cf0300d

Please sign in to comment.