Skip to content

Commit

Permalink
add cluster_slot_batch ref to apache#529
Browse files Browse the repository at this point in the history
Signed-off-by: clundro <[email protected]>
  • Loading branch information
infdahai committed May 1, 2023
1 parent 02034fb commit 2d17033
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 42 deletions.
114 changes: 85 additions & 29 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,27 @@ Status Cluster::SetNodeId(const std::string &node_id) {
return Status::OK();
}

// Set the slot to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages, since we only update one
// slot distribution and there are 16384 slot in our design.
//
// Set the slots to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages. By th way,there are 16384 slots
// in our design.
// The reason why the new version MUST be +1 of current version is that,
// the command changes topology based on specific topology (also means specific
// version), we must guarantee current topology is exactly expected, otherwise,
// this update may make topology corrupt, so base topology version is very important.
// This is different with CLUSTERX SETNODES commands because it uses new version
// topology to cover current version, it allows kvrocks nodes lost some topology
// updates since of network failure, it is state instead of operation.
Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) {
// Parameters check
Status Cluster::SetSlot(const std::string &slots_str, const std::string &node_id, int64_t new_version) {
// the parsing process takes some time, so let's check other arguments first.
if (new_version <= 0 || new_version != version_ + 1) {
return {Status::NotOK, errInvalidClusterVersion};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errInvalidSlotID};
}

if (node_id.size() != kClusterNodeIdLen) {
return {Status::NotOK, errInvalidNodeID};
}

// Get the node which we want to assign a slot into it
// Get the node which we want to assign slots into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
return {Status::NotOK, "No this node in the cluster"};
Expand All @@ -128,30 +123,43 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio
return {Status::NotOK, errNoMasterNode};
}

std::vector<std::pair<int, int>> slots;
Status s = parseSlotRange(slots_str, slots);
if (!s.OK()) {
return s;
}

// Update version
version_ = new_version;

// Update topology
// 1. Remove the slot from old node if existing
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlot().
for (const auto &slot_range : slots) {
int s_start = slot_range.first, s_end = slot_range.second;
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
}
}
}
}

Expand Down Expand Up @@ -465,7 +473,7 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptr<Clus
std::vector<SlotInfo::NodeInfo> vn;
vn.push_back({n->host, n->port, n->id}); // itself

for (const auto &id : n->replicas) { // replicas
for (const auto &id : n->replicas) { // replicas
if (nodes_.find(id) == nodes_.end()) continue;
vn.push_back({nodes_[id]->host, nodes_[id]->port, nodes_[id]->id});
}
Expand Down Expand Up @@ -641,6 +649,54 @@ Status Cluster::LoadClusterNodes(const std::string &file_path) {
return SetClusterNodes(nodes_info, version, false);
}

// TODO: maybe it needs to use a more precise error type to represent `NotOk`.
Status Cluster::parseSlotRange(const std::string &slots_str, std::vector<std::pair<int, int>> &slots) {
if (slots_str.empty()) {
return {Status::NotOK, "Don't use empty slots."};
}
std::vector<std::string> slot_ranges = util::Split(slots_str, " ");

if (slot_ranges.empty()) {
return {Status::NotOK, fmt::format("Invalid slots: {}. Please use ' ' to space slots.", slots_str)};
}

auto is_number = [&](const std::string &s) {
#if __cplusplus >= 202002L
return std::ranges::all_of(s.begin(), s.end(), [](char c) { return isdigit(c) != 0; });
#else
for (char c : s) {
if (isdigit(c) == 0) {
return false;
}
}
return true;
#endif
};

// Parse all slots(include slot ranges)
for (const auto &slot_range : slot_ranges) {
if (is_number(slot_range)) {
int s_start = stoi(slot_range);
assert(IsValidSlot(s_start));
slots.emplace_back(std::make_pair(s_start, s_start));
continue;
}

// parse slot range: "int1-int2" (satisfy: int1 <= int2 )
assert(slot_range.back() != '-');
std::vector<std::string> fields = util::Split(slot_range, "-");
assert(fields.size() == 2);
if (is_number(fields[0]) && is_number(fields[1])) {
int s_start = stoi(fields[0]), s_end = stoi(fields[1]);
assert((s_start <= s_end) && IsValidSlot(s_end));
slots.emplace_back(std::make_pair(s_start, s_end));
}
return {Status::NotOK, fmt::format("Invalid slot range: {}", slot_range)};
}

return Status::OK();
}

Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = util::Split(nodes_str, "\n");
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Cluster {
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(const std::string &node_id);
Status SetSlot(int slot, const std::string &node_id, int64_t version);
Status SetSlot(const std::string& slots_str, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Expand All @@ -104,6 +104,7 @@ class Cluster {
std::string genNodesInfo();
void updateSlotsInfo();
SlotInfo genSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
static Status parseSlotRange(const std::string&slots_str,std::vector<std::pair<int,int>> &slots);
static Status parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *svr_;
Expand Down
15 changes: 3 additions & 12 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,7 @@ class CommandClusterX : public Commander {

// CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION
if (subcommand_ == "setslot" && args_.size() == 6) {
auto parse_id = ParseInt<int>(args[2], 10);
if (!parse_id) {
return {Status::RedisParseErr, errValueNotInteger};
}

slot_id_ = *parse_id;

if (!Cluster::IsValidSlot(slot_id_)) {
return {Status::RedisParseErr, "Invalid slot id"};
}
slots_str_ = args_[2];

if (strcasecmp(args_[3].c_str(), "node") != 0) {
return {Status::RedisParseErr, "Invalid setslot options"};
Expand Down Expand Up @@ -219,7 +210,7 @@ class CommandClusterX : public Commander {
*output = redis::Error(s.Msg());
}
} else if (subcommand_ == "setslot") {
Status s = svr->cluster->SetSlot(slot_id_, args_[4], set_version_);
Status s = svr->cluster->SetSlot(slots_str_, args_[4], set_version_);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -251,7 +242,7 @@ class CommandClusterX : public Commander {
std::string dst_node_id_;
int64_t set_version_ = 0;
int64_t slot_ = -1;
int slot_id_ = -1;
std::string slots_str_;
bool force_ = false;
};

Expand Down

0 comments on commit 2d17033

Please sign in to comment.