Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some new-delete patterns and refine LockManager hash function #1268

Merged
merged 1 commit into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ void Connection::recordProfilingSampleIfNeed(const std::string &cmd, uint64_t du
std::string iostats_context = rocksdb::get_iostats_context()->ToString(true);
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
if (perf_context.empty()) return; // request without db operation
auto entry = new PerfEntry();
auto entry = std::unique_ptr<PerfEntry>();
entry->cmd_name = cmd;
entry->duration = duration;
entry->iostats_context = std::move(iostats_context);
entry->perf_context = std::move(perf_context);
svr_->GetPerfLog()->PushEntry(entry);
svr_->GetPerfLog()->PushEntry(std::move(entry));
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Expand Down
32 changes: 14 additions & 18 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,43 +280,39 @@ Status Server::RemoveMaster() {
}

Status Server::AddSlave(Redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq) {
auto t = new FeedSlaveThread(this, conn, next_repl_seq);
auto t = std::make_unique<FeedSlaveThread>(this, conn, next_repl_seq);
auto s = t->Start();
if (!s.IsOK()) {
delete t;
return s;
}

std::lock_guard<std::mutex> lg(slave_threads_mu_);
slave_threads_.emplace_back(t);
slave_threads_.emplace_back(std::move(t));
return Status::OK();
}

void Server::DisconnectSlaves() {
std::lock_guard<std::mutex> lg(slave_threads_mu_);
for (const auto &slave_thread : slave_threads_) {
for (auto &slave_thread : slave_threads_) {
if (!slave_thread->IsStopped()) slave_thread->Stop();
}
while (!slave_threads_.empty()) {
auto slave_thread = slave_threads_.front();
auto slave_thread = std::move(slave_threads_.front());
slave_threads_.pop_front();
slave_thread->Join();
delete slave_thread;
}
}

void Server::cleanupExitedSlaves() {
std::list<FeedSlaveThread *> exited_slave_threads;
std::lock_guard<std::mutex> lg(slave_threads_mu_);
for (const auto &slave_thread : slave_threads_) {
if (slave_thread->IsStopped()) exited_slave_threads.emplace_back(slave_thread);
}
while (!exited_slave_threads.empty()) {
auto t = exited_slave_threads.front();
exited_slave_threads.pop_front();
slave_threads_.remove(t);
t->Join();
delete t;
for (auto it = slave_threads_.begin(); it != slave_threads_.end();) {
if ((*it)->IsStopped()) {
auto thread = std::move(*it);
it = slave_threads_.erase(it);
thread->Join();
} else {
++it;
}
}
}

Expand Down Expand Up @@ -1333,7 +1329,7 @@ time_t Server::GetLastScanTime(const std::string &ns) {
void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args, uint64_t duration) {
int64_t threshold = config_->slowlog_log_slower_than;
if (threshold < 0 || static_cast<int64_t>(duration) < threshold) return;
auto entry = new SlowEntry();
auto entry = std::make_unique<SlowEntry>();
size_t argc = args->size() > kSlowLogMaxArgc ? kSlowLogMaxArgc : args->size();
for (size_t i = 0; i < argc; i++) {
if (argc != args->size() && i == argc - 1) {
Expand All @@ -1348,7 +1344,7 @@ void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args, uint
}
}
entry->duration = duration;
slow_log_.PushEntry(entry);
slow_log_.PushEntry(std::move(entry));
}

std::string Server::GetClientsStr() {
Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class Server {

// slave
std::mutex slave_threads_mu_;
std::list<FeedSlaveThread *> slave_threads_;
std::list<std::unique_ptr<FeedSlaveThread>> slave_threads_;
std::atomic<int> fetch_file_threads_num_;

// Some jobs to operate DB should be unique
Expand Down
10 changes: 3 additions & 7 deletions src/stats/log_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,15 @@ LogCollector<T>::~LogCollector() {

template <class T>
ssize_t LogCollector<T>::Size() {
ssize_t n = 0;
std::lock_guard<std::mutex> guard(mu_);
n = entries_.size();
ssize_t n = entries_.size();
return n;
}

template <class T>
void LogCollector<T>::Reset() {
std::lock_guard<std::mutex> guard(mu_);
while (!entries_.empty()) {
delete entries_.front();
entries_.pop_front();
}
}
Expand All @@ -72,22 +70,20 @@ template <class T>
void LogCollector<T>::SetMaxEntries(int64_t max_entries) {
std::lock_guard<std::mutex> guard(mu_);
while (max_entries > 0 && static_cast<int64_t>(entries_.size()) > max_entries) {
delete entries_.back();
entries_.pop_back();
}
max_entries_ = max_entries;
}

template <class T>
void LogCollector<T>::PushEntry(T *entry) {
void LogCollector<T>::PushEntry(std::unique_ptr<T> &&entry) {
std::lock_guard<std::mutex> guard(mu_);
entry->id = ++id_;
entry->time = time(nullptr);
if (max_entries_ > 0 && !entries_.empty() && entries_.size() >= static_cast<size_t>(max_entries_)) {
delete entries_.back();
entries_.pop_back();
}
entries_.push_front(entry);
entries_.push_front(std::move(entry));
}

template <class T>
Expand Down
7 changes: 4 additions & 3 deletions src/stats/log_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
#include <time.h>

#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
Expand Down Expand Up @@ -63,12 +64,12 @@ class LogCollector {
ssize_t Size();
void Reset();
void SetMaxEntries(int64_t max_entries);
void PushEntry(T *entry);
void PushEntry(std::unique_ptr<T> &&entry);
std::string GetLatestEntries(int64_t cnt);

private:
std::mutex mu_;
uint64_t id_ = 0;
int64_t max_entries_ = 128;
std::list<T *> entries_;
std::deque<std::unique_ptr<T>> entries_;
};
16 changes: 6 additions & 10 deletions src/storage/lock_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,21 @@

LockManager::LockManager(int hash_power) : hash_power_(hash_power), hash_mask_((1U << hash_power) - 1) {
for (unsigned i = 0; i < Size(); i++) {
mutex_pool_.emplace_back(new std::mutex());
mutex_pool_.emplace_back(new std::mutex{});
}
}

LockManager::~LockManager() {
for (const auto &mu : mutex_pool_) {
delete mu;
}
unsigned LockManager::hash(const rocksdb::Slice &key) {
return std::hash<std::string_view>{}(std::string_view{key.data(), key.size()}) & hash_mask_;
}

unsigned LockManager::hash(const rocksdb::Slice &key) { return std::hash<std::string>{}(key.ToString()) & hash_mask_; }

unsigned LockManager::Size() { return (1U << hash_power_); }

void LockManager::Lock(const rocksdb::Slice &key) { mutex_pool_[hash(key)]->lock(); }

void LockManager::UnLock(const rocksdb::Slice &key) { mutex_pool_[hash(key)]->unlock(); }

std::vector<std::mutex *> LockManager::MultiGet(const std::vector<std::string> &keys) {
std::vector<std::mutex *> locks;
std::set<unsigned, std::greater<unsigned>> to_acquire_indexes;
// We are using the `set` to avoid retrieving the mutex, as well as guarantee to retrieve
// the order of locks.
Expand All @@ -58,9 +53,10 @@ std::vector<std::mutex *> LockManager::MultiGet(const std::vector<std::string> &
to_acquire_indexes.insert(hash(key));
}

std::vector<std::mutex *> locks;
locks.reserve(to_acquire_indexes.size());
for (const auto &index : to_acquire_indexes) {
locks.emplace_back(mutex_pool_[index]);
for (auto index : to_acquire_indexes) {
locks.emplace_back(mutex_pool_[index].get());
}
return locks;
}
4 changes: 2 additions & 2 deletions src/storage/lock_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
class LockManager {
public:
explicit LockManager(int hash_power);
~LockManager();
~LockManager() = default;

LockManager(const LockManager &) = delete;
LockManager &operator=(const LockManager &) = delete;
Expand All @@ -43,7 +43,7 @@ class LockManager {
private:
int hash_power_;
unsigned hash_mask_;
std::vector<std::mutex *> mutex_pool_;
std::vector<std::unique_ptr<std::mutex>> mutex_pool_;
unsigned hash(const rocksdb::Slice &key);
};

Expand Down
8 changes: 4 additions & 4 deletions tests/cppunit/log_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
TEST(LogCollector, PushEntry) {
LogCollector<PerfEntry> perf_log;
perf_log.SetMaxEntries(1);
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(std::make_unique<PerfEntry>());
perf_log.PushEntry(std::make_unique<PerfEntry>());
EXPECT_EQ(perf_log.Size(), 1);
perf_log.SetMaxEntries(2);
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(new PerfEntry());
perf_log.PushEntry(std::make_unique<PerfEntry>());
perf_log.PushEntry(std::make_unique<PerfEntry>());
EXPECT_EQ(perf_log.Size(), 2);
perf_log.Reset();
EXPECT_EQ(perf_log.Size(), 0);
Expand Down