From f3047a86671c6b2b589d27223d8d4f53609cbfdc Mon Sep 17 00:00:00 2001 From: "darion.yaphet" Date: Sun, 21 Nov 2021 03:45:58 +0800 Subject: [PATCH] remove some useless code --- src/meta/processors/admin/Balancer.cpp | 1232 ----------------- src/meta/processors/admin/Balancer.h | 269 ---- .../processors/job/BalanceJobExecutor.cpp | 58 - src/meta/processors/job/BalanceJobExecutor.h | 2 - 4 files changed, 1561 deletions(-) delete mode 100644 src/meta/processors/admin/Balancer.cpp delete mode 100644 src/meta/processors/admin/Balancer.h diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp deleted file mode 100644 index 8cdad02b45a..00000000000 --- a/src/meta/processors/admin/Balancer.cpp +++ /dev/null @@ -1,1232 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "meta/processors/admin/Balancer.h" - -#include - -#include -#include - -#include "common/network/NetworkUtils.h" -#include "common/utils/MetaKeyUtils.h" -#include "kvstore/NebulaStore.h" -#include "meta/ActiveHostsMan.h" -#include "meta/common/MetaCommon.h" -#include "meta/processors/Common.h" - -DEFINE_double(leader_balance_deviation, - 0.05, - "after leader balance, leader count should in range " - "[avg * (1 - deviation), avg * (1 + deviation)]"); - -namespace nebula { -namespace meta { - -ErrorOr Balancer::balance(std::vector&& lostHosts) { - std::lock_guard lg(lock_); - if (!running_) { - auto retCode = recovery(); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Recovery balancer failed!"; - finish(); - return retCode; - } - if (plan_ == nullptr) { - LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one"; - retCode = buildBalancePlan(std::move(lostHosts)); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Create balance plan failed"; - finish(); - return retCode; - } - } - LOG(INFO) << "Start to invoke balance plan " << plan_->id(); - executor_->add(std::bind(&BalancePlan::invoke, plan_.get())); - running_ = true; - return plan_->id(); - } - CHECK(!!plan_); - LOG(INFO) << "Balance plan " << plan_->id() << " is still running"; - return plan_->id(); -} - -ErrorOr Balancer::show(BalanceID id) const { - std::lock_guard lg(lock_); - if (plan_ != nullptr && plan_->id() == id) { - return *plan_; - } - - if (kv_) { - BalancePlan plan(id, kv_, client_); - auto retCode = plan.recovery(false); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get balance plan failed, id " << id; - return retCode; - } - return plan; - } - return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; -} - -ErrorOr Balancer::stop() { - std::lock_guard lg(lock_); - if (!running_) { - return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; - } - CHECK(!!plan_); - plan_->stop(); - LOG(INFO) << "Stop balance plan " << plan_->id(); - return plan_->id(); -} - -ErrorOr Balancer::cleanLastInValidPlan() { - std::lock_guard lg(lock_); - auto* store = static_cast(kv_); - if (!store->isLeader(kDefaultSpaceId, kDefaultPartId)) { - return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; - } - if (running_) { - return nebula::cpp2::ErrorCode::E_BALANCER_RUNNING; - } - const auto& prefix = MetaKeyUtils::balancePlanPrefix(); - std::unique_ptr iter; - auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't access kvstore, ret = " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - // There should be at most one invalid plan, and it must be the latest one - if (iter->valid()) { - auto status = MetaKeyUtils::parseBalanceStatus(iter->val()); - if (status == BalanceStatus::FAILED) { - auto balanceId = MetaKeyUtils::parseBalanceID(iter->key()); - folly::Baton baton; - auto result = nebula::cpp2::ErrorCode::SUCCEEDED; - // Only remove the plan will be enough - kv_->asyncMultiRemove(kDefaultSpaceId, - kDefaultPartId, - {iter->key().str()}, - [&baton, &result](nebula::cpp2::ErrorCode code) { - result = code; - baton.post(); - }); - baton.wait(); - if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { - return result; - } - return balanceId; - } - } - return nebula::cpp2::ErrorCode::E_NO_INVALID_BALANCE_PLAN; -} - -nebula::cpp2::ErrorCode Balancer::recovery() { - CHECK(!plan_) << "plan should be nullptr now"; - if (kv_) { - auto* store = static_cast(kv_); - if (!store->isLeader(kDefaultSpaceId, kDefaultPartId)) { - // We need to check whether is leader or not, otherwise we would failed to - // persist state of BalancePlan and BalanceTask, so we just reject request - // if not leader. - return nebula::cpp2::ErrorCode::E_LEADER_CHANGED; - } - const auto& prefix = MetaKeyUtils::balancePlanPrefix(); - std::unique_ptr iter; - auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't access kvstore, ret = " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - std::vector corruptedPlans; - // The balance plan is stored with balance id desc order, there should be at - // most one failed or in_progress plan, and it must be the latest one - if (iter->valid()) { - auto status = MetaKeyUtils::parseBalanceStatus(iter->val()); - if (status == BalanceStatus::IN_PROGRESS || status == BalanceStatus::FAILED) { - auto balanceId = MetaKeyUtils::parseBalanceID(iter->key()); - corruptedPlans.emplace_back(balanceId); - } - } - if (corruptedPlans.empty()) { - LOG(INFO) << "No corrupted plan need to recovery!"; - return nebula::cpp2::ErrorCode::SUCCEEDED; - } - - CHECK_EQ(1, corruptedPlans.size()); - plan_ = std::make_unique(corruptedPlans[0], kv_, client_); - plan_->onFinished_ = [this]() { - auto self = plan_; - { - std::lock_guard lg(lock_); - if (LastUpdateTimeMan::update(kv_, time::WallClock::fastNowInMilliSec()) != - nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Balance plan " << plan_->id() << " update meta failed"; - } - finish(); - } - }; - auto recRet = plan_->recovery(); - if (recRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't recovery plan " << corruptedPlans[0]; - return recRet; - } - } - // save the balance plan again because FAILED tasks would be marked as - // IN_PROGRESS again - return plan_->saveInStore(); -} - -nebula::cpp2::ErrorCode Balancer::getAllSpaces( - std::vector>& spaces) { - // Get all spaces - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - const auto& prefix = MetaKeyUtils::spacePrefix(); - std::unique_ptr iter; - auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get all spaces failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - while (iter->valid()) { - auto spaceId = MetaKeyUtils::spaceId(iter->key()); - auto properties = MetaKeyUtils::parseSpace(iter->val()); - bool zoned = properties.group_name_ref().has_value(); - spaces.emplace_back(spaceId, *properties.replica_factor_ref(), zoned); - iter->next(); - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - -nebula::cpp2::ErrorCode Balancer::buildBalancePlan(std::vector&& lostHosts) { - if (plan_ != nullptr) { - LOG(ERROR) << "Balance plan should be nullptr now"; - return nebula::cpp2::ErrorCode::E_BALANCED; - } - - std::vector> spaces; - auto spacesRet = getAllSpaces(spaces); - if (spacesRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't get all spaces"; - return spacesRet; - } - - plan_ = std::make_unique(time::WallClock::fastNowInSec(), kv_, client_); - for (const auto& spaceInfo : spaces) { - auto spaceId = std::get<0>(spaceInfo); - auto spaceReplica = std::get<1>(spaceInfo); - auto dependentOnGroup = std::get<2>(spaceInfo); - LOG(INFO) << "Balance Space " << spaceId; - auto taskRet = genTasks(spaceId, spaceReplica, dependentOnGroup, std::move(lostHosts)); - if (!ok(taskRet)) { - LOG(ERROR) << "Generate tasks on space " << std::get<0>(spaceInfo) << " failed"; - return error(taskRet); - } - - auto tasks = std::move(value(taskRet)); - for (auto& task : tasks) { - plan_->addTask(std::move(task)); - } - } - - plan_->onFinished_ = [this]() { - auto self = plan_; - { - std::lock_guard lg(lock_); - if (LastUpdateTimeMan::update(kv_, time::WallClock::fastNowInMilliSec()) != - nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Balance plan " << plan_->id() << " update meta failed"; - } - finish(); - } - }; - if (plan_->tasks_.empty()) { - return nebula::cpp2::ErrorCode::E_BALANCED; - } - return plan_->saveInStore(); -} - -ErrorOr> Balancer::genTasks( - GraphSpaceID spaceId, - int32_t spaceReplica, - bool dependentOnGroup, - std::vector&& lostHosts) { - HostParts hostParts; - int32_t totalParts = 0; - // hostParts is current part allocation map - auto result = getHostParts(spaceId, dependentOnGroup, hostParts, totalParts); - if (!nebula::ok(result)) { - return nebula::error(result); - } - - auto retVal = nebula::value(result); - if (!retVal || totalParts == 0 || hostParts.empty()) { - LOG(ERROR) << "Invalid space " << spaceId; - return nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND; - } - - auto fetchHostPartsRet = fetchHostParts(spaceId, dependentOnGroup, hostParts, lostHosts); - if (!nebula::ok(fetchHostPartsRet)) { - LOG(ERROR) << "Fetch hosts and parts failed"; - return nebula::error(fetchHostPartsRet); - } - - auto hostPartsRet = nebula::value(fetchHostPartsRet); - auto confirmedHostParts = hostPartsRet.first; - auto activeHosts = hostPartsRet.second; - LOG(INFO) << "Now, try to balance the confirmedHostParts"; - - // We have two parts need to balance, the first one is parts on lost hosts and - // deleted hosts The seconds one is parts on unbalanced host in - // confirmedHostParts. - std::vector tasks; - // 1. Iterate through all hosts that would not be included in - // confirmedHostParts, - // move all parts in them to host with minimum part in confirmedHostParts - for (auto& lostHost : lostHosts) { - auto& lostParts = hostParts[lostHost]; - for (auto& partId : lostParts) { - LOG(INFO) << "Try balance part " << partId << " for lost host " << lostHost; - // check whether any peers which is alive - auto alive = checkReplica(hostParts, activeHosts, spaceReplica, partId); - if (!alive.ok()) { - LOG(ERROR) << "Check Replica failed: " << alive << " Part: " << partId; - return nebula::cpp2::ErrorCode::E_NO_VALID_HOST; - } - - auto retCode = - transferLostHost(tasks, confirmedHostParts, lostHost, spaceId, partId, dependentOnGroup); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Transfer lost host " << lostHost << " failed"; - return retCode; - } - } - } - - // 2. Make all hosts in confirmedHostParts balanced - if (balanceParts(plan_->id_, spaceId, confirmedHostParts, totalParts, tasks, dependentOnGroup)) { - return tasks; - } else { - return nebula::cpp2::ErrorCode::E_BAD_BALANCE_PLAN; - } -} - -nebula::cpp2::ErrorCode Balancer::transferLostHost(std::vector& tasks, - HostParts& confirmedHostParts, - const HostAddr& source, - GraphSpaceID spaceId, - PartitionID partId, - bool dependentOnGroup) { - // find a host with minimum parts which doesn't have this part - ErrorOr result; - if (dependentOnGroup) { - result = hostWithMinimalPartsForZone(source, confirmedHostParts, partId); - } else { - result = hostWithMinimalParts(confirmedHostParts, partId); - } - - if (!nebula::ok(result)) { - LOG(ERROR) << "Can't find a host which doesn't have part: " << partId; - return nebula::error(result); - } - const auto& targetHost = nebula::value(result); - confirmedHostParts[targetHost].emplace_back(partId); - tasks.emplace_back(plan_->id_, spaceId, partId, source, targetHost, kv_, client_); - zoneParts_[targetHost].second.emplace_back(partId); - auto zoneIt = - std::find(zoneParts_[source].second.begin(), zoneParts_[source].second.end(), partId); - if (zoneIt == zoneParts_[source].second.end()) { - LOG(ERROR) << "part not find " << partId << " at " << source; - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - -ErrorOr>> -Balancer::fetchHostParts(GraphSpaceID spaceId, - bool dependentOnGroup, - const HostParts& hostParts, - std::vector& lostHosts) { - ErrorOr> activeHostsRet; - if (dependentOnGroup) { - activeHostsRet = ActiveHostsMan::getActiveHostsWithGroup(kv_, spaceId); - } else { - activeHostsRet = ActiveHostsMan::getActiveHosts(kv_); - } - - if (!nebula::ok(activeHostsRet)) { - return nebula::error(activeHostsRet); - } - - std::vector expand; - auto activeHosts = nebula::value(activeHostsRet); - calDiff(hostParts, activeHosts, expand, lostHosts); - // confirmedHostParts is new part allocation map after balance, it would - // include newlyAdded and exclude lostHosts - HostParts confirmedHostParts(hostParts); - for (const auto& h : expand) { - LOG(INFO) << "Found new host " << h; - confirmedHostParts.emplace(h, std::vector()); - } - for (const auto& h : lostHosts) { - LOG(INFO) << "Lost host " << h; - confirmedHostParts.erase(h); - } - return std::make_pair(confirmedHostParts, activeHosts); -} - -bool Balancer::balanceParts(BalanceID balanceId, - GraphSpaceID spaceId, - HostParts& confirmedHostParts, - int32_t totalParts, - std::vector& tasks, - bool dependentOnGroup) { - auto avgLoad = static_cast(totalParts) / confirmedHostParts.size(); - VLOG(3) << "The expect avg load is " << avgLoad; - int32_t minLoad = std::floor(avgLoad); - int32_t maxLoad = std::ceil(avgLoad); - VLOG(3) << "The min load is " << minLoad << " max load is " << maxLoad; - - auto sortedHosts = sortedHostsByParts(confirmedHostParts); - if (sortedHosts.empty()) { - LOG(ERROR) << "Host is empty"; - return false; - } - - auto maxPartsHost = sortedHosts.back(); - auto minPartsHost = sortedHosts.front(); - auto& sourceHost = maxPartsHost.first; - auto& targetHost = minPartsHost.first; - if (innerBalance_) { - LOG(INFO) << "maxPartsHost.first " << maxPartsHost.first << " minPartsHost.first " - << minPartsHost.first; - while (!checkZoneLegal(maxPartsHost.first, minPartsHost.first)) { - sortedHosts.pop_back(); - maxPartsHost = sortedHosts.back(); - } - - auto& source = maxPartsHost.first; - auto iter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&source](const auto& pair) { - return source == pair.first; - }); - - auto& zoneName = iter->second.first; - int32_t hostsSize = zoneHosts_[zoneName].size(); - int32_t totalPartsZone = 0; - for (auto& host : zoneHosts_[zoneName]) { - auto it = confirmedHostParts.find(host); - if (it == confirmedHostParts.end()) { - LOG(ERROR) << "Host " << host << "not in confirmedHostParts"; - continue; - } - totalPartsZone += it->second.size(); - } - - avgLoad = static_cast(totalPartsZone) / hostsSize; - minLoad = std::floor(avgLoad); - maxLoad = std::ceil(avgLoad); - LOG(INFO) << "Update min and max loading Total parts in zone " << totalPartsZone - << ", total hosts " << hostsSize << " The expect avg load is " << avgLoad - << " The min load is " << minLoad << " max load is " << maxLoad; - } - - while (maxPartsHost.second > maxLoad || minPartsHost.second < minLoad) { - auto& partsFrom = confirmedHostParts[maxPartsHost.first]; - auto& partsTo = confirmedHostParts[minPartsHost.first]; - std::sort(partsFrom.begin(), partsFrom.end()); - std::sort(partsTo.begin(), partsTo.end()); - - LOG(INFO) << maxPartsHost.first << ":" << partsFrom.size() << " -> " << minPartsHost.first - << ":" << partsTo.size(); - std::vector diff; - std::set_difference(partsFrom.begin(), - partsFrom.end(), - partsTo.begin(), - partsTo.end(), - std::inserter(diff, diff.begin())); - bool noAction = true; - for (auto& partId : diff) { - LOG(INFO) << "partsFrom size " << partsFrom.size() << " partsTo size " << partsTo.size() - << " minLoad " << minLoad << " maxLoad " << maxLoad; - if (partsFrom.size() == partsTo.size() + 1 || - partsFrom.size() == static_cast(minLoad) || - partsTo.size() == static_cast(maxLoad)) { - VLOG(3) << "No need to move any parts from " << maxPartsHost.first << " to " - << minPartsHost.first; - break; - } - - LOG(INFO) << "[space:" << spaceId << ", part:" << partId << "] " << maxPartsHost.first << "->" - << minPartsHost.first; - auto it = std::find(partsFrom.begin(), partsFrom.end(), partId); - if (it == partsFrom.end()) { - LOG(ERROR) << "Part " << partId << " not found in partsFrom"; - return false; - } - - if (std::find(partsTo.begin(), partsTo.end(), partId) != partsTo.end()) { - LOG(ERROR) << "Part " << partId << " already existed in partsTo"; - return false; - } - - if (dependentOnGroup) { - if (!checkZoneLegal(sourceHost, targetHost)) { - LOG(INFO) << "sourceHost " << sourceHost << " targetHost " << targetHost - << " not same zone"; - - auto& parts = relatedParts_[targetHost]; - auto minIt = std::find(parts.begin(), parts.end(), partId); - if (minIt != parts.end()) { - LOG(INFO) << "Part " << partId << " have existed"; - continue; - } - } - - auto& sourceNoneName = zoneParts_[sourceHost].first; - auto sourceHosts = zoneHosts_.find(sourceNoneName); - for (auto& sh : sourceHosts->second) { - auto& parts = relatedParts_[sh]; - auto maxIt = std::find(parts.begin(), parts.end(), partId); - if (maxIt == parts.end()) { - LOG(INFO) << "Part " << partId << " not found on " << sh; - continue; - } - parts.erase(maxIt); - } - - auto& targetNoneName = zoneParts_[targetHost].first; - auto targetHosts = zoneHosts_.find(targetNoneName); - for (auto& th : targetHosts->second) { - relatedParts_[th].emplace_back(partId); - } - } - - partsFrom.erase(it); - partsTo.emplace_back(partId); - tasks.emplace_back( - balanceId, spaceId, partId, maxPartsHost.first, minPartsHost.first, kv_, client_); - noAction = false; - } - - if (noAction) { - LOG(INFO) << "Here is no action"; - break; - } - sortedHosts = sortedHostsByParts(confirmedHostParts); - maxPartsHost = sortedHosts.back(); - minPartsHost = sortedHosts.front(); - if (innerBalance_) { - while (!checkZoneLegal(maxPartsHost.first, minPartsHost.first)) { - sortedHosts.pop_back(); - maxPartsHost = sortedHosts.back(); - } - - auto& source = maxPartsHost.first; - auto iter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&source](const auto& pair) { - return source == pair.first; - }); - - auto& zoneName = iter->second.first; - int32_t hostsSize = zoneHosts_[zoneName].size(); - int32_t totalPartsZone = 0; - for (auto& host : zoneHosts_[zoneName]) { - auto it = confirmedHostParts.find(host); - if (it == confirmedHostParts.end()) { - LOG(ERROR) << "Host " << host << "not in confirmedHostParts"; - continue; - } - totalPartsZone += it->second.size(); - } - - avgLoad = static_cast(totalPartsZone) / hostsSize; - minLoad = std::floor(avgLoad); - maxLoad = std::ceil(avgLoad); - LOG(INFO) << "Update min and max loading Total parts in zone " << totalPartsZone - << ", total hosts " << hostsSize << " The expect avg load is " << avgLoad - << " The min load is " << minLoad << " max load is " << maxLoad; - } - } - LOG(INFO) << "Balance tasks num: " << tasks.size(); - for (auto& task : tasks) { - LOG(INFO) << task.taskIdStr(); - } - - relatedParts_.clear(); - return true; -} - -ErrorOr Balancer::getHostParts(GraphSpaceID spaceId, - bool dependentOnGroup, - HostParts& hostParts, - int32_t& totalParts) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - const auto& prefix = MetaKeyUtils::partPrefix(spaceId); - std::unique_ptr iter; - auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << " " - << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); - auto partHosts = MetaKeyUtils::parsePartVal(iter->val()); - for (auto& ph : partHosts) { - hostParts[ph].emplace_back(partId); - } - totalParts++; - iter->next(); - } - - LOG(INFO) << "Host size: " << hostParts.size(); - auto key = MetaKeyUtils::spaceKey(spaceId); - std::string value; - retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, key, &value); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId - << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - auto properties = MetaKeyUtils::parseSpace(value); - if (totalParts != properties.get_partition_num()) { - LOG(ERROR) << "Partition number not equals"; - LOG(ERROR) << totalParts << " : " << properties.get_partition_num(); - return false; - } - - int32_t replica = properties.get_replica_factor(); - LOG(INFO) << "Replica " << replica; - if (dependentOnGroup && properties.group_name_ref().has_value()) { - auto groupName = *properties.group_name_ref(); - auto groupKey = MetaKeyUtils::groupKey(groupName); - std::string groupValue; - retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get group " << groupName - << " failed: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - int32_t zoneSize = MetaKeyUtils::parseZoneNames(std::move(groupValue)).size(); - LOG(INFO) << "Zone Size " << zoneSize; - innerBalance_ = (replica == zoneSize); - - auto activeHostsRet = ActiveHostsMan::getActiveHostsWithGroup(kv_, spaceId); - if (!nebula::ok(activeHostsRet)) { - return nebula::error(activeHostsRet); - } - - std::vector expand; - auto activeHosts = nebula::value(activeHostsRet); - std::vector lostHosts; - calDiff(hostParts, activeHosts, expand, lostHosts); - // confirmedHostParts is new part allocation map after balance, it would include newlyAdded - // and exclude lostHosts - HostParts confirmedHostParts(hostParts); - for (const auto& h : expand) { - LOG(INFO) << "Found new host " << h; - confirmedHostParts.emplace(h, std::vector()); - } - for (const auto& h : lostHosts) { - LOG(INFO) << "Lost host " << h; - confirmedHostParts.erase(h); - } - - auto zonePartsRet = assembleZoneParts(groupName, confirmedHostParts); - if (zonePartsRet != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Assemble Zone Parts failed group: " << groupName; - return zonePartsRet; - } - } - - totalParts *= replica; - return true; -} - -nebula::cpp2::ErrorCode Balancer::assembleZoneParts(const std::string& groupName, - HostParts& hostParts) { - LOG(INFO) << "Balancer assembleZoneParts"; - auto groupKey = MetaKeyUtils::groupKey(groupName); - std::string groupValue; - auto retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get group " << groupName - << " failed: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - // zoneHosts use to record this host belong to zone's hosts - std::unordered_map, std::vector> zoneHosts; - auto zoneNames = MetaKeyUtils::parseZoneNames(std::move(groupValue)); - for (auto zoneName : zoneNames) { - LOG(INFO) << "Zone Name: " << zoneName; - auto zoneKey = MetaKeyUtils::zoneKey(zoneName); - std::string zoneValue; - retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - auto hosts = MetaKeyUtils::parseZoneHosts(std::move(zoneValue)); - for (const auto& host : hosts) { - LOG(INFO) << "Host for zone " << host; - auto pair = std::pair(std::move(host), zoneName); - auto& hs = zoneHosts[std::move(pair)]; - hs.insert(hs.end(), hosts.begin(), hosts.end()); - } - } - - for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - auto host = it->first; - LOG(INFO) << "Host: " << host; - auto zoneIter = - std::find_if(zoneHosts.begin(), zoneHosts.end(), [host](const auto& pair) -> bool { - return host == pair.first.first; - }); - - if (zoneIter == zoneHosts.end()) { - LOG(INFO) << it->first << " have lost"; - continue; - } - - auto& hosts = zoneIter->second; - auto name = zoneIter->first.second; - zoneHosts_[name] = hosts; - for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) { - auto partIter = hostParts.find(*hostIter); - LOG(INFO) << "Zone " << name << " have the host " << it->first; - if (partIter == hostParts.end()) { - zoneParts_[it->first] = ZoneNameAndParts(name, std::vector()); - } else { - zoneParts_[it->first] = ZoneNameAndParts(name, partIter->second); - } - } - } - - for (auto it = zoneHosts.begin(); it != zoneHosts.end(); it++) { - auto host = it->first.first; - auto& hosts = it->second; - for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) { - auto h = *hostIter; - auto iter = std::find_if(hostParts.begin(), hostParts.end(), [h](const auto& pair) -> bool { - return h == pair.first; - }); - - if (iter == hostParts.end()) { - continue; - } - - auto& parts = iter->second; - auto& hp = relatedParts_[host]; - hp.insert(hp.end(), parts.begin(), parts.end()); - } - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - -void Balancer::calDiff(const HostParts& hostParts, - const std::vector& activeHosts, - std::vector& expand, - std::vector& lost) { - for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - VLOG(1) << "Original Host " << it->first << ", parts " << it->second.size(); - if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end() && - std::find(lost.begin(), lost.end(), it->first) == lost.end()) { - lost.emplace_back(it->first); - } - } - for (auto& h : activeHosts) { - VLOG(1) << "Active host " << h; - if (hostParts.find(h) == hostParts.end()) { - expand.emplace_back(h); - } - } -} - -std::vector> Balancer::sortedHostsByParts(const HostParts& hostParts) { - std::vector> hosts; - for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - LOG(INFO) << "Host " << it->first << " parts " << it->second.size(); - hosts.emplace_back(it->first, it->second.size()); - } - std::sort(hosts.begin(), hosts.end(), [](const auto& l, const auto& r) { - if (l.second != r.second) { - return l.second < r.second; - } else { - return l.first.host < r.first.host; - } - }); - return hosts; -} - -Status Balancer::checkReplica(const HostParts& hostParts, - const std::vector& activeHosts, - int32_t replica, - PartitionID partId) { - // check host hold the part and alive - auto checkPart = [&](const auto& entry) { - auto& host = entry.first; - auto& parts = entry.second; - return std::find(parts.begin(), parts.end(), partId) != parts.end() && - std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end(); - }; - auto aliveReplica = std::count_if(hostParts.begin(), hostParts.end(), checkPart); - if (aliveReplica >= replica / 2 + 1) { - return Status::OK(); - } - return Status::Error("Not enough alive host hold the part %d", partId); -} - -ErrorOr Balancer::hostWithMinimalParts( - const HostParts& hostParts, PartitionID partId) { - auto hosts = sortedHostsByParts(hostParts); - for (auto& h : hosts) { - auto it = hostParts.find(h.first); - if (it == hostParts.end()) { - LOG(ERROR) << "Host " << h.first << " not found"; - return nebula::cpp2::ErrorCode::E_NO_HOSTS; - } - - if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { - return h.first; - } - } - return nebula::cpp2::ErrorCode::E_NO_HOSTS; -} - -ErrorOr Balancer::hostWithMinimalPartsForZone( - const HostAddr& source, const HostParts& hostParts, PartitionID partId) { - auto hosts = sortedHostsByParts(hostParts); - for (auto& h : hosts) { - auto it = hostParts.find(h.first); - if (it == hostParts.end()) { - LOG(ERROR) << "Host " << h.first << " not found"; - return nebula::cpp2::ErrorCode::E_NO_HOSTS; - } - - LOG(INFO) << "source " << source << " h.first " << h.first; - if (std::find(it->second.begin(), it->second.end(), partId) == it->second.end()) { - return h.first; - } - } - return nebula::cpp2::ErrorCode::E_NO_HOSTS; -} - -nebula::cpp2::ErrorCode Balancer::leaderBalance() { - if (running_) { - LOG(INFO) << "Balance process still running"; - return nebula::cpp2::ErrorCode::E_BALANCER_RUNNING; - } - - folly::Promise promise; - auto future = promise.getFuture(); - // Space ID, Replica Factor and Dependent On Group - std::vector> spaces; - auto ret = getAllSpaces(spaces); - if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Can't get spaces"; - // TODO unify error code - if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { - ret = nebula::cpp2::ErrorCode::E_STORE_FAILURE; - } - return ret; - } - - bool expected = false; - if (inLeaderBalance_.compare_exchange_strong(expected, true)) { - hostLeaderMap_.reset(new HostLeaderMap); - auto status = client_->getLeaderDist(hostLeaderMap_.get()).get(); - if (!status.ok() || hostLeaderMap_->empty()) { - LOG(ERROR) << "Get leader distribution failed"; - inLeaderBalance_ = false; - return nebula::cpp2::ErrorCode::E_RPC_FAILURE; - } - - std::vector> futures; - for (const auto& spaceInfo : spaces) { - auto spaceId = std::get<0>(spaceInfo); - auto replicaFactor = std::get<1>(spaceInfo); - auto dependentOnGroup = std::get<2>(spaceInfo); - LeaderBalancePlan plan; - auto balanceResult = buildLeaderBalancePlan( - hostLeaderMap_.get(), spaceId, replicaFactor, dependentOnGroup, plan); - if (!nebula::ok(balanceResult) || !nebula::value(balanceResult)) { - LOG(ERROR) << "Building leader balance plan failed " - << "Space: " << spaceId; - continue; - } - simplifyLeaderBalancePlan(spaceId, plan); - for (const auto& task : plan) { - futures.emplace_back(client_->transLeader(std::get<0>(task), - std::get<1>(task), - std::move(std::get<2>(task)), - std::move(std::get<3>(task)))); - } - } - - int32_t failed = 0; - folly::collectAll(futures) - .via(executor_.get()) - .thenTry([&](const auto& result) { - auto tries = result.value(); - for (const auto& t : tries) { - if (!t.value().ok()) { - ++failed; - } - } - }) - .wait(); - - inLeaderBalance_ = false; - if (failed != 0) { - LOG(ERROR) << failed << " partition failed to transfer leader"; - } - return nebula::cpp2::ErrorCode::SUCCEEDED; - } - return nebula::cpp2::ErrorCode::E_BALANCER_RUNNING; -} - -ErrorOr Balancer::buildLeaderBalancePlan( - HostLeaderMap* hostLeaderMap, - GraphSpaceID spaceId, - int32_t replicaFactor, - bool dependentOnGroup, - LeaderBalancePlan& plan, - bool useDeviation) { - PartAllocation peersMap; - HostParts leaderHostParts; - size_t leaderParts = 0; - // store peers of all partitions in peerMap - folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock()); - const auto& prefix = MetaKeyUtils::partPrefix(spaceId); - std::unique_ptr iter; - auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Access kvstore failed, spaceId " << spaceId << static_cast(retCode); - return retCode; - } - - while (iter->valid()) { - auto key = iter->key(); - PartitionID partId; - memcpy(&partId, key.data() + prefix.size(), sizeof(PartitionID)); - auto peers = MetaKeyUtils::parsePartVal(iter->val()); - peersMap[partId] = std::move(peers); - ++leaderParts; - iter->next(); - } - - int32_t totalParts = 0; - HostParts allHostParts; - auto result = getHostParts(spaceId, dependentOnGroup, allHostParts, totalParts); - if (!nebula::ok(result)) { - return nebula::error(result); - } else { - auto retVal = nebula::value(result); - if (!retVal || totalParts == 0 || allHostParts.empty()) { - LOG(ERROR) << "Invalid space " << spaceId; - return false; - } - } - - std::unordered_set activeHosts; - for (const auto& host : *hostLeaderMap) { - // only balance leader between hosts which have valid partition - if (!allHostParts[host.first].empty()) { - activeHosts.emplace(host.first); - leaderHostParts[host.first] = (*hostLeaderMap)[host.first][spaceId]; - } - } - - if (activeHosts.empty()) { - LOG(ERROR) << "No active hosts"; - return false; - } - - if (dependentOnGroup) { - for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) { - auto min = it->second.size() / replicaFactor; - VLOG(3) << "Host: " << it->first << " Bounds: " << min << " : " << min + 1; - hostBounds_[it->first] = std::make_pair(min, min + 1); - } - } else { - size_t activeSize = activeHosts.size(); - size_t globalAvg = leaderParts / activeSize; - size_t globalMin = globalAvg; - size_t globalMax = globalAvg; - if (leaderParts % activeSize != 0) { - globalMax += 1; - } - - if (useDeviation) { - globalMin = std::ceil(static_cast(leaderParts) / activeSize * - (1 - FLAGS_leader_balance_deviation)); - globalMax = std::floor(static_cast(leaderParts) / activeSize * - (1 + FLAGS_leader_balance_deviation)); - } - VLOG(3) << "Build leader balance plan, expected min load: " << globalMin - << ", max load: " << globalMax << " avg: " << globalAvg; - - for (auto it = allHostParts.begin(); it != allHostParts.end(); it++) { - hostBounds_[it->first] = std::make_pair(globalMin, globalMax); - } - } - - while (true) { - int32_t taskCount = 0; - bool hasUnbalancedHost = false; - for (const auto& hostEntry : leaderHostParts) { - auto host = hostEntry.first; - auto& hostMinLoad = hostBounds_[host].first; - auto& hostMaxLoad = hostBounds_[host].second; - int32_t partSize = hostEntry.second.size(); - if (hostMinLoad <= partSize && partSize <= hostMaxLoad) { - VLOG(3) << partSize << " is between min load " << hostMinLoad << " and max load " - << hostMaxLoad; - continue; - } - - hasUnbalancedHost = true; - if (partSize < hostMinLoad) { - // need to acquire leader from other hosts - LOG(INFO) << "Acquire leaders to host: " << host << " loading: " << partSize - << " min loading " << hostMinLoad; - taskCount += acquireLeaders( - allHostParts, leaderHostParts, peersMap, activeHosts, host, plan, spaceId); - } else { - // need to transfer leader to other hosts - LOG(INFO) << "Giveup leaders from host: " << host << " loading: " << partSize - << " max loading " << hostMaxLoad; - taskCount += giveupLeaders(leaderHostParts, peersMap, activeHosts, host, plan, spaceId); - } - } - - // If every host is balanced or no more task during this loop, then the plan - // is done - if (!hasUnbalancedHost || taskCount == 0) { - LOG(INFO) << "Not need balance"; - break; - } - } - return true; -} - -int32_t Balancer::acquireLeaders(HostParts& allHostParts, - HostParts& leaderHostParts, - PartAllocation& peersMap, - std::unordered_set& activeHosts, - const HostAddr& target, - LeaderBalancePlan& plan, - GraphSpaceID spaceId) { - // host will loop for the partition which is not leader, and try to acquire the - // leader - int32_t taskCount = 0; - std::vector diff; - std::set_difference(allHostParts[target].begin(), - allHostParts[target].end(), - leaderHostParts[target].begin(), - leaderHostParts[target].end(), - std::back_inserter(diff)); - auto& targetLeaders = leaderHostParts[target]; - size_t minLoad = hostBounds_[target].first; - for (const auto& partId : diff) { - VLOG(3) << "Try acquire leader for part " << partId; - // find the leader of partId - auto sources = peersMap[partId]; - for (const auto& source : sources) { - if (source == target || !activeHosts.count(source)) { - continue; - } - - // if peer is the leader of partId and can transfer, then transfer it to - // host - auto& sourceLeaders = leaderHostParts[source]; - VLOG(3) << "Check peer: " << source << " min load: " << minLoad - << " peerLeaders size: " << sourceLeaders.size(); - auto it = std::find(sourceLeaders.begin(), sourceLeaders.end(), partId); - if (it != sourceLeaders.end() && minLoad < sourceLeaders.size()) { - sourceLeaders.erase(it); - targetLeaders.emplace_back(partId); - plan.emplace_back(spaceId, partId, source, target); - LOG(INFO) << "acquire plan trans leader space: " << spaceId << " part: " << partId - << " from " << source.host << ":" << source.port << " to " << target.host << ":" - << target.port; - ++taskCount; - break; - } - } - - // if host has enough leader, just return - if (targetLeaders.size() == minLoad) { - LOG(INFO) << "Host: " << target << "'s leader reach " << minLoad; - break; - } - } - return taskCount; -} - -int32_t Balancer::giveupLeaders(HostParts& leaderParts, - PartAllocation& peersMap, - std::unordered_set& activeHosts, - const HostAddr& source, - LeaderBalancePlan& plan, - GraphSpaceID spaceId) { - int32_t taskCount = 0; - auto& sourceLeaders = leaderParts[source]; - size_t maxLoad = hostBounds_[source].second; - - // host will try to transfer the extra leaders to other peers - for (auto it = sourceLeaders.begin(); it != sourceLeaders.end();) { - // find the leader of partId - auto partId = *it; - const auto& targets = peersMap[partId]; - bool isErase = false; - - // leader should move to the peer with lowest loading - auto target = - std::min_element(targets.begin(), targets.end(), [&](const auto& l, const auto& r) -> bool { - if (source == l || !activeHosts.count(l)) { - return false; - } - return leaderParts[l].size() < leaderParts[r].size(); - }); - - // If peer can accept this partition leader, than host will transfer to the - // peer - if (target != targets.end()) { - auto& targetLeaders = leaderParts[*target]; - int32_t targetLeaderSize = targetLeaders.size(); - if (targetLeaderSize < hostBounds_[*target].second) { - it = sourceLeaders.erase(it); - targetLeaders.emplace_back(partId); - plan.emplace_back(spaceId, partId, source, *target); - LOG(INFO) << "giveup plan trans leader space: " << spaceId << " part: " << partId - << " from " << source.host << ":" << source.port << " to " << target->host << ":" - << target->port; - ++taskCount; - isErase = true; - } - } - - // if host has enough leader, just return - if (sourceLeaders.size() == maxLoad) { - LOG(INFO) << "Host: " << source << "'s leader reach " << maxLoad; - break; - } - - if (!isErase) { - ++it; - } - } - return taskCount; -} - -void Balancer::simplifyLeaderBalancePlan(GraphSpaceID spaceId, LeaderBalancePlan& plan) { - // Within a leader balance plan, a partition may be moved several times, but - // actually we only need to transfer the leadership of a partition from the - // first host to the last host, and ignore the intermediate ones - std::unordered_map buckets; - for (auto& task : plan) { - buckets[std::get<1>(task)].emplace_back(task); - } - plan.clear(); - for (const auto& partEntry : buckets) { - plan.emplace_back(spaceId, - partEntry.first, - std::get<2>(partEntry.second.front()), - std::get<3>(partEntry.second.back())); - } -} - -nebula::cpp2::ErrorCode Balancer::collectZoneParts(const std::string& groupName, - HostParts& hostParts) { - auto groupKey = MetaKeyUtils::groupKey(groupName); - std::string groupValue; - auto retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get group " << groupName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - // zoneHosts use to record this host belong to zone's hosts - std::unordered_map, std::vector> zoneHosts; - auto zoneNames = MetaKeyUtils::parseZoneNames(std::move(groupValue)); - for (auto zoneName : zoneNames) { - auto zoneKey = MetaKeyUtils::zoneKey(zoneName); - std::string zoneValue; - retCode = kv_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - auto hosts = MetaKeyUtils::parseZoneHosts(std::move(zoneValue)); - for (const auto& host : hosts) { - auto pair = std::pair(std::move(host), zoneName); - auto& hs = zoneHosts[std::move(pair)]; - hs.insert(hs.end(), hosts.begin(), hosts.end()); - } - } - - for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - auto host = it->first; - auto zoneIter = - std::find_if(zoneHosts.begin(), zoneHosts.end(), [host](const auto& pair) -> bool { - return host == pair.first.first; - }); - - if (zoneIter == zoneHosts.end()) { - LOG(INFO) << it->first << " have lost"; - continue; - } - - auto& hosts = zoneIter->second; - auto name = zoneIter->first.second; - for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) { - auto partIter = hostParts.find(*hostIter); - if (partIter == hostParts.end()) { - zoneParts_[it->first] = ZoneNameAndParts(name, std::vector()); - } else { - zoneParts_[it->first] = ZoneNameAndParts(name, partIter->second); - } - } - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - -bool Balancer::checkZoneLegal(const HostAddr& source, const HostAddr& target) { - VLOG(3) << "Check " << source << " : " << target; - auto sourceIter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&source](const auto& pair) { - return source == pair.first; - }); - - if (sourceIter == zoneParts_.end()) { - LOG(INFO) << "Source " << source << " not found"; - return false; - } - - auto targetIter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&target](const auto& pair) { - return target == pair.first; - }); - - if (targetIter == zoneParts_.end()) { - LOG(INFO) << "Target " << target << " not found"; - return false; - } - - LOG(INFO) << sourceIter->second.first << " : " << targetIter->second.first; - return sourceIter->second.first == targetIter->second.first; -} - -} // namespace meta -} // namespace nebula diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h deleted file mode 100644 index 4a5331ee2a4..00000000000 --- a/src/meta/processors/admin/Balancer.h +++ /dev/null @@ -1,269 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef META_ADMIN_BALANCER_H_ -#define META_ADMIN_BALANCER_H_ - -#include -#include - -#include "common/network/NetworkUtils.h" -#include "common/time/WallClock.h" -#include "kvstore/KVStore.h" -#include "meta/processors/admin/AdminClient.h" -#include "meta/processors/admin/BalancePlan.h" -#include "meta/processors/admin/BalanceTask.h" - -namespace nebula { -namespace meta { - -using HostParts = std::unordered_map>; -using PartAllocation = std::unordered_map>; -using LeaderBalancePlan = std::vector>; -using ZoneNameAndParts = std::pair>; - -/** -There are two interfaces public: - * Balance: it will construct a balance plan and invoked it. If last balance -plan is not succeeded, it will - * try to resume it. - * - * Rollback: In many cases, if some plan failed forever, we call this interface -to rollback. - -Some notes: -1. Balance will generate balance plan according to current active hosts and -parts allocation -2. For the plan, we hope after moving the least parts , it will reach a -reasonable state. -3. Only one balance plan could be invoked at the same time. -4. Each balance plan has one id, and we could show the status by "balance id" -command and after FO, we could resume the balance plan by type "balance" again. -5. Each balance plan contains many balance tasks, the task represents the -minimum movement unit. -6. We save the whole balancePlan state in kvstore to do failover. -7. Each balance task contains serval steps. And it should be executed step by -step. -8. One task failed will result in the whole balance plan failed. -9. Currently, we hope tasks for the same part could be invoked serially - * */ -class Balancer { - FRIEND_TEST(BalanceTest, BalancePartsTest); - FRIEND_TEST(BalanceTest, NormalTest); - FRIEND_TEST(BalanceTest, SimpleTestWithZone); - FRIEND_TEST(BalanceTest, SpecifyHostTest); - FRIEND_TEST(BalanceTest, SpecifyMultiHostTest); - FRIEND_TEST(BalanceTest, MockReplaceMachineTest); - FRIEND_TEST(BalanceTest, SingleReplicaTest); - FRIEND_TEST(BalanceTest, TryToRecoveryTest); - FRIEND_TEST(BalanceTest, RecoveryTest); - FRIEND_TEST(BalanceTest, StopPlanTest); - FRIEND_TEST(BalanceTest, CleanLastInvalidBalancePlanTest); - FRIEND_TEST(BalanceTest, LeaderBalancePlanTest); - FRIEND_TEST(BalanceTest, SimpleLeaderBalancePlanTest); - FRIEND_TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest); - FRIEND_TEST(BalanceTest, LeaderBalanceTest); - FRIEND_TEST(BalanceTest, ManyHostsLeaderBalancePlanTest); - FRIEND_TEST(BalanceTest, LeaderBalanceWithZoneTest); - FRIEND_TEST(BalanceTest, LeaderBalanceWithLargerZoneTest); - FRIEND_TEST(BalanceTest, LeaderBalanceWithComplexZoneTest); - FRIEND_TEST(BalanceTest, ExpansionZoneTest); - FRIEND_TEST(BalanceTest, ExpansionHostIntoZoneTest); - FRIEND_TEST(BalanceTest, ShrinkZoneTest); - FRIEND_TEST(BalanceTest, ShrinkHostFromZoneTest); - FRIEND_TEST(BalanceTest, BalanceWithComplexZoneTest); - FRIEND_TEST(BalanceIntegrationTest, LeaderBalanceTest); - FRIEND_TEST(BalanceIntegrationTest, BalanceTest); - - public: - static Balancer* instance(kvstore::KVStore* kv) { - static std::unique_ptr client(new AdminClient(kv)); - static std::unique_ptr balancer(new Balancer(kv, client.get())); - return balancer.get(); - } - - ~Balancer() = default; - - /* - * Return Error if reject the balance request, otherwise return balance id. - * */ - ErrorOr balance(std::vector&& lostHosts = {}); - - /** - * Show balance plan id status. - * */ - ErrorOr show(BalanceID id) const; - - /** - * Stop balance plan by canceling all waiting balance task. - * */ - ErrorOr stop(); - - /** - * Clean invalid plan, return the invalid plan key if any - * */ - ErrorOr cleanLastInValidPlan(); - - /** - * TODO(heng): rollback some balance plan. - */ - Status rollback(BalanceID id) { return Status::Error("unimplemented, %ld", id); } - - /** - * TODO(heng): Execute balance plan from outside. - * */ - Status execute(BalancePlan plan) { - UNUSED(plan); - return Status::Error("Unsupport it yet!"); - } - - /** - * TODO(heng): Execute specific balance plan by id. - * */ - Status execute(BalanceID id) { - UNUSED(id); - return Status::Error("Unsupport it yet!"); - } - - nebula::cpp2::ErrorCode leaderBalance(); - - void finish() { - CHECK(!lock_.try_lock()); - plan_.reset(); - running_ = false; - } - - bool isRunning() { - std::lock_guard lg(lock_); - return running_; - } - - private: - Balancer(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) { - executor_.reset(new folly::CPUThreadPoolExecutor(1)); - } - /* - * When the balancer failover, we should recovery the status. - * */ - nebula::cpp2::ErrorCode recovery(); - - /** - * Build balance plan and save it in kvstore. - * */ - nebula::cpp2::ErrorCode buildBalancePlan(std::vector&& lostHosts); - - ErrorOr> genTasks( - GraphSpaceID spaceId, - int32_t spaceReplica, - bool dependentOnGroup, - std::vector&& lostHosts); - - ErrorOr>> fetchHostParts( - GraphSpaceID spaceId, - bool dependentOnGroup, - const HostParts& hostParts, - std::vector& lostHosts); - - ErrorOr getHostParts(GraphSpaceID spaceId, - bool dependentOnGroup, - HostParts& hostParts, - int32_t& totalParts); - - nebula::cpp2::ErrorCode assembleZoneParts(const std::string& groupName, HostParts& hostParts); - - void calDiff(const HostParts& hostParts, - const std::vector& activeHosts, - std::vector& newlyAdded, - std::vector& lost); - - Status checkReplica(const HostParts& hostParts, - const std::vector& activeHosts, - int32_t replica, - PartitionID partId); - - ErrorOr hostWithMinimalParts(const HostParts& hostParts, - PartitionID partId); - - ErrorOr hostWithMinimalPartsForZone(const HostAddr& source, - const HostParts& hostParts, - PartitionID partId); - - bool balanceParts(BalanceID balanceId, - GraphSpaceID spaceId, - HostParts& newHostParts, - int32_t totalParts, - std::vector& tasks, - bool dependentOnGroup); - - nebula::cpp2::ErrorCode transferLostHost(std::vector& tasks, - HostParts& newHostParts, - const HostAddr& source, - GraphSpaceID spaceId, - PartitionID partId, - bool dependentOnGroup); - - std::vector> sortedHostsByParts(const HostParts& hostParts); - - nebula::cpp2::ErrorCode getAllSpaces( - std::vector>& spaces); - - ErrorOr buildLeaderBalancePlan(HostLeaderMap* hostLeaderMap, - GraphSpaceID spaceId, - int32_t replicaFactor, - bool dependentOnGroup, - LeaderBalancePlan& plan, - bool useDeviation = true); - - void simplifyLeaderBalancePlan(GraphSpaceID spaceId, LeaderBalancePlan& plan); - - int32_t acquireLeaders(HostParts& allHostParts, - HostParts& leaderHostParts, - PartAllocation& peersMap, - std::unordered_set& activeHosts, - const HostAddr& target, - LeaderBalancePlan& plan, - GraphSpaceID spaceId); - - int32_t giveupLeaders(HostParts& leaderHostParts, - PartAllocation& peersMap, - std::unordered_set& activeHosts, - const HostAddr& source, - LeaderBalancePlan& plan, - GraphSpaceID spaceId); - - nebula::cpp2::ErrorCode collectZoneParts(const std::string& groupName, HostParts& hostParts); - - bool checkZoneLegal(const HostAddr& source, const HostAddr& target); - - private: - std::atomic_bool running_{false}; - kvstore::KVStore* kv_{nullptr}; - AdminClient* client_{nullptr}; - // Current running plan. - std::shared_ptr plan_{nullptr}; - std::unique_ptr executor_; - std::atomic_bool inLeaderBalance_{false}; - - // Host => Graph => Partitions - std::unique_ptr hostLeaderMap_; - mutable std::mutex lock_; - - std::unordered_map> hostBounds_; - - // TODO: (darion) nesting map maybe better - std::unordered_map zoneParts_; - std::unordered_map> zoneHosts_; - - // if the space dependent on group, it use to record the partition - // contained in the zone related to the node. - std::unordered_map> relatedParts_; - - bool innerBalance_ = false; -}; - -} // namespace meta -} // namespace nebula - -#endif // META_ADMIN_BALANCER_H_ diff --git a/src/meta/processors/job/BalanceJobExecutor.cpp b/src/meta/processors/job/BalanceJobExecutor.cpp index 81ee4aa6231..56ebaaaba4e 100644 --- a/src/meta/processors/job/BalanceJobExecutor.cpp +++ b/src/meta/processors/job/BalanceJobExecutor.cpp @@ -755,64 +755,6 @@ ErrorOr DataBalanceJobExecutor::hostWithMinim return nebula::cpp2::ErrorCode::E_NO_HOSTS; } -nebula::cpp2::ErrorCode BalanceJobExecutor::collectZoneParts(const std::string& groupName, - HostParts& hostParts) { - auto groupKey = MetaKeyUtils::groupKey(groupName); - std::string groupValue; - auto retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, groupKey, &groupValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get group " << groupName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - // zoneHosts use to record this host belong to zone's hosts - std::unordered_map, std::vector> zoneHosts; - auto zoneNames = MetaKeyUtils::parseZoneNames(std::move(groupValue)); - for (auto zoneName : zoneNames) { - auto zoneKey = MetaKeyUtils::zoneKey(zoneName); - std::string zoneValue; - retCode = kvstore_->get(kDefaultSpaceId, kDefaultPartId, zoneKey, &zoneValue); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Get zone " << zoneName - << " failed, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; - } - - auto hosts = MetaKeyUtils::parseZoneHosts(std::move(zoneValue)); - for (const auto& host : hosts) { - auto pair = std::pair(std::move(host), zoneName); - auto& hs = zoneHosts[std::move(pair)]; - hs.insert(hs.end(), hosts.begin(), hosts.end()); - } - } - - for (auto it = hostParts.begin(); it != hostParts.end(); it++) { - auto host = it->first; - auto zoneIter = - std::find_if(zoneHosts.begin(), zoneHosts.end(), [host](const auto& pair) -> bool { - return host == pair.first.first; - }); - - if (zoneIter == zoneHosts.end()) { - LOG(INFO) << it->first << " have lost"; - continue; - } - - auto& hosts = zoneIter->second; - auto name = zoneIter->first.second; - for (auto hostIter = hosts.begin(); hostIter != hosts.end(); hostIter++) { - auto partIter = hostParts.find(*hostIter); - if (partIter == hostParts.end()) { - zoneParts_[it->first] = ZoneNameAndParts(name, std::vector()); - } else { - zoneParts_[it->first] = ZoneNameAndParts(name, partIter->second); - } - } - } - return nebula::cpp2::ErrorCode::SUCCEEDED; -} - bool DataBalanceJobExecutor::checkZoneLegal(const HostAddr& source, const HostAddr& target) { VLOG(3) << "Check " << source << " : " << target; auto sourceIter = std::find_if(zoneParts_.begin(), zoneParts_.end(), [&source](const auto& pair) { diff --git a/src/meta/processors/job/BalanceJobExecutor.h b/src/meta/processors/job/BalanceJobExecutor.h index 52440bf6eaf..0de93f5e16e 100644 --- a/src/meta/processors/job/BalanceJobExecutor.h +++ b/src/meta/processors/job/BalanceJobExecutor.h @@ -59,8 +59,6 @@ class BalanceJobExecutor : public MetaJobExecutor { nebula::cpp2::ErrorCode assembleZoneParts(const std::string& groupName, HostParts& hostParts); - nebula::cpp2::ErrorCode collectZoneParts(const std::string& groupName, HostParts& hostParts); - nebula::cpp2::ErrorCode save(const std::string& k, const std::string& v); protected: