Skip to content

Commit

Permalink
Minor refactoring of slot migration source code (#1373)
Browse files Browse the repository at this point in the history
  • Loading branch information
torwig authored Apr 7, 2023
1 parent 4b5831e commit 8d5ccbc
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 271 deletions.
16 changes: 8 additions & 8 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrate_->ClearKeysOfSlot(kDefaultNamespace, slot);
svr_->slot_migrator_->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
Expand Down Expand Up @@ -231,7 +231,7 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
if (!migrated_slots_.empty()) {
for (auto &it : migrated_slots_) {
if (slots_nodes_[it.first] != myself_) {
svr_->slot_migrate_->ClearKeysOfSlot(kDefaultNamespace, it.first);
svr_->slot_migrator_->ClearKeysOfSlot(kDefaultNamespace, it.first);
}
}
}
Expand Down Expand Up @@ -322,9 +322,9 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
}

const auto dst = nodes_[dst_node_id];
Status s = svr_->slot_migrate_->MigrateStart(svr_, dst_node_id, dst->host_, dst->port_, slot,
svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size,
svr_->GetConfig()->sequence_gap);
Status s = svr_->slot_migrator_->PerformSlotMigration(
dst_node_id, dst->host_, dst->port_, slot, svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size,
svr_->GetConfig()->sequence_gap);
return s;
}

Expand All @@ -351,7 +351,7 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
object_ptr->StopForLinkError(capture_fd);
};
// Stop forbidding writing slot to accept write commands
if (slot == svr_->slot_migrate_->GetForbiddenSlot()) svr_->slot_migrate_->ReleaseForbiddenSlot();
if (slot == svr_->slot_migrator_->GetForbiddenSlot()) svr_->slot_migrator_->ReleaseForbiddenSlot();
LOG(INFO) << "[import] Start importing slot " << slot;
break;
case kImportSuccess:
Expand Down Expand Up @@ -416,7 +416,7 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (myself_ != nullptr && myself_->role_ == kClusterMaster && !svr_->IsSlave()) {
// Get migrating status
std::string migrate_infos;
svr_->slot_migrate_->GetMigrateInfo(&migrate_infos);
svr_->slot_migrator_->GetMigrationInfo(&migrate_infos);
*cluster_infos += migrate_infos;

// Get importing status
Expand Down Expand Up @@ -758,7 +758,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

bool Cluster::IsWriteForbiddenSlot(int slot) { return svr_->slot_migrate_->GetForbiddenSlot() == slot; }
bool Cluster::IsWriteForbiddenSlot(int slot) { return svr_->slot_migrator_->GetForbiddenSlot() == slot; }

Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn) {
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ SlotImport::SlotImport(Server *svr)
import_fd_(-1) {
std::lock_guard<std::mutex> guard(mutex_);
// Let metadata_cf_handle_ be nullptr, then get them in real time while use them.
// See comments in SlotMigrate::SlotMigrate for detailed reason.
// See comments in SlotMigrator::SlotMigrator for detailed reason.
metadata_cf_handle_ = nullptr;
}

Expand Down Expand Up @@ -100,7 +100,7 @@ void SlotImport::StopForLinkError(int fd) {

// Maybe server has failovered
// Situation:
// Refer to the situation described in SlotMigrate::SlotMigrate
// Refer to the situation described in SlotMigrator::SlotMigrator
// 1. Change server to slave when it is importing data.
// 2. Source server's migration process end after destination server has finished replication.
// 3. The migration link closed by source server, then this function will be call by OnEvent.
Expand Down
Loading

0 comments on commit 8d5ccbc

Please sign in to comment.