Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support clean last invalid balance plan #2414

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/graph/BalanceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ void BalanceExecutor::execute() {
balanceLeader();
break;
case BalanceSentence::SubType::kData:
balanceData();
balanceData(false, false);
break;
case BalanceSentence::SubType::kDataStop:
balanceData(true);
balanceData(true, false);
break;
case BalanceSentence::SubType::kDataReset:
balanceData(false, true);
break;
case BalanceSentence::SubType::kShowBalancePlan:
showBalancePlan();
Expand Down Expand Up @@ -66,14 +69,15 @@ void BalanceExecutor::balanceLeader() {
std::move(future).via(runner).thenValue(cb).thenError(error);
}

void BalanceExecutor::balanceData(bool isStop) {
void BalanceExecutor::balanceData(bool isStop, bool isReset) {
std::vector<HostAddr> hostDelList;
auto hostDel = sentence_->hostDel();
if (hostDel != nullptr) {
hostDelList = hostDel->hosts();
}
auto future = ectx()->getMetaClient()->balance(std::move(hostDelList),
isStop);
isStop,
isReset);
auto *runner = ectx()->rctx()->runner();

auto cb = [this] (auto &&resp) {
Expand Down
2 changes: 1 addition & 1 deletion src/graph/BalanceExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BalanceExecutor final : public Executor {

void balanceLeader();

void balanceData(bool isStop = false);
void balanceData(bool isStop, bool isReset);

void stopBalanceData();

Expand Down
2 changes: 2 additions & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum ErrorCode {
E_NO_RUNNING_BALANCE_PLAN = -36,
E_NO_VALID_HOST = -37,
E_CORRUPTTED_BALANCE_PLAN = -38,
E_NO_INVALID_BALANCE_PLAN = -39,

// Authentication Failure
E_INVALID_PASSWORD = -41,
Expand Down Expand Up @@ -562,6 +563,7 @@ struct BalanceReq {
2: optional i64 id,
3: optional list<common.HostAddr> host_del,
4: optional bool stop,
5: optional bool reset,
}

enum TaskResult {
Expand Down
6 changes: 5 additions & 1 deletion src/meta/client/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1991,7 +1991,8 @@ MetaClient::getUserRoles(std::string account) {
}

folly::Future<StatusOr<int64_t>> MetaClient::balance(std::vector<HostAddr> hostDel,
bool isStop) {
bool isStop,
bool isReset) {
cpp2::BalanceReq req;
if (!hostDel.empty()) {
std::vector<nebula::cpp2::HostAddr> tHostDel;
Expand All @@ -2008,6 +2009,9 @@ folly::Future<StatusOr<int64_t>> MetaClient::balance(std::vector<HostAddr> hostD
if (isStop) {
req.set_stop(isStop);
}
if (isReset) {
req.set_reset(isReset);
}

folly::Promise<StatusOr<int64_t>> promise;
auto future = promise.getFuture();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/client/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ class MetaClient {

// Operations for admin
folly::Future<StatusOr<int64_t>>
balance(std::vector<HostAddr> hostDel, bool isStop = false);
balance(std::vector<HostAddr> hostDel, bool isStop, bool isReset);

folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>>
showBalance(int64_t balanceId);
Expand Down
6 changes: 2 additions & 4 deletions src/meta/processors/admin/BalancePlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,8 @@ cpp2::ErrorCode BalancePlan::recovery(bool resume) {
task.startTimeMs_ = std::get<2>(tup);
task.endTimeMs_ = std::get<3>(tup);
if (resume && task.ret_ != BalanceTask::Result::SUCCEEDED) {
// Resume the failed task, skip the in-progress and invalid tasks
if (task.ret_ == BalanceTask::Result::FAILED) {
task.ret_ = BalanceTask::Result::IN_PROGRESS;
}
// Resume any task not finished
task.ret_ = BalanceTask::Result::IN_PROGRESS;
task.status_ = BalanceTask::Status::START;
if (!ActiveHostsMan::isLived(kv_, task.dst_)) {
task.ret_ = BalanceTask::Result::INVALID;
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/admin/BalancePlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class BalancePlan {
FRIEND_TEST(BalanceTest, SingleReplicaTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, DispatchTasksTest);
FRIEND_TEST(BalanceTest, StopBalanceDataTest);
FRIEND_TEST(BalanceTest, StopAndRecoverTest);
FRIEND_TEST(BalanceTest, CleanLastInvalidBalancePlanTest);

public:
enum class Status : uint8_t {
Expand Down
17 changes: 17 additions & 0 deletions src/meta/processors/admin/BalanceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) {
onFinished();
return;
}
if (req.get_reset() != nullptr) {
if (!(*req.get_reset())) {
handleErrorCode(cpp2::ErrorCode::E_UNKNOWN);
onFinished();
return;
}
auto plan = Balancer::instance(kvstore_)->cleanLastInValidPlan();
if (!ok(plan)) {
handleErrorCode(error(plan));
onFinished();
return;
}
resp_.set_id(value(plan));
handleErrorCode(cpp2::ErrorCode::SUCCEEDED);
onFinished();
return;
}
if (req.get_id() != nullptr) {
auto ret = Balancer::instance(kvstore_)->show(*req.get_id());
if (!ret.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/BalanceTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void BalanceTask::invoke() {
endTimeMs_ = time::WallClock::fastNowInMilliSec();
saveInStore();
LOG(ERROR) << taskIdStr_ << "Task invalid, status " << static_cast<int32_t>(status_);
onFinished_();
onError_();
return;
}
if (ret_ == Result::FAILED) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/BalanceTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class BalanceTask {
FRIEND_TEST(BalanceTest, SingleReplicaTest);
FRIEND_TEST(BalanceTest, NormalTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopBalanceDataTest);
FRIEND_TEST(BalanceTest, StopAndRecoverTest);

public:
enum class Status : uint8_t {
Expand Down
42 changes: 42 additions & 0 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ StatusOr<BalanceID> Balancer::stop() {
return plan_->id();
}

/**
 * Remove the persisted record of a FAILED balance plan so it no longer blocks
 * or gets resumed by later balance requests.
 *
 * Runs under lock_. Only the first entry returned by the scan over
 * BalancePlan::prefix() is inspected; if that entry is in FAILED status its
 * plan key is removed from the kv store and the call blocks until the removal
 * callback fires. The task records of the plan are left untouched.
 *
 * @return the id of the removed plan on success, otherwise:
 *         - E_LEADER_CHANGED if this node is not leader of the default part,
 *         - E_BALANCER_RUNNING if a balance plan is currently running,
 *         - the converted kvstore error if the scan or the removal fails,
 *         - E_NO_INVALID_BALANCE_PLAN if there is no plan, or the inspected
 *           plan is not FAILED.
 */
ErrorOr<cpp2::ErrorCode, BalanceID> Balancer::cleanLastInValidPlan() {
    std::lock_guard<std::mutex> guard(lock_);

    auto* nebulaStore = static_cast<kvstore::NebulaStore*>(kv_);
    if (!nebulaStore->isLeader(kDefaultSpaceId, kDefaultPartId)) {
        return cpp2::ErrorCode::E_LEADER_CHANGED;
    }
    if (running_) {
        return cpp2::ErrorCode::E_BALANCER_RUNNING;
    }

    const auto& planPrefix = BalancePlan::prefix();
    std::unique_ptr<kvstore::KVIterator> planIter;
    auto scanCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, planPrefix, &planIter);
    if (scanCode != kvstore::ResultCode::SUCCEEDED) {
        LOG(ERROR) << "Can't access kvstore, ret = " << static_cast<int32_t>(scanCode);
        return MetaCommon::to(scanCode);
    }

    // At most one invalid plan is expected to exist, so only the first entry
    // of the scan is examined; anything else means there is nothing to clean.
    if (!planIter->valid() ||
        BalancePlan::status(planIter->val()) != BalancePlan::Status::FAILED) {
        return cpp2::ErrorCode::E_NO_INVALID_BALANCE_PLAN;
    }

    auto removedId = BalancePlan::id(planIter->key());

    // Dropping the plan record alone is sufficient; wait synchronously for the
    // asynchronous removal to report its result.
    folly::Baton<true, std::atomic> done;
    cpp2::ErrorCode removeCode = cpp2::ErrorCode::SUCCEEDED;
    kv_->asyncMultiRemove(kDefaultSpaceId,
                          kDefaultPartId,
                          {planIter->key().str()},
                          [&done, &removeCode] (kvstore::ResultCode code) {
                              removeCode = MetaCommon::to(code);
                              done.post();
                          });
    done.wait();

    if (removeCode != cpp2::ErrorCode::SUCCEEDED) {
        return removeCode;
    }
    return removedId;
}

cpp2::ErrorCode Balancer::recovery() {
CHECK(!plan_) << "plan should be nullptr now";
if (kv_) {
Expand Down
8 changes: 7 additions & 1 deletion src/meta/processors/admin/Balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class Balancer {
FRIEND_TEST(BalanceTest, MockReplaceMachineTest);
FRIEND_TEST(BalanceTest, SingleReplicaTest);
FRIEND_TEST(BalanceTest, RecoveryTest);
FRIEND_TEST(BalanceTest, StopBalanceDataTest);
FRIEND_TEST(BalanceTest, StopAndRecoverTest);
FRIEND_TEST(BalanceTest, CleanLastInvalidBalancePlanTest);
FRIEND_TEST(BalanceTest, LeaderBalancePlanTest);
FRIEND_TEST(BalanceTest, SimpleLeaderBalancePlanTest);
FRIEND_TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest);
Expand Down Expand Up @@ -81,6 +82,11 @@ class Balancer {
* */
StatusOr<BalanceID> stop();

/**
* Clean the last invalid (failed) plan, return the id of the removed plan if any
* */
ErrorOr<cpp2::ErrorCode, BalanceID> cleanLastInValidPlan();

/**
* TODO(heng): rollback some balance plan.
*/
Expand Down
88 changes: 73 additions & 15 deletions src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,8 @@ TEST(BalanceTest, RecoveryTest) {
}
ASSERT_EQ(6, num);
}
LOG(INFO) << "Now let's recovery it.";
LOG(INFO) << "Now let's try to recovery it. Since the the host will expired in 1 second, "
<< "the src host would be regarded as offline, so all task will be invalid";
std::vector<Status> normalSts(7, Status::OK());
static_cast<TestFaultInjector*>(balancer.client_->faultInjector())->reset(std::move(normalSts));
ret = balancer.balance();
Expand All @@ -908,7 +909,7 @@ TEST(BalanceTest, RecoveryTest) {
auto id = BalancePlan::id(iter->key());
auto status = BalancePlan::status(iter->val());
ASSERT_EQ(balanceId, id);
ASSERT_EQ(BalancePlan::Status::SUCCEEDED, status);
ASSERT_EQ(BalancePlan::Status::FAILED, status);
num++;
iter->next();
}
Expand All @@ -934,6 +935,7 @@ TEST(BalanceTest, RecoveryTest) {
{
auto tup = BalanceTask::parseVal(iter->val());
task.status_ = std::get<0>(tup);
ASSERT_EQ(BalanceTask::Status::START, task.status_);
task.ret_ = std::get<1>(tup);
ASSERT_EQ(BalanceTask::Result::INVALID, task.ret_);
task.startTimeMs_ = std::get<2>(tup);
Expand All @@ -948,7 +950,7 @@ TEST(BalanceTest, RecoveryTest) {
}
}

TEST(BalanceTest, StopBalanceDataTest) {
TEST(BalanceTest, StopAndRecoverTest) {
FLAGS_task_concurrency = 1;
fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
Expand All @@ -972,8 +974,8 @@ TEST(BalanceTest, StopBalanceDataTest) {
sleep(1);
TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}});
std::vector<Status> sts(9, Status::OK());
std::unique_ptr<FaultInjector> injector(new TestFaultInjectorWithSleep(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(injector));
std::unique_ptr<FaultInjector> sleepInjector(new TestFaultInjectorWithSleep(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(sleepInjector));
Balancer balancer(kv.get(), std::move(client));
auto ret = balancer.balance();
CHECK(ok(ret));
Expand Down Expand Up @@ -1034,40 +1036,96 @@ TEST(BalanceTest, StopBalanceDataTest) {
ASSERT_EQ(5, taskStopped);
}

std::unique_ptr<FaultInjector> normalInjector(
new TestFaultInjector(std::vector<Status>(9, Status::OK())));
balancer.client_ = std::make_unique<AdminClient>(std::move(normalInjector));
TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}});
ret = balancer.balance();
CHECK(ok(ret));
ASSERT_NE(value(ret), balanceId);
ASSERT_EQ(value(ret), balanceId);
// resume stopped plan
sleep(1);
// all task should succeed
{
const auto& prefix = BalanceTask::prefix(balanceId);
std::unique_ptr<kvstore::KVIterator> iter;
auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
int32_t num = 0;
int32_t taskStarted = 0;
int32_t taskEnded = 0;
while (iter->valid()) {
BalanceTask task;
{
auto tup = BalanceTask::parseVal(iter->val());
task.status_ = std::get<0>(tup);
ASSERT_EQ(BalanceTask::Status::END, task.status_);
task.ret_ = std::get<1>(tup);
ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
task.startTimeMs_ = std::get<2>(tup);
task.endTimeMs_ = std::get<3>(tup);
if (task.status_ == BalanceTask::Status::END) {
++taskEnded;
} else if (task.status_ == BalanceTask::Status::START) {
++taskStarted;
}
}
num++;
iter->next();
}
ASSERT_EQ(6, num);
EXPECT_EQ(5, taskStarted);
EXPECT_EQ(1, taskEnded);
}
}

// Verifies Balancer::cleanLastInValidPlan(): start a balance plan, stop it
// midway, then clean it and check that no plan record remains in the kv store.
TEST(BalanceTest, CleanLastInvalidBalancePlanTest) {
FLAGS_task_concurrency = 1;
fs::TempDir rootPath("/tmp/BalanceTest.XXXXXX");
std::unique_ptr<kvstore::KVStore> kv(TestUtils::initKV(rootPath.path()));
FLAGS_expired_threshold_sec = 1;
TestUtils::createSomeHosts(kv.get());
// Create a space with 8 partitions and 3 replicas for the balancer to work on.
{
cpp2::SpaceProperties properties;
properties.set_space_name("default_space");
properties.set_partition_num(8);
properties.set_replica_factor(3);
cpp2::CreateSpaceReq req;
req.set_properties(std::move(properties));
auto* processor = CreateSpaceProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(cpp2::ErrorCode::SUCCEEDED, resp.code);
ASSERT_EQ(1, resp.get_id().get_space_id());
}

// NOTE(review): presumably the sleep lets the hosts created above pass the
// 1-second expiry threshold, so only the three hosts re-registered below count
// as alive and a balance is required -- confirm against ActiveHostsMan.
sleep(1);
TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}});
std::vector<Status> sts(9, Status::OK());
// NOTE(review): the sleeping injector presumably slows each task step down so
// the plan is still in progress when stop() is called -- confirm.
std::unique_ptr<FaultInjector> injector(new TestFaultInjectorWithSleep(std::move(sts)));
auto client = std::make_unique<AdminClient>(std::move(injector));
Balancer balancer(kv.get(), std::move(client));
auto ret = balancer.balance();
CHECK(ok(ret));
auto balanceId = value(ret);

sleep(1);
// stop the running plan
TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}});
auto stopRet = balancer.stop();
CHECK(stopRet.ok());
// stop() must report the id of the plan that was started above.
ASSERT_EQ(stopRet.value(), balanceId);

// wait until the plan finished, no running plan for now
sleep(5);
TestUtils::registerHB(kv.get(), {{0, 0}, {1, 1}, {2, 2}});
// Cleaning must succeed and return the id of the stopped plan.
auto cleanRet = balancer.cleanLastInValidPlan();
CHECK(ok(cleanRet));
ASSERT_EQ(value(cleanRet), balanceId);

// After cleaning, no record may remain under the balance plan prefix.
{
const auto& prefix = BalancePlan::prefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto retcode = kv->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
ASSERT_EQ(retcode, kvstore::ResultCode::SUCCEEDED);
int num = 0;
while (iter->valid()) {
num++;
iter->next();
}
ASSERT_EQ(0, num);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/parser/AdminSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ class BalanceSentence final : public Sentence {
kLeader,
kData,
kDataStop,
kDataReset,
kShowBalancePlan,
};

Expand Down
Loading