Skip to content

Commit

Permalink
Support blocking migration for the cluster migrate command (#1418)
Browse files Browse the repository at this point in the history
Co-authored-by: Twice <[email protected]>
Co-authored-by: hulk <[email protected]>
  • Loading branch information
3 people authored May 15, 2023
1 parent d2d10f7 commit 75db3e4
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 15 deletions.
8 changes: 3 additions & 5 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ Status Cluster::SetSlotImported(int slot) {
return Status::OK();
}

Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) {
if (nodes_.find(dst_node_id) == nodes_.end()) {
return {Status::NotOK, "Can't find the destination node id"};
}
Expand All @@ -305,10 +305,8 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
return {Status::NotOK, "Can't migrate slot to myself"};
}

const auto dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(
dst_node_id, dst->host, dst->port, slot, svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size,
svr_->GetConfig()->sequence_gap);
const auto &dst = nodes_[dst_node_id];
Status s = svr_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx);
return s;
}

Expand Down
3 changes: 2 additions & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct SlotInfo {
using ClusterNodes = std::unordered_map<std::string, std::shared_ptr<ClusterNode>>;

class Server;
class SyncMigrateContext;

class Cluster {
public:
Expand All @@ -84,7 +85,7 @@ class Cluster {
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx = nullptr);
Status ImportSlot(redis::Connection *conn, int slot, int state);
std::string GetMyId() const { return myid_; }
Status DumpClusterNodes(const std::string &file);
Expand Down
32 changes: 31 additions & 1 deletion src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "fmt/format.h"
#include "io_util.h"
#include "storage/batch_extractor.h"
#include "sync_migrate_context.h"
#include "thread_util.h"
#include "time_util.h"
#include "types/redis_stream_base.h"
Expand Down Expand Up @@ -76,7 +77,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin
}

Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
int speed, int pipeline_size, int seq_gap) {
SyncMigrateContext *blocking_ctx) {
// Only one slot migration job at the same time
int16_t no_slot = -1;
if (!migrating_slot_.compare_exchange_strong(no_slot, static_cast<int16_t>(slot_id))) {
Expand All @@ -91,6 +92,10 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin

migration_state_ = MigrationState::kStarted;

auto speed = svr_->GetConfig()->migrate_speed;
auto seq_gap = svr_->GetConfig()->sequence_gap;
auto pipeline_size = svr_->GetConfig()->pipeline_size;

if (speed <= 0) {
speed = 0;
}
Expand All @@ -103,6 +108,12 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin
seq_gap = kDefaultSequenceGapLimit;
}

if (blocking_ctx) {
std::unique_lock<std::mutex> lock(blocking_mutex_);
blocking_context_ = blocking_ctx;
blocking_context_->Suspend();
}

dst_node_ = node_id;

// Create migration job
Expand Down Expand Up @@ -184,6 +195,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -194,6 +206,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -205,6 +218,7 @@ void SlotMigrator::runMigrationProcess() {
} else {
LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << migrating_slot_ << ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand All @@ -214,10 +228,12 @@ void SlotMigrator::runMigrationProcess() {
LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_;
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
resumeSyncCtx(s);
} else {
LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrating_slot_
<< ". Error: " << s.Msg();
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
Expand Down Expand Up @@ -1083,3 +1099,17 @@ void SlotMigrator::GetMigrationInfo(std::string *info) const {
*info =
fmt::format("migrating_slot: {}\r\ndestination_node: {}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state);
}

// Detaches the currently registered blocking (sync) migrate context, if any.
// After this call resumeSyncCtx() finds no context and does nothing. The
// context object itself is not destroyed here; only the pointer is cleared.
void SlotMigrator::CancelSyncCtx() {
  // Serialize with resumeSyncCtx(), which takes the same mutex.
  std::lock_guard<std::mutex> guard(blocking_mutex_);
  blocking_context_ = nullptr;
}

// Hands the migration result to the registered blocking context (if one is
// still attached) and then detaches it so it is resumed at most once.
//
// migrate_result: outcome of the migration, forwarded to
//                 SyncMigrateContext::Resume().
void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
  std::lock_guard<std::mutex> guard(blocking_mutex_);

  // Nothing to do when no context is attached (never set, or already cleared).
  if (blocking_context_ == nullptr) return;

  blocking_context_->Resume(migrate_result);
  blocking_context_ = nullptr;
}
13 changes: 11 additions & 2 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ struct SlotMigrationJob {
int seq_gap_limit;
};

class SyncMigrateContext;

class SlotMigrator : public redis::Database {
public:
explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed,
Expand All @@ -80,8 +82,8 @@ class SlotMigrator : public redis::Database {
~SlotMigrator();

Status CreateMigrationThread();
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed,
int pipeline_size, int seq_gap);
Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id,
SyncMigrateContext *blocking_ctx = nullptr);
void ReleaseForbiddenSlot();
void SetMaxMigrationSpeed(int value) {
if (value >= 0) max_migration_speed_ = value;
Expand All @@ -98,6 +100,7 @@ class SlotMigrator : public redis::Database {
int16_t GetForbiddenSlot() const { return forbidden_slot_; }
int16_t GetMigratingSlot() const { return migrating_slot_; }
void GetMigrationInfo(std::string *info) const;
void CancelSyncCtx();

private:
void loop();
Expand Down Expand Up @@ -131,6 +134,9 @@ class SlotMigrator : public redis::Database {
Status syncWalBeforeForbiddingSlot();
Status syncWalAfterForbiddingSlot();
void setForbiddenSlot(int16_t slot);
std::unique_lock<std::mutex> blockingLock() { return std::unique_lock<std::mutex>(blocking_mutex_); }

void resumeSyncCtx(const Status &migrate_result);

enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };
Expand Down Expand Up @@ -170,4 +176,7 @@ class SlotMigrator : public redis::Database {
std::atomic<bool> stop_migration_ = false; // if is true migration will be stopped but the thread won't be destroyed
const rocksdb::Snapshot *slot_snapshot_ = nullptr;
uint64_t wal_begin_seq_ = 0;

std::mutex blocking_mutex_;
SyncMigrateContext *blocking_context_ = nullptr;
};
81 changes: 81 additions & 0 deletions src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "cluster/sync_migrate_context.h"

// Installs this context's callbacks on the connection's bufferevent and, when
// a positive timeout (in seconds, possibly fractional) was requested, arms a
// timer on the same event base.
void SyncMigrateContext::Suspend() {
  auto bev = conn_->GetBufferEvent();
  SetCB(bev);

  // Written as a negated '>' so that any value that is not strictly positive
  // (including NaN) skips the timer.
  if (!(timeout_ > 0)) return;

  timer_.reset(NewTimer(bufferevent_get_base(bev)));

  // Split the timeout into whole seconds and the remaining microseconds.
  const int whole_sec = static_cast<int>(timeout_);
  timeval tm{};
  tm.tv_sec = whole_sec;
  tm.tv_usec = static_cast<int>((timeout_ - static_cast<float>(whole_sec)) * 1000000);
  evtimer_add(timer_.get(), &tm);
}

// Records the migration outcome and asks the connection's owner to enable the
// write event for the connection's fd. OnWrite() later reads the stored
// result to build the reply.
//
// migrate_result: final status of the migration.
void SyncMigrateContext::Resume(const Status &migrate_result) {
  migrate_result_ = migrate_result;

  if (auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD()); !s.IsOK()) {
    // Failure is only logged; nothing else is done about it here.
    LOG(ERROR) << "[server] Failed to enable write event on the sync migrate connection " << conn_->GetFD() << ": "
               << s.Msg();
  }
}

// Bufferevent event callback used while this context's callbacks are
// installed. On EOF or error it drops the timer and detaches this context
// from the slot migrator; the event is then always forwarded to the
// connection's own handler.
void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) {
  const bool eof_or_error = (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) != 0;
  if (eof_or_error) {
    timer_.reset();
    svr_->slot_migrator->CancelSyncCtx();
  }

  conn_->OnEvent(bev, events);
}

// Timer callback for the timer armed in Suspend(). Replies with a nil string,
// drops the timer, detaches this context from the slot migrator, then restores
// the connection's own callbacks and re-enables reading.
void SyncMigrateContext::TimerCB(int, int16_t events) {
  conn_->Reply(redis::NilString());
  timer_.reset();

  // Once detached, SlotMigrator::resumeSyncCtx() no longer reaches this context.
  svr_->slot_migrator->CancelSyncCtx();

  // Give the bufferevent back to the connection.
  auto bev = conn_->GetBufferEvent();
  conn_->SetCB(bev);
  bufferevent_enable(bev, EV_READ);
}

// Write callback used while this context's callbacks are installed. Sends the
// reply for the stored migration result ("OK" or "ERR <message>"), drops the
// timer, restores the connection's own callbacks, re-enables reading, and
// triggers a read event on the bufferevent.
void SyncMigrateContext::OnWrite(bufferevent *bev) {
  if (!migrate_result_) {
    conn_->Reply(redis::Error("ERR " + migrate_result_.Msg()));
  } else {
    conn_->Reply(redis::SimpleString("OK"));
  }

  timer_.reset();

  // Give the bufferevent back to the connection.
  conn_->SetCB(bev);
  bufferevent_enable(bev, EV_READ);

  // Manually fire the read callback, ignoring watermarks.
  bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}
43 changes: 43 additions & 0 deletions src/cluster/sync_migrate_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once
#include "event_util.h"
#include "server/server.h"

// Per-command context for a blocking (sync) slot migration. It installs its
// own bufferevent callbacks on the client connection in Suspend(), stores the
// migration result in Resume(), and sends the reply from OnWrite() or from
// TimerCB() when the timer fires first.
class SyncMigrateContext : private EvbufCallbackBase<SyncMigrateContext, false>,
private EventCallbackBase<SyncMigrateContext> {
public:
// svr and conn are stored as raw pointers and are not deleted by this class.
// timeout is in seconds (fractions allowed); Suspend() arms a timer only when it is > 0.
SyncMigrateContext(Server *svr, redis::Connection *conn, float timeout) : svr_(svr), conn_(conn), timeout_(timeout){};

// Installs this context's callbacks on the connection's bufferevent and arms the timer if timeout_ > 0.
void Suspend();
// Stores migrate_result and enables the write event on the connection's fd.
void Resume(const Status &migrate_result);
// Replies "OK" or "ERR <msg>" based on the stored result, then restores the connection's callbacks.
void OnWrite(bufferevent *bev);
// On EOF/error drops the timer and detaches from the slot migrator; always forwards to conn_->OnEvent.
void OnEvent(bufferevent *bev, int16_t events);
// Replies with a nil string, detaches from the slot migrator and restores the connection's callbacks.
void TimerCB(int, int16_t events);

private:
Server *svr_;
redis::Connection *conn_;
// Timeout in seconds; values <= 0 mean no timer is armed.
float timeout_ = 0;
// Timer created in Suspend(); reset in OnEvent (EOF/error), TimerCB and OnWrite.
UniqueEvent timer_;

// Result passed to Resume(); read by OnWrite() to choose the reply.
Status migrate_result_;
};
35 changes: 33 additions & 2 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "cluster/cluster_defs.h"
#include "cluster/slot_import.h"
#include "cluster/sync_migrate_context.h"
#include "commander.h"
#include "error_constants.h"

Expand Down Expand Up @@ -127,11 +128,30 @@ class CommandClusterX : public Commander {
if (subcommand_ == "setnodeid" && args_.size() == 3 && args_[2].size() == kClusterNodeIdLen) return Status::OK();

if (subcommand_ == "migrate") {
if (args.size() != 4) return {Status::RedisParseErr, errWrongNumOfArguments};
if (args.size() < 4 || args.size() > 6) return {Status::RedisParseErr, errWrongNumOfArguments};

slot_ = GET_OR_RET(ParseInt<int64_t>(args[2], 10));

dst_node_id_ = args[3];

if (args.size() >= 5) {
auto sync_flag = util::ToLower(args[4]);
if (sync_flag == "async") {
sync_migrate_ = false;

if (args.size() == 6) {
return {Status::RedisParseErr, "Async migration does not support timeout"};
}
} else if (sync_flag == "sync") {
sync_migrate_ = true;

if (args.size() == 6) {
sync_migrate_timeout_ = GET_OR_RET(ParseFloat<float>(args[5]));
}
} else {
return {Status::RedisParseErr, "Invalid sync flag"};
}
}
return Status::OK();
}

Expand Down Expand Up @@ -225,8 +245,15 @@ class CommandClusterX : public Commander {
int64_t v = svr->cluster->GetVersion();
*output = redis::BulkString(std::to_string(v));
} else if (subcommand_ == "migrate") {
Status s = svr->cluster->MigrateSlot(static_cast<int>(slot_), dst_node_id_);
if (sync_migrate_) {
sync_migrate_ctx_ = std::make_unique<SyncMigrateContext>(svr, conn, sync_migrate_timeout_);
}

Status s = svr->cluster->MigrateSlot(static_cast<int>(slot_), dst_node_id_, sync_migrate_ctx_.get());
if (s.IsOK()) {
if (sync_migrate_) {
return {Status::BlockingCmd};
}
*output = redis::SimpleString("OK");
} else {
*output = redis::Error(s.Msg());
Expand All @@ -248,6 +275,10 @@ class CommandClusterX : public Commander {
int64_t slot_ = -1;
std::vector<SlotRange> slot_ranges_;
bool force_ = false;

bool sync_migrate_ = false;
float sync_migrate_timeout_ = 0;
std::unique_ptr<SyncMigrateContext> sync_migrate_ctx_ = nullptr;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0),
Expand Down
Loading

0 comments on commit 75db3e4

Please sign in to comment.