Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support blocking migration for the cluster migrate command #1418

Merged
merged 11 commits into from
May 15, 2023
9 changes: 4 additions & 5 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ Status Cluster::SetSlotImported(int slot) {
return Status::OK();
}

Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
return {Status::NotOK, "Can't find the destination node id"};
}
Expand All @@ -316,10 +317,8 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
return {Status::NotOK, "Can't migrate slot to myself"};
}

const auto dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(
dst_node_id, dst->host, dst->port, slot, svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size,
svr_->GetConfig()->sequence_gap);
const auto &dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
return s;
}

Expand Down
4 changes: 3 additions & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct SlotInfo {
using ClusterNodes = std::unordered_map<std::string, std::shared_ptr<ClusterNode>>;

class Server;
class SyncMigrateContext;

class Cluster {
public:
Expand All @@ -91,7 +92,8 @@ class Cluster {
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status MigrateSlot(int slot, const std::string &dst_node_id,
SyncMigrateContext *blocking_ctx = nullptr);
Status ImportSlot(redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Expand Down
32 changes: 31 additions & 1 deletion src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fmt/format.h"
#include "io_util.h"
#include "storage/batch_extractor.h"
#include "sync_migrate_context.h"
#include "thread_util.h"
#include "time_util.h"
#include "types/redis_stream_base.h"
Expand Down Expand Up @@ -76,7 +77,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin
}

Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
int speed, int pipeline_size, int seq_gap) {
SyncMigrateContext *blocking_ctx) {
// Only one slot migration job at the same time
int16_t no_slot = -1;
if (!migrating_slot_.compare_exchange_strong(no_slot, static_cast<int16_t>(slot_id))) {
Expand All @@ -91,6 +92,10 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin

migration_state_ = MigrationState::kStarted;

auto speed = svr_->GetConfig()->migrate_speed;
auto seq_gap = svr_->GetConfig()->sequence_gap;
auto pipeline_size = svr_->GetConfig()->pipeline_size;

if (speed <= 0) {
speed = 0;
}
Expand All @@ -103,6 +108,12 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin
seq_gap = kDefaultSequenceGapLimit;
}

if (blocking_ctx) {
auto lock = blockingLock();
blocking_context_ = blocking_ctx;
blocking_context_->StartBlock();
}

dst_node_ = node_id;

// Create migration job
Expand Down Expand Up @@ -184,6 +195,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
wakeupBlocking(s);
}
break;
}
Expand All @@ -194,6 +206,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
wakeupBlocking(s);
}
break;
}
Expand All @@ -205,6 +218,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
wakeupBlocking(s);
}
break;
}
Expand All @@ -214,10 +228,12 @@ void SlotMigrator::runMigrationProcess() {
LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_;
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
wakeupBlocking(s);
} else {
LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrating_slot_
<< ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
wakeupBlocking(s);
}
break;
}
Expand Down Expand Up @@ -1083,3 +1099,17 @@ void SlotMigrator::GetMigrationInfo(std::string *info) const {
*info =
fmt::format("migrating_slot: {}\r\ndestination_node: {}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state);
}

// Forgets the blocking context registered by PerformSlotMigration, so that a
// later wakeupBlocking() finds nothing to notify. SyncMigrateContext calls this
// from its timer callback and from its event callback on EOF/error.
void SlotMigrator::CancelBlocking() {
  // blocking_context_ is only touched while the blocking lock is held.
  const auto guard = blockingLock();
  blocking_context_ = nullptr;
}

// Delivers the outcome of a migration to the registered blocking context, if
// there is one, and then drops the registration so it is notified at most once.
//
// migrate_result: status of the migration stage that just ended; it is passed
//                 to SyncMigrateContext::Wakeup unchanged.
void SlotMigrator::wakeupBlocking(const Status &migrate_result) {
  auto guard = blockingLock();

  // Nothing is registered (never set, or already cancelled): nothing to do.
  if (blocking_context_ == nullptr) {
    return;
  }

  blocking_context_->Wakeup(migrate_result);
  blocking_context_ = nullptr;
}
13 changes: 11 additions & 2 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct SlotMigrationJob {
int seq_gap_limit;
};

class SyncMigrateContext;

class SlotMigrator : public redis::Database {
public:
explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
Expand All @@ -80,8 +82,8 @@ class SlotMigrator : public redis::Database {
~SlotMigrator();

Status CreateMigrationThread();
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
int pipeline_size, int seq_gap);
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
SyncMigrateContext *blocking_ctx = nullptr);
void ReleaseForbiddenSlot();
void SetMaxMigrationSpeed(int value) {
if (value >= 0) max_migration_speed_ = value;
Expand All @@ -98,6 +100,7 @@ class SlotMigrator : public redis::Database {
int16_t GetForbiddenSlot() const { return forbidden_slot_; }
int16_t GetMigratingSlot() const { return migrating_slot_; }
void GetMigrationInfo(std::string *info) const;
void CancelBlocking();

private:
void loop();
Expand Down Expand Up @@ -131,6 +134,9 @@ class SlotMigrator : public redis::Database {
Status syncWalBeforeForbiddingSlot();
Status syncWalAfterForbiddingSlot();
void setForbiddenSlot(int16_t slot);
// Locks blocking_mutex_ and returns the held lock; the mutex is released when
// the returned std::unique_lock goes out of scope in the caller.
std::unique_lock<std::mutex> blockingLock() {
  std::unique_lock<std::mutex> guard(blocking_mutex_);
  return guard;
}

void wakeupBlocking(const Status &migrate_result);

enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };
Expand Down Expand Up @@ -170,4 +176,7 @@ class SlotMigrator : public redis::Database {
std::atomic<bool> stop_migration_ = false; // if is true migration will be stopped but the thread won't be destroyed
const rocksdb::Snapshot *slot_snapshot_ = nullptr;
uint64_t wal_begin_seq_ = 0;

// Serializes every read and write of blocking_context_ (see blockingLock()).
std::mutex blocking_mutex_;
// Context of the client currently blocked on a migration, or nullptr when no
// client is waiting. It must start out as nullptr: wakeupBlocking() tests this
// pointer after every migration, including ones started without a blocking
// context, so an uninitialized value would be dereferenced.
SyncMigrateContext *blocking_context_ = nullptr;
};
100 changes: 100 additions & 0 deletions src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "cluster/sync_migrate_context.h"

// Releases the timeout timer if one is still allocated.
SyncMigrateContext::~SyncMigrateContext() {
  if (timer_ == nullptr) {
    return;
  }

  event_free(timer_);
  timer_ = nullptr;
}

void SyncMigrateContext::StartBlock() {
auto bev = conn_->GetBufferEvent();
bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this);

if (timeout_) {
timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this);
timeval tm = {timeout_, 0};
evtimer_add(timer_, &tm);
}
}

// Records the migration outcome and asks the connection's owner to enable the
// write event for this connection. The reply to the client is produced later,
// in WriteCB, from the stored result.
//
// migrate_result: final status of the migration; copied into migrate_result_.
void SyncMigrateContext::Wakeup(const Status &migrate_result) {
  migrate_result_ = migrate_result;

  auto status = conn_->Owner()->EnableWriteEvent(conn_->GetFD());
  if (status.IsOK()) {
    return;
  }

  LOG(ERROR) << "[server] Failed to enable write event on the sync migrate connection " << conn_->GetFD() << ": "
             << status.Msg();
}

void SyncMigrateContext::EventCB(bufferevent *bev, int16_t events, void *ctx) {
auto self = reinterpret_cast<SyncMigrateContext *>(ctx);
auto &&slot_migrator = self->svr_->slot_migrator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use `auto &&` here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a reference to std::unique_ptr<SlotMigrator>, which cannot be copied.

Technically `auto&` would suffice, but I personally prefer `auto&&` since it binds to more kinds of values (e.g. const ones, though that is not the case here).

Of course, it is also okay to use auto slot_migrator = self->svr_->slot_migrator.get() (raw pointer) here. Just a choice of style.


if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (self->timer_ != nullptr) {
event_free(self->timer_);
self->timer_ = nullptr;
}

slot_migrator->CancelBlocking();
}
redis::Connection::OnEvent(bev, events, self->conn_);
}

void SyncMigrateContext::TimerCB(int, int16_t events, void *ctx) {
auto self = reinterpret_cast<SyncMigrateContext *>(ctx);
auto &&slot_migrator = self->svr_->slot_migrator;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


self->conn_->Reply(redis::NilString());
event_free(self->timer_);
self->timer_ = nullptr;

slot_migrator->CancelBlocking();

auto bev = self->conn_->GetBufferEvent();
bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent,
self->conn_);
bufferevent_enable(bev, EV_READ);
}

// Write callback installed by StartBlock. Sends the migration result to the
// client, releases the timeout timer, and hands the bufferevent back to the
// regular redis::Connection callbacks.
//
// bev: the connection's bufferevent.
// ctx: the SyncMigrateContext registered with bufferevent_setcb.
void SyncMigrateContext::WriteCB(bufferevent *bev, void *ctx) {
  auto *context = static_cast<SyncMigrateContext *>(ctx);
  redis::Connection *conn = context->conn_;

  // Reply first: an error carrying the status message, or a plain "OK".
  if (!context->migrate_result_) {
    conn->Reply(redis::Error("ERR " + context->migrate_result_.Msg()));
  } else {
    conn->Reply(redis::SimpleString("OK"));
  }

  // The result has been delivered, so the pending timeout is no longer needed.
  if (context->timer_ != nullptr) {
    event_free(context->timer_);
    context->timer_ = nullptr;
  }

  // Restore normal connection handling and re-enable reads.
  bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent, conn);
  bufferevent_enable(bev, EV_READ);

  // Fire the read callback right away, ignoring watermarks.
  bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}
48 changes: 48 additions & 0 deletions src/cluster/sync_migrate_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once
#include "server/server.h"

// State attached to a client connection that is blocked waiting for a slot
// migration to finish. The object can be neither copied nor moved.
class SyncMigrateContext {
 public:
  // svr:     server whose slot_migrator is told to cancel blocking on
  //          timeout or connection EOF/error.
  // conn:    the blocked client connection.
  // timeout: timeout in seconds; 0 means no timer is armed.
  SyncMigrateContext(Server *svr, redis::Connection *conn, int timeout)
      : svr_(svr), conn_(conn), timeout_(timeout) {}

  SyncMigrateContext(const SyncMigrateContext &) = delete;
  SyncMigrateContext(SyncMigrateContext &&) = delete;
  SyncMigrateContext &operator=(const SyncMigrateContext &) = delete;
  SyncMigrateContext &operator=(SyncMigrateContext &&) = delete;

  // Frees the timeout timer if it is still allocated.
  ~SyncMigrateContext();

  // Installs WriteCB/EventCB on the connection's bufferevent and arms the
  // timeout timer when timeout is non-zero.
  void StartBlock();

  // Stores the migration result and enables the connection's write event.
  void Wakeup(const Status &migrate_result);

  // Replies with the stored result and restores the regular connection
  // callbacks.
  static void WriteCB(bufferevent *bev, void *ctx);

  // On EOF/error frees the timer and cancels blocking, then forwards the
  // event to redis::Connection::OnEvent.
  static void EventCB(bufferevent *bev, int16_t events, void *ctx);

  // Timeout handler: replies with a nil string, cancels blocking and restores
  // the regular connection callbacks.
  static void TimerCB(int, int16_t events, void *ctx);

 private:
  Server *svr_;
  redis::Connection *conn_;
  int timeout_ = 0;         // seconds
  event *timer_ = nullptr;  // non-null only while a timeout timer exists

  Status migrate_result_;  // set by Wakeup, read by WriteCB
};
Loading