Skip to content

Commit

Permalink
Fix data race in connection
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Nov 16, 2023
1 parent e0520b2 commit e328917
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
14 changes: 10 additions & 4 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ void Connection::Close() {

void Connection::Detach() { owner_->DetachConnection(this); }

void Connection::OnRead(bufferevent *bev) {
DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
void Connection::OnRead(struct bufferevent *bev) {
is_running_ = true;
MakeScopeExit([this] { is_running_ = false; });

SetLastInteraction();
auto s = req_.Tokenize(Input());
Expand Down Expand Up @@ -178,6 +179,13 @@ void Connection::DisableFlag(Flag flag) { flags_ &= (~flag); }

bool Connection::IsFlagEnabled(Flag flag) const { return (flags_ & flag) > 0; }

bool Connection::CanMigrate() const {
return !is_running_ // reading or writing
&& !IsFlagEnabled(redis::Connection::kCloseAfterReply) // close after reply
&& saved_current_command_ == nullptr // not executing blocking command like BLPOP
&& subscribe_channels_.empty() && subscribe_patterns_.empty(); // not subscribing any channel
}

void Connection::SubscribeChannel(const std::string &channel) {
for (const auto &chan : subscribe_channels_) {
if (channel == chan) return;
Expand Down Expand Up @@ -296,8 +304,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
Config *config = srv_->GetConfig();
std::string reply, password = config->requirepass;

has_running_command_ = true;
MakeScopeExit([this] { has_running_command_ = false; });
while (!to_process_cmds->empty()) {
auto cmd_tokens = to_process_cmds->front();
to_process_cmds->pop_front();
Expand Down
4 changes: 2 additions & 2 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class Connection : public EvbufCallbackBase<Connection> {
Worker *Owner() { return owner_; }
void SetOwner(Worker *new_owner) { owner_ = new_owner; };
int GetFD() { return bufferevent_getfd(bev_); }
bool HasRunningCommand() const { return has_running_command_; }
evbuffer *Input() { return bufferevent_get_input(bev_); }
evbuffer *Output() { return bufferevent_get_output(bev_); }
bufferevent *GetBufferEvent() { return bev_; }
Expand All @@ -120,6 +119,7 @@ class Connection : public EvbufCallbackBase<Connection> {
void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
bool IsImporting() const { return importing_; }
bool CanMigrate() const;

// Multi exec
void SetInExec() { in_exec_ = true; }
Expand Down Expand Up @@ -160,7 +160,7 @@ class Connection : public EvbufCallbackBase<Connection> {
Server *srv_;
bool in_exec_ = false;
bool multi_error_ = false;
std::atomic<bool> has_running_command_ = false;
std::atomic<bool> is_running_ = false;
std::deque<redis::CommandTokens> multi_cmds_;

bool importing_ = false;
Expand Down
10 changes: 7 additions & 3 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,15 @@ redis::Connection *Worker::removeConnection(int fd) {
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;

auto bev = conn->GetBufferEvent();
// disable read/write event to prevent the connection from being processed during migration
bufferevent_disable(bev, EV_READ | EV_WRITE);
// We cannot migrate the connection if it has a running command
// since it will cause data race since the old worker may still process the command.
if (conn->HasRunningCommand()) {
// don't need to close the connection since destroy worker thread will close it
if (!conn->CanMigrate()) {
// Need to enable read/write event again since we disabled them before
bufferevent_enable(bev, EV_READ | EV_WRITE);
return;
}

Expand All @@ -373,7 +378,6 @@ void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
conn->Close();
return;
}
auto bev = conn->GetBufferEvent();
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ | EV_WRITE);
Expand Down

0 comments on commit e328917

Please sign in to comment.