Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown the workers when reducing worker threads #1863

Merged
merged 9 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

bool IsBlocking() const override { return true; }
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
Expand Down
2 changes: 0 additions & 2 deletions src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const std::string &name, const s

class CommandSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
Expand Down Expand Up @@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {

class CommandPSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
Expand Down
1 change: 0 additions & 1 deletion src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ class CommandXRead : public Commander,
private EvbufCallbackBase<CommandXRead, false>,
private EventCallbackBase<CommandXRead> {
public:
bool IsBlocking() const override { return true; }
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;

Expand Down
1 change: 0 additions & 1 deletion src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class Commander {
void SetAttributes(const CommandAttributes *attributes) { attributes_ = attributes; }
const CommandAttributes *GetAttributes() const { return attributes_; }
void SetArgs(const std::vector<std::string> &args) { args_ = args; }
virtual bool IsBlocking() const { return false; }
virtual Status Parse() { return Parse(args_); }
virtual Status Parse(const std::vector<std::string> &args) { return Status::OK(); }
virtual Status Execute(Server *srv, Connection *conn, std::string *output) {
Expand Down
19 changes: 17 additions & 2 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "commands/blocking_commander.h"
#include "redis_connection.h"
#include "scope_exit.h"
#include "server.h"
#include "time_util.h"
#include "tls_util.h"
Expand Down Expand Up @@ -75,8 +76,9 @@ void Connection::Close() {

void Connection::Detach() { owner_->DetachConnection(this); }

void Connection::OnRead(bufferevent *bev) {
DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
void Connection::OnRead(struct bufferevent *bev) {
is_running_ = true;
MakeScopeExit([this] { is_running_ = false; });

SetLastInteraction();
auto s = req_.Tokenize(Input());
Expand Down Expand Up @@ -177,6 +179,13 @@ void Connection::DisableFlag(Flag flag) { flags_ &= (~flag); }

bool Connection::IsFlagEnabled(Flag flag) const { return (flags_ & flag) > 0; }

bool Connection::CanMigrate() const {
return !is_running_ // reading or writing
&& !IsFlagEnabled(redis::Connection::kCloseAfterReply) // close after reply
&& saved_current_command_ == nullptr // not executing blocking command like BLPOP
&& subscribe_channels_.empty() && subscribe_patterns_.empty(); // not subscribing any channel
}

void Connection::SubscribeChannel(const std::string &channel) {
for (const auto &chan : subscribe_channels_) {
if (channel == chan) return;
Expand Down Expand Up @@ -302,6 +311,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;

std::unique_ptr<Commander> current_cmd;
auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
if (!s.IsOK()) {
if (is_multi_exec) multi_error_ = true;
Expand Down Expand Up @@ -424,6 +434,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
// Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
// A blocking command reuses this command object when it resumes from the
// suspended state, so we need to save it for the next execution.
// Connection migration also checks saved_current_command_ to determine
// whether the connection can be migrated or not.
saved_current_command_ = std::move(current_cmd);
break;
}

Expand Down
5 changes: 4 additions & 1 deletion src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class Connection : public EvbufCallbackBase<Connection> {
void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
bool IsImporting() const { return importing_; }
bool CanMigrate() const;

// Multi exec
void SetInExec() { in_exec_ = true; }
Expand All @@ -127,7 +128,6 @@ class Connection : public EvbufCallbackBase<Connection> {
void ResetMultiExec();
std::deque<redis::CommandTokens> *GetMultiExecCommands() { return &multi_cmds_; }

std::unique_ptr<Commander> current_cmd;
std::function<void(int)> close_cb = nullptr;

std::set<std::string> watched_keys;
Expand All @@ -152,12 +152,15 @@ class Connection : public EvbufCallbackBase<Connection> {
bufferevent *bev_;
Request req_;
Worker *owner_;
std::unique_ptr<Commander> saved_current_command_;

std::vector<std::string> subscribe_channels_;
std::vector<std::string> subscribe_patterns_;

Server *srv_;
bool in_exec_ = false;
bool multi_error_ = false;
std::atomic<bool> is_running_ = false;
std::deque<redis::CommandTokens> multi_cmds_;

bool importing_ = false;
Expand Down
32 changes: 21 additions & 11 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Server::~Server() {
for (auto &worker_thread : worker_threads_) {
worker_thread.reset();
}
cleanupExitedWorkerThreads();
cleanupExitedWorkerThreads(true /* force */);
CleanupExitedSlaves();

lua::DestroyState(lua_);
Expand Down Expand Up @@ -226,7 +226,7 @@ void Server::Stop() {
slaveof_mu_.unlock();

for (const auto &worker : worker_threads_) {
worker->Stop();
worker->Stop(0 /* immediately terminate */);
}

rocksdb::CancelAllBackgroundWork(storage->GetDB(), true);
Expand Down Expand Up @@ -739,8 +739,9 @@ void Server::cron() {
storage->SetDBInRetryableIOError(false);
}

if (counter != 0 && counter % 10 == 0) {
cleanupExitedWorkerThreads();
// check if we need to clean up exited worker threads every 5s
if (counter != 0 && counter % 50 == 0) {
cleanupExitedWorkerThreads(false);
}

CleanupExitedSlaves();
Expand Down Expand Up @@ -1685,12 +1686,12 @@ void Server::AdjustWorkerThreads() {
if (new_worker_threads > worker_threads_.size()) {
delta = new_worker_threads - worker_threads_.size();
increaseWorkerThreads(delta);
LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Increase worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
return;
}

delta = worker_threads_.size() - new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
decreaseWorkerThreads(delta);
}

Expand Down Expand Up @@ -1721,17 +1722,26 @@ void Server::decreaseWorkerThreads(size_t delta) {
auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker();
worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second);
}
worker_thread->Stop();
worker_thread->Stop(10 /* graceful timeout */);
// Don't join the worker thread here, because it may join itself.
recycle_worker_threads_.push(std::move(worker_thread));
}
}

void Server::cleanupExitedWorkerThreads() {
void Server::cleanupExitedWorkerThreads(bool force) {
std::unique_ptr<WorkerThread> worker_thread = nullptr;
while (recycle_worker_threads_.try_pop(worker_thread)) {
worker_thread->Join();
worker_thread.reset();
auto total = recycle_worker_threads_.unsafe_size();
for (size_t i = 0; i < total; i++) {
if (!recycle_worker_threads_.try_pop(worker_thread)) {
break;
}
if (worker_thread->IsTerminated() || force) {
worker_thread->Join();
worker_thread.reset();
} else {
// Push the worker thread back to the queue if it's still running.
recycle_worker_threads_.push(std::move(worker_thread));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class Server {
void updateAllWatchedKeys();
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads();
void cleanupExitedWorkerThreads(bool force);

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
Expand Down
35 changes: 25 additions & 10 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
}

evutil_make_socket_nonblocking(fd);
auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
listen_events_.emplace_back(lev);
}

Expand Down Expand Up @@ -292,14 +293,22 @@ void Worker::Run(std::thread::id tid) {
if (event_base_dispatch(base_) != 0) {
LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
}
is_terminated_ = true;
}

void Worker::Stop() {
event_base_loopbreak(base_);
void Worker::Stop(uint32_t wait_seconds) {
for (const auto &lev : listen_events_) {
// It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag
evconnlistener_free(lev);
}
// wait_seconds == 0 means stop immediately; otherwise wait up to wait_seconds
// seconds for the worker to process the remaining requests before stopping.
if (wait_seconds > 0) {
timeval tv = {wait_seconds, 0};
event_base_loopexit(base_, &tv);
} else {
event_base_loopbreak(base_);
}
}

Status Worker::AddConnection(redis::Connection *c) {
Expand Down Expand Up @@ -351,18 +360,24 @@ redis::Connection *Worker::removeConnection(int fd) {
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;
if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
// don't need to close the connection since destroy worker thread will close it

auto bev = conn->GetBufferEvent();
// disable read/write event to prevent the connection from being processed during migration
bufferevent_disable(bev, EV_READ | EV_WRITE);
// We cannot migrate the connection if it has a running command,
// since doing so would cause a data race: the old worker may still be processing it.
if (!conn->CanMigrate()) {
// Need to enable read/write event again since we disabled them before
bufferevent_enable(bev, EV_READ | EV_WRITE);
return;
}

// remove the connection from current worker
DetachConnection(conn);
if (!target->AddConnection(conn).IsOK()) {
// destroy worker thread will close the connection
conn->Close();
return;
}
// remove the connection from current worker
DetachConnection(conn);
auto bev = conn->GetBufferEvent();
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ | EV_WRITE);
Expand Down Expand Up @@ -540,7 +555,7 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

void WorkerThread::Stop() { worker_->Stop(); }
void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }

void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
Expand Down
7 changes: 5 additions & 2 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Worker(Worker &&) = delete;
Worker &operator=(const Worker &) = delete;

void Stop();
void Stop(uint32_t wait_seconds);
void Run(std::thread::id tid);
bool IsTerminated() const { return is_terminated_; }

void MigrateConnection(Worker *target, redis::Connection *conn);
void DetachConnection(redis::Connection *conn);
Expand Down Expand Up @@ -94,6 +95,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
lua_State *lua_;
std::atomic<bool> is_terminated_ = false;
};

class WorkerThread {
Expand All @@ -106,8 +108,9 @@ class WorkerThread {

Worker *GetWorker() { return worker_.get(); }
void Start();
void Stop();
void Stop(uint32_t wait_seconds);
void Join();
bool IsTerminated() const { return worker_->IsTerminated(); }

private:
std::thread t_;
Expand Down
9 changes: 4 additions & 5 deletions tests/gocase/unit/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClientWithOption(&redis.Options{
MaxIdleConns: 20,
MaxRetries: -1, // Disable retry to check connections are alive after config change
})
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("Test dynamic change worker thread", func(t *testing.T) {
Expand Down Expand Up @@ -217,12 +214,14 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
go func() {
defer wg.Done()
_ = rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"s1", "s2", "s3"},
Streams: []string{"s1", "$"},
Count: 1,
Block: blockingTimeout,
})
}()

// Sleep for a while to make sure all blocking requests are ready.
time.Sleep(time.Second)
require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", "1").Err())
wg.Wait()

Expand Down