diff --git a/include/pika_monitor_thread.h b/include/pika_monitor_thread.h deleted file mode 100644 index 7ba26bde70..0000000000 --- a/include/pika_monitor_thread.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_MONITOR_THREAD_H_ -#define PIKA_MONITOR_THREAD_H_ - -#include -#include -#include -#include - -#include "net/include/net_thread.h" -#include "pstd/include/pstd_mutex.h" - -#include "include/pika_client_conn.h" -#include "include/pika_define.h" - -class PikaMonitorThread : public net::Thread { - public: - PikaMonitorThread(); - virtual ~PikaMonitorThread(); - - void AddMonitorClient(std::shared_ptr client_ptr); - void AddMonitorMessage(const std::string& monitor_message); - int32_t ThreadClientList(std::vector* client = nullptr); - bool ThreadClientKill(const std::string& ip_port = "all"); - bool HasMonitorClients(); - - private: - void AddCronTask(MonitorCronTask task); - bool FindClient(const std::string& ip_port); - net::WriteStatus SendMessage(int32_t fd, std::string& message); - void RemoveMonitorClient(const std::string& ip_port); - - std::atomic has_monitor_clients_; - pstd::Mutex monitor_mutex_protector_; - pstd::CondVar monitor_cond_; - - std::list monitor_clients_; - std::deque monitor_messages_; - std::queue cron_tasks_; - - virtual void* ThreadMain(); - void RemoveMonitorClient(int32_t client_fd); -}; -#endif diff --git a/include/pika_server.h b/include/pika_server.h index 2fa3c69f85..d9c965b50f 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -14,6 +14,7 @@ # include #endif #include +#include #include "net/include/bg_thread.h" #include "net/include/net_pubsub.h" @@ -30,7 +31,6 @@ #include "include/pika_conf.h" #include "include/pika_define.h" #include "include/pika_dispatch_thread.h" -#include "include/pika_monitor_thread.h" #include "include/pika_repl_client.h" #include "include/pika_repl_server.h" #include "include/pika_rsync_service.h" @@ -267,7 +267,7 @@ class PikaServer { /* * Monitor used */ - bool HasMonitorClients(); + bool HasMonitorClients() const; void AddMonitorMessage(const std::string& monitor_message); void AddMonitorClient(std::shared_ptr client_ptr); @@ -411,7 +411,8 @@ class PikaServer { /* * Monitor used */ - std::unique_ptr pika_monitor_thread_; + mutable pstd::Mutex monitor_mutex_protector_; + std::set, std::owner_less>> pika_monitor_clients_; /* * Rsync used diff --git a/src/pika_admin.cc b/src/pika_admin.cc index aa64c2dc9b..e56af17ee4 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1988,12 +1988,9 @@ void MonitorCmd::Do(std::shared_ptr partition) { LOG(WARNING) << name_ << " weak ptr is empty"; return; } - std::shared_ptr conn = - std::dynamic_pointer_cast(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd()); - assert(conn.get() == conn_repl.get()); - g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn)); - g_pika_server->AddMonitorMessage("OK"); - return; // Monitor thread will return "OK" + + g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn_repl)); + res_.SetRes(CmdRes::kOk); } void DbsizeCmd::DoInitial() { diff --git a/src/pika_monitor_thread.cc b/src/pika_monitor_thread.cc deleted file mode 100644 index 0a0ba2ec1e..0000000000 --- a/src/pika_monitor_thread.cc +++ /dev/null @@ -1,189 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_monitor_thread.h" - -#include - -PikaMonitorThread::PikaMonitorThread() : net::Thread() { - set_thread_name("MonitorThread"); - has_monitor_clients_.store(false); -} - -PikaMonitorThread::~PikaMonitorThread() { - set_should_stop(); - if (is_running()) { - monitor_cond_.notify_all(); - StopThread(); - } - for (std::list::iterator iter = monitor_clients_.begin(); iter != monitor_clients_.end(); ++iter) { - close(iter->fd); - } - LOG(INFO) << "PikaMonitorThread " << pthread_self() << " exit!!!"; -} - -void PikaMonitorThread::AddMonitorClient(std::shared_ptr client_ptr) { - StartThread(); - std::lock_guard lm(monitor_mutex_protector_); - monitor_clients_.push_back(ClientInfo{client_ptr->fd(), client_ptr->ip_port(), 0, client_ptr}); - has_monitor_clients_.store(true); -} - -void PikaMonitorThread::RemoveMonitorClient(const std::string& ip_port) { - std::list::iterator iter = monitor_clients_.begin(); - for (; iter != monitor_clients_.end(); ++iter) { - if (ip_port == "all") { - close(iter->fd); - continue; - } - if (iter->ip_port == ip_port) { - close(iter->fd); - break; - } - } - if (ip_port == "all") { - monitor_clients_.clear(); - } else if (iter != monitor_clients_.end()) { - monitor_clients_.erase(iter); - } - has_monitor_clients_.store(!monitor_clients_.empty()); -} - -void PikaMonitorThread::AddMonitorMessage(const std::string& monitor_message) { - std::lock_guard lm(monitor_mutex_protector_); - if (monitor_messages_.empty() && cron_tasks_.empty()) { - monitor_messages_.push_back(monitor_message); - monitor_cond_.notify_one(); - } else { - monitor_messages_.push_back(monitor_message); - } -} - -int32_t PikaMonitorThread::ThreadClientList(std::vector* clients_ptr) { - if (clients_ptr != nullptr) { - for (std::list::iterator iter = monitor_clients_.begin(); iter != monitor_clients_.end(); iter++) { - clients_ptr->push_back(*iter); - } - } - return monitor_clients_.size(); -} - -void PikaMonitorThread::AddCronTask(MonitorCronTask task) { - std::lock_guard lm(monitor_mutex_protector_); - if (monitor_messages_.empty() && cron_tasks_.empty()) { - cron_tasks_.push(task); - monitor_cond_.notify_one(); - } else { - cron_tasks_.push(task); - } -} - -bool PikaMonitorThread::FindClient(const std::string& ip_port) { - std::lock_guard lm(monitor_mutex_protector_); - for (std::list::iterator iter = monitor_clients_.begin(); iter != monitor_clients_.end(); ++iter) { - if (iter->ip_port == ip_port) { - return true; - } - } - return false; -} - -bool PikaMonitorThread::ThreadClientKill(const std::string& ip_port) { - if (is_running()) { - if (ip_port == "all") { - AddCronTask({TASK_KILLALL, "all"}); - } else if (FindClient(ip_port)) { - AddCronTask({TASK_KILL, ip_port}); - } else { - return false; - } - } - return true; -} - -bool PikaMonitorThread::HasMonitorClients() { return has_monitor_clients_.load(); } - -net::WriteStatus PikaMonitorThread::SendMessage(int32_t fd, std::string& message) { - size_t retry = 0; - ssize_t nwritten = 0, message_len_sended = 0, message_len_left = message.size(); - while (message_len_left > 0) { - nwritten = write(fd, message.data() + message_len_sended, message_len_left); - if (nwritten == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - // If the write buffer is full, but the client no longer consumes, it will - // get stuck in the loop and cause the entire Pika to block becase of monitor_mutex_protector_. - // So we put a limit on the number of retries - if (++retry >= 10) { - return net::kWriteError; - } else { - // Sleep one second wait for client consume message - sleep(1); - continue; - } - } else if (nwritten == -1) { - return net::kWriteError; - } - if (retry > 0) retry = 0; - message_len_sended += nwritten; - message_len_left -= nwritten; - } - return net::kWriteAll; -} - -void* PikaMonitorThread::ThreadMain() { - std::deque messages_deque; - std::string messages_transfer; - MonitorCronTask task; - net::WriteStatus write_status; - while (!should_stop()) { - { - std::unique_lock lm(monitor_mutex_protector_); - monitor_cond_.wait(lm, [this]() { return !monitor_messages_.empty() || !cron_tasks_.empty() || should_stop(); }); - } - if (should_stop()) { - break; - } - { - std::lock_guard lm(monitor_mutex_protector_); - while (!cron_tasks_.empty()) { - task = cron_tasks_.front(); - cron_tasks_.pop(); - RemoveMonitorClient(task.ip_port); - if (task.task == TASK_KILLALL) { - std::queue empty_queue; - cron_tasks_.swap(empty_queue); - } - } - } - - messages_deque.clear(); - { - std::lock_guard lm(monitor_mutex_protector_); - messages_deque.swap(monitor_messages_); - if (monitor_clients_.empty() || messages_deque.empty()) { - continue; - } - } - messages_transfer = "+"; - for (const auto& msg : messages_deque) { - messages_transfer.append(msg.data(), msg.size()); - messages_transfer.append(" ", 1); - } - if (messages_transfer == "+") { - continue; - } - - messages_transfer.pop_back(); // no space follow last param - messages_transfer.append("\r\n", 2); - - std::lock_guard lm(monitor_mutex_protector_); - for (auto iter = monitor_clients_.begin(); iter != monitor_clients_.end(); ++iter) { - write_status = SendMessage(iter->fd, messages_transfer); - if (write_status == net::kWriteError) { - cron_tasks_.push({TASK_KILL, iter->ip_port}); - } - } - } - return nullptr; -} diff --git a/src/pika_server.cc b/src/pika_server.cc index 6e3d590940..92ea28c808 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -79,7 +79,6 @@ PikaServer::PikaServer() LOG(INFO) << "Worker queue limit is " << worker_queue_limit; pika_dispatch_thread_ = std::make_unique(ips, port_, worker_num_, 3000, worker_queue_limit, g_pika_conf->max_conn_rbuf_size()); - pika_monitor_thread_ = std::make_unique(); pika_rsync_service_ = std::make_unique(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync); pika_pubsub_thread_ = std::make_unique(); pika_auxiliary_thread_ = std::make_unique(); @@ -1057,13 +1056,10 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) { key_scan_thread_.Schedule(func, arg); } -void PikaServer::ClientKillAll() { - pika_dispatch_thread_->ClientKillAll(); - pika_monitor_thread_->ThreadClientKill(); -} +void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); } int PikaServer::ClientKill(const std::string& ip_port) { - if (pika_dispatch_thread_->ClientKill(ip_port) || pika_monitor_thread_->ThreadClientKill(ip_port)) { + if (pika_dispatch_thread_->ClientKill(ip_port)) { return 1; } return 0; @@ -1072,18 +1068,44 @@ int PikaServer::ClientKill(const std::string& ip_port) { int64_t PikaServer::ClientList(std::vector* clients) { int64_t clients_num = 0; clients_num += pika_dispatch_thread_->ThreadClientList(clients); - clients_num += pika_monitor_thread_->ThreadClientList(clients); return clients_num; } -bool PikaServer::HasMonitorClients() { return pika_monitor_thread_->HasMonitorClients(); } +bool PikaServer::HasMonitorClients() const { + std::unique_lock lock(monitor_mutex_protector_); + return !pika_monitor_clients_.empty(); +} void PikaServer::AddMonitorMessage(const std::string& monitor_message) { - pika_monitor_thread_->AddMonitorMessage(monitor_message); + const std::string msg = "+" + monitor_message + "\r\n"; + + std::vector> clients; + + std::unique_lock lock(monitor_mutex_protector_); + clients.reserve(pika_monitor_clients_.size()); + for (auto it = pika_monitor_clients_.begin(); it != pika_monitor_clients_.end();) { + auto cli = (*it).lock(); + if (cli) { + clients.push_back(std::move(cli)); + ++it; + } else { + it = pika_monitor_clients_.erase(it); + } + } + + lock.unlock(); // SendReply without lock + + for (const auto& cli : clients) { + cli->WriteResp(msg); + cli->SendReply(); + } } void PikaServer::AddMonitorClient(std::shared_ptr client_ptr) { - pika_monitor_thread_->AddMonitorClient(client_ptr); + if (client_ptr) { + std::unique_lock lock(monitor_mutex_protector_); + pika_monitor_clients_.insert(client_ptr); + } } void PikaServer::SlowlogTrim() {