Skip to content

Commit

Permalink
Remove some new-delete patterns and refine LockManager hash function (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Feb 20, 2023
1 parent 62d9113 commit a08a0ff
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 47 deletions.
4 changes: 2 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ void Connection::recordProfilingSampleIfNeed(const std::string &cmd, uint64_t du
std::string iostats_context = rocksdb::get_iostats_context()->ToString(true);
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
if (perf_context.empty()) return; // request without db operation
auto entry = new PerfEntry();
auto entry = std::unique_ptr<PerfEntry>();
entry->cmd_name = cmd;
entry->duration = duration;
entry->iostats_context = std::move(iostats_context);
entry->perf_context = std::move(perf_context);
svr_->GetPerfLog()->PushEntry(entry);
svr_->GetPerfLog()->PushEntry(std::move(entry));
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Expand Down
32 changes: 14 additions & 18 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,43 +280,39 @@ Status Server::RemoveMaster() {
}

Status Server::AddSlave(Redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) {
auto t = new FeedSlaveThread(this, conn, next_repl_seq);
auto t = std::make_unique<FeedSlaveThread>(this, conn, next_repl_seq);
auto s = t->Start();
if (!s.IsOK()) {
delete t;
return s;
}

std::lock_guard<std::mutex> lg(slave_threads_mu_);
slave_threads_.emplace_back(t);
slave_threads_.emplace_back(std::move(t));
return Status::OK();
}

void Server::DisconnectSlaves() {
std::lock_guard<std::mutex> lg(slave_threads_mu_);
for (const auto &slave_thread : slave_threads_) {
for (auto &slave_thread : slave_threads_) {
if (!slave_thread->IsStopped()) slave_thread->Stop();
}
while (!slave_threads_.empty()) {
auto slave_thread = slave_threads_.front();
auto slave_thread = std::move(slave_threads_.front());
slave_threads_.pop_front();
slave_thread->Join();
delete slave_thread;
}
}

void Server::cleanupExitedSlaves() {
std::list<FeedSlaveThread *> exited_slave_threads;
std::lock_guard<std::mutex> lg(slave_threads_mu_);
for (const auto &slave_thread : slave_threads_) {
if (slave_thread->IsStopped()) exited_slave_threads.emplace_back(slave_thread);
}
while (!exited_slave_threads.empty()) {
auto t = exited_slave_threads.front();
exited_slave_threads.pop_front();
slave_threads_.remove(t);
t->Join();
delete t;
for (auto it = slave_threads_.begin(); it != slave_threads_.end();) {
if ((*it)->IsStopped()) {
auto thread = std::move(*it);
it = slave_threads_.erase(it);
thread->Join();
} else {
++it;
}
}
}

Expand Down Expand Up @@ -1333,7 +1329,7 @@ time_t Server::GetLastScanTime(const std::string &ns) {
void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args, uint64_t duration) {
int64_t threshold = config_->slowlog_log_slower_than;
if (threshold < 0 || static_cast<int64_t>(duration) < threshold) return;
auto entry = new SlowEntry();
auto entry = std::make_unique<SlowEntry>();
size_t argc = args->size() > kSlowLogMaxArgc ? kSlowLogMaxArgc : args->size();
for (size_t i = 0; i < argc; i++) {
if (argc != args->size() && i == argc - 1) {
Expand All @@ -1348,7 +1344,7 @@ void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args, uint
}
}
entry->duration = duration;
slow_log_.PushEntry(entry);
slow_log_.PushEntry(std::move(entry));
}

std::string Server::GetClientsStr() {
Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class Server {

// slave
std::mutex slave_threads_mu_;
std::list<FeedSlaveThread *> slave_threads_;
std::list<std::unique_ptr<FeedSlaveThread>> slave_threads_;
std::atomic<int> fetch_file_threads_num_;

// Some jobs to operate DB should be unique
Expand Down
10 changes: 3 additions & 7 deletions src/stats/log_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,15 @@ LogCollector<T>::~LogCollector() {

template <class T>
ssize_t LogCollector<T>::Size() {
ssize_t n = 0;
std::lock_guard<std::mutex> guard(mu_);
n = entries_.size();
ssize_t n = entries_.size();
return n;
}

template <class T>
void LogCollector<T>::Reset() {
std::lock_guard<std::mutex> guard(mu_);
while (!entries_.empty()) {
delete entries_.front();
entries_.pop_front();
}
}
Expand All @@ -72,22 +70,20 @@ template <class T>
void LogCollector<T>::SetMaxEntries(int64_t max_entries) {
std::lock_guard<std::mutex> guard(mu_);
while (max_entries > 0 && static_cast<int64_t>(entries_.size()) > max_entries) {
delete entries_.back();
entries_.pop_back();
}
max_entries_ = max_entries;
}

template <class T>
void LogCollector<T>::PushEntry(T *entry) {
void LogCollector<T>::PushEntry(std::unique_ptr<T> &&entry) {
std::lock_guard<std::mutex> guard(mu_);
entry->id = ++id_;
entry->time = time(nullptr);
if (max_entries_ > 0 && !entries_.empty() && entries_.size() >= static_cast<size_t>(max_entries_)) {
delete entries_.back();
entries_.pop_back();
}
entries_.push_front(entry);
entries_.push_front(std::move(entry));
}

template <class T>
Expand Down
7 changes: 4 additions & 3 deletions src/stats/log_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
#include <time.h>

#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
Expand Down Expand Up @@ -63,12 +64,12 @@ class LogCollector {
ssize_t Size();
void Reset();
void SetMaxEntries(int64_t max_entries);
void PushEntry(T *entry);
void PushEntry(std::unique_ptr<T> &&entry);
std::string GetLatestEntries(int64_t cnt);

private:
std::mutex mu_;
uint64_t id_ = 0;
int64_t max_entries_ = 128;
std::list<T *> entries_;
std::deque<std::unique_ptr<T>> entries_;
};
16 changes: 6 additions & 10 deletions src/storage/lock_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,21 @@

LockManager::LockManager(int hash_power) : hash_power_(hash_power), hash_mask_((1U << hash_power) - 1) {
for (unsigned i = 0; i < Size(); i++) {
mutex_pool_.emplace_back(new std::mutex());
mutex_pool_.emplace_back(new std::mutex{});
}
}

LockManager::~LockManager() {
for (const auto &mu : mutex_pool_) {
delete mu;
}
unsigned LockManager::hash(const rocksdb::Slice &key) {
return std::hash<std::string_view>{}(std::string_view{key.data(), key.size()}) & hash_mask_;
}

unsigned LockManager::hash(const rocksdb::Slice &key) { return std::hash<std::string>{}(key.ToString()) & hash_mask_; }

unsigned LockManager::Size() { return (1U << hash_power_); }

void LockManager::Lock(const rocksdb::Slice &key) { mutex_pool_[hash(key)]->lock(); }

void LockManager::UnLock(const rocksdb::Slice &key) { mutex_pool_[hash(key)]->unlock(); }

std::vector<std::mutex *> LockManager::MultiGet(const std::vector<std::string> &keys) {
std::vector<std::mutex *> locks;
std::set<unsigned, std::greater<unsigned>> to_acquire_indexes;
// We are using the `set` to avoid retrieving the mutex, as well as guarantee to retrieve
// the order of locks.
Expand All @@ -58,9 +53,10 @@ std::vector<std::mutex *> LockManager::MultiGet(const std::vector<std::string> &
to_acquire_indexes.insert(hash(key));
}

std::vector<std::mutex *> locks;
locks.reserve(to_acquire_indexes.size());
for (const auto &index : to_acquire_indexes) {
locks.emplace_back(mutex_pool_[index]);
for (auto index : to_acquire_indexes) {
locks.emplace_back(mutex_pool_[index].get());
}
return locks;
}
4 changes: 2 additions & 2 deletions src/storage/lock_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class LockManager {
public:
explicit LockManager(int hash_power);
~LockManager();
~LockManager() = default;

LockManager(const LockManager &) = delete;
LockManager &operator=(const LockManager &) = delete;
Expand All @@ -43,7 +43,7 @@ class LockManager {
private:
int hash_power_;
unsigned hash_mask_;
std::vector<std::mutex *> mutex_pool_;
std::vector<std::unique_ptr<std::mutex>> mutex_pool_;
unsigned hash(const rocksdb::Slice &key);
};

Expand Down
8 changes: 4 additions & 4 deletions tests/cppunit/log_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
TEST(LogCollector, PushEntry) {
LogCollector<PerfEntry> perf_log;
perf_log.SetMaxEntries(1);
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(std::make_unique<PerfEntry>());
perf_log.PushEntry(std::make_unique<PerfEntry>());
EXPECT_EQ(perf_log.Size(), 1);
perf_log.SetMaxEntries(2);
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(std::make_unique<PerfEntry>());
perf_log.PushEntry(std::make_unique<PerfEntry>());
EXPECT_EQ(perf_log.Size(), 2);
perf_log.Reset();
EXPECT_EQ(perf_log.Size(), 0);
Expand Down

0 comments on commit a08a0ff

Please sign in to comment.