Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support blocking migration for the cluster migrate command #1418

Merged
merged 11 commits into from
May 15, 2023
8 changes: 3 additions & 5 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ Status Cluster::SetSlotImported(int slot) {
return Status::OK();
}

Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
return {Status::NotOK, "Can't find the destination node id"};
}
Expand All @@ -305,10 +305,8 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
return {Status::NotOK, "Can't migrate slot to myself"};
}

const auto dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(
dst_node_id, dst->host, dst->port, slot, svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size,
svr_->GetConfig()->sequence_gap);
const auto &dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
return s;
}

Expand Down
3 changes: 2 additions & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct SlotInfo {
using ClusterNodes = std::unordered_map<std::string, std::shared_ptr<ClusterNode>>;

class Server;
class SyncMigrateContext;

class Cluster {
public:
Expand All @@ -84,7 +85,7 @@ class Cluster {
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx = nullptr);
Status ImportSlot(redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Expand Down
32 changes: 31 additions & 1 deletion src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fmt/format.h"
#include "io_util.h"
#include "storage/batch_extractor.h"
#include "sync_migrate_context.h"
#include "thread_util.h"
#include "time_util.h"
#include "types/redis_stream_base.h"
Expand Down Expand Up @@ -76,7 +77,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin
}

Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
int speed, int pipeline_size, int seq_gap) {
SyncMigrateContext *blocking_ctx) {
// Only one slot migration job at the same time
int16_t no_slot = -1;
if (!migrating_slot_.compare_exchange_strong(no_slot, static_cast<int16_t>(slot_id))) {
Expand All @@ -91,6 +92,10 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin

migration_state_ = MigrationState::kStarted;

auto speed = svr_->GetConfig()->migrate_speed;
auto seq_gap = svr_->GetConfig()->sequence_gap;
auto pipeline_size = svr_->GetConfig()->pipeline_size;

if (speed <= 0) {
speed = 0;
}
Expand All @@ -103,6 +108,12 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin
seq_gap = kDefaultSequenceGapLimit;
}

if (blocking_ctx) {
std::unique_lock<std::mutex> lock(blocking_mutex_);
blocking_context_ = blocking_ctx;
blocking_context_->Suspend();
}

dst_node_ = node_id;

// Create migration job
Expand Down Expand Up @@ -184,6 +195,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -194,6 +206,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -205,6 +218,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -214,10 +228,12 @@ void SlotMigrator::runMigrationProcess() {
LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_;
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
resumeSyncCtx(s);
} else {
LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrating_slot_
<< ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand Down Expand Up @@ -1083,3 +1099,17 @@ void SlotMigrator::GetMigrationInfo(std::string *info) const {
*info =
fmt::format("migrating_slot: {}\r\ndestination_node: {}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state);
}

// Forgets the currently registered blocking-migrate context (if any) while
// holding blocking_mutex_, so a subsequent resumeSyncCtx() has nothing to notify.
void SlotMigrator::CancelSyncCtx() {
  const std::lock_guard<std::mutex> guard(blocking_mutex_);
  blocking_context_ = nullptr;
}

// Delivers the migration outcome to the registered blocking-migrate context
// and then unregisters it. Does nothing when no context is registered.
// The whole operation runs under blocking_mutex_.
void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
  const std::lock_guard<std::mutex> guard(blocking_mutex_);
  if (blocking_context_ == nullptr) return;

  blocking_context_->Resume(migrate_result);
  blocking_context_ = nullptr;
}
13 changes: 11 additions & 2 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct SlotMigrationJob {
int seq_gap_limit;
};

class SyncMigrateContext;

class SlotMigrator : public redis::Database {
public:
explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
Expand All @@ -80,8 +82,8 @@ class SlotMigrator : public redis::Database {
~SlotMigrator();

Status CreateMigrationThread();
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
int pipeline_size, int seq_gap);
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
SyncMigrateContext *blocking_ctx = nullptr);
void ReleaseForbiddenSlot();
void SetMaxMigrationSpeed(int value) {
if (value >= 0) max_migration_speed_ = value;
Expand All @@ -98,6 +100,7 @@ class SlotMigrator : public redis::Database {
int16_t GetForbiddenSlot() const { return forbidden_slot_; }
int16_t GetMigratingSlot() const { return migrating_slot_; }
void GetMigrationInfo(std::string *info) const;
void CancelSyncCtx();

private:
void loop();
Expand Down Expand Up @@ -131,6 +134,9 @@ class SlotMigrator : public redis::Database {
Status syncWalBeforeForbiddingSlot();
Status syncWalAfterForbiddingSlot();
void setForbiddenSlot(int16_t slot);
std::unique_lock<std::mutex> blockingLock() { return std::unique_lock<std::mutex>(blocking_mutex_); }

void resumeSyncCtx(const Status &migrate_result);

enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };
Expand Down Expand Up @@ -170,4 +176,7 @@ class SlotMigrator : public redis::Database {
std::atomic<bool> stop_migration_ = false; // if is true migration will be stopped but the thread won't be destroyed
const rocksdb::Snapshot *slot_snapshot_ = nullptr;
uint64_t wal_begin_seq_ = 0;

std::mutex blocking_mutex_;
SyncMigrateContext *blocking_context_ = nullptr;
};
81 changes: 81 additions & 0 deletions src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "cluster/sync_migrate_context.h"

void SyncMigrateContext::Suspend() {
auto bev = conn_->GetBufferEvent();
SetCB(bev);

if (timeout_ > 0) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int sec = static_cast<int>(timeout_);
timeval tm = {sec, static_cast<int>((timeout_ - static_cast<float>(sec)) * 1000000)};
evtimer_add(timer_.get(), &tm);
}
}

// Records the migration result and asks the connection's owner to enable the
// write event for the connection's fd; a failure to do so is only logged.
void SyncMigrateContext::Resume(const Status &migrate_result) {
  migrate_result_ = migrate_result;

  auto enable_status = conn_->Owner()->EnableWriteEvent(conn_->GetFD());
  if (enable_status.IsOK()) return;

  LOG(ERROR) << "[server] Failed to enable write event on the sync migrate connection " << conn_->GetFD() << ": "
             << enable_status.Msg();
}

// Bufferevent event callback. On EOF or error the timer is dropped and this
// context is unregistered from the slot migrator; in every case the event is
// then forwarded to the connection's own OnEvent handler.
void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
  const bool eof_or_error = (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0;
  if (eof_or_error) {
    timer_.reset();
    svr_->slot_migrator->CancelSyncCtx();
  }

  conn_->OnEvent(bev, events);
}

// Timer callback: replies with a nil string, drops the timer, unregisters this
// context from the slot migrator, and gives the bufferevent callbacks back to
// the connection with reading re-enabled.
void SyncMigrateContext::TimerCB(int, int16_t events) {
  conn_->Reply(redis::NilString());
  timer_.reset();

  svr_->slot_migrator->CancelSyncCtx();

  auto conn_bev = conn_->GetBufferEvent();
  conn_->SetCB(conn_bev);
  bufferevent_enable(conn_bev, EV_READ);
}

// Write callback: sends the final reply for the stored migration result
// ("OK" on success, otherwise "ERR " followed by the status message), then
// returns the bufferevent to the connection and re-triggers reading.
void SyncMigrateContext::OnWrite(bufferevent *bev) {
  if (!migrate_result_) {
    conn_->Reply(redis::Error("ERR " + migrate_result_.Msg()));
  } else {
    conn_->Reply(redis::SimpleString("OK"));
  }

  timer_.reset();

  // Restore the connection's own callbacks and resume reading on it.
  conn_->SetCB(bev);
  bufferevent_enable(bev, EV_READ);

  // Fire the read callback now, ignoring watermarks.
  bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}
43 changes: 43 additions & 0 deletions src/cluster/sync_migrate_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once
#include "event_util.h"
#include "server/server.h"

// State for one blocking ("sync") cluster migrate request. It is built from the
// server, the issuing connection and a timeout, suspended when the migration is
// started, and resumed by the slot migrator with the migration result.
class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext, false>,
private EventCallbackBase<SyncMigrateContext> {
public:
// timeout is in seconds (fractions allowed); values <= 0 arm no timer.
SyncMigrateContext(Server *svr, redis::Connection *conn, float timeout) : svr_(svr), conn_(conn), timeout_(timeout){};

// Installs this context's callbacks on the connection's bufferevent and adds
// the timeout timer when timeout_ > 0.
void Suspend();
// Stores the migration result and requests the write event for the
// connection's fd from the connection's owner.
void Resume(const Status &migrate_result);
// Replies "OK" or "ERR <message>" from the stored result, then hands the
// bufferevent callbacks back to the connection.
void OnWrite(bufferevent *bev);
// On EOF/error drops the timer and unregisters from the slot migrator; always
// forwards the event to the connection's OnEvent.
void OnEvent(bufferevent *bev, int16_t events);
// Timeout path: replies with a nil string, unregisters from the slot migrator
// and hands the bufferevent callbacks back to the connection.
void TimerCB(int, int16_t events);

private:
Server *svr_;
redis::Connection *conn_;
// Requested timeout in seconds.
float timeout_ = 0;
// Timeout timer; only set by Suspend() when timeout_ > 0.
UniqueEvent timer_;

// Result handed over by Resume() and reported by OnWrite().
Status migrate_result_;
};
35 changes: 33 additions & 2 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "cluster/cluster_defs.h"
#include "cluster/slot_import.h"
#include "cluster/sync_migrate_context.h"
#include "commander.h"
#include "error_constants.h"

Expand Down Expand Up @@ -127,11 +128,30 @@ class CommandClusterX : public Commander {
if (subcommand_ == "setnodeid" && args_.size() == 3 && args_[2].size() == kClusterNodeIdLen) return Status::OK();

if (subcommand_ == "migrate") {
if (args.size() != 4) return {Status::RedisParseErr, errWrongNumOfArguments};
if (args.size() < 4 || args.size() > 6) return {Status::RedisParseErr, errWrongNumOfArguments};

slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));

dst_node_id_ = args[3];

if (args.size() >= 5) {
auto sync_flag = util::ToLower(args[4]);
if (sync_flag == "async") {
sync_migrate_ = false;

if (args.size() == 6) {
return {Status::RedisParseErr, "Async migration does not support timeout"};
}
} else if (sync_flag == "sync") {
sync_migrate_ = true;

if (args.size() == 6) {
sync_migrate_timeout_ = GET_OR_RET(ParseFloat<float>(args[5]));
}
} else {
return {Status::RedisParseErr, "Invalid sync flag"};
}
}
return Status::OK();
}

Expand Down Expand Up @@ -225,8 +245,15 @@ class CommandClusterX : public Commander {
int64_t v = svr->cluster->GetVersion();
*output = redis::BulkString(std::to_string(v));
} else if (subcommand_ == "migrate") {
Status s = svr->cluster->MigrateSlot(static_cast<int>(slot_), dst_node_id_);
if (sync_migrate_) {
sync_migrate_ctx_ = std::make_unique<SyncMigrateContext>(svr, conn, sync_migrate_timeout_);
}

Status s = svr->cluster->MigrateSlot(static_cast<int>(slot_), dst_node_id_, sync_migrate_ctx_.get());
if (s.IsOK()) {
if (sync_migrate_) {
return {Status::BlockingCmd};
}
*output = redis::SimpleString("OK");
} else {
*output = redis::Error(s.Msg());
Expand All @@ -248,6 +275,10 @@ class CommandClusterX : public Commander {
int64_t slot_ = -1;
std::vector<SlotRange> slot_ranges_;
bool force_ = false;

bool sync_migrate_ = false;
float sync_migrate_timeout_ = 0;
std::unique_ptr<SyncMigrateContext> sync_migrate_ctx_ = nullptr;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0),
Expand Down
Loading