Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor recover job #4515

Merged
merged 2 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ Status MetaClient::handleResponse(const RESP& resp) {
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not existed in chosen space!");
case nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER:
return Status::Error("Need to recover failed data balance job firstly!");
return Status::Error("Need to recover or stop failed data balance job firstly!");
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq

auto retCode = jobMgr_->checkNeedRecoverJobExist(spaceId_, type);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "There is a failed data balance job, need to recover it firstly!";
LOG(INFO) << "There is a failed data balance job, need to recover or stop it firstly!";
return retCode;
}

Expand All @@ -145,7 +145,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq
}

JobDescription jobDesc(spaceId_, nebula::value(jobId), type, paras);
auto errorCode = jobMgr_->addJob(jobDesc, adminClient_);
auto errorCode = jobMgr_->addJob(std::move(jobDesc), adminClient_);
if (errorCode == nebula::cpp2::ErrorCode::SUCCEEDED) {
result.job_id_ref() = nebula::value(jobId);
}
Expand Down
311 changes: 136 additions & 175 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,26 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
auto optJob = nebula::value(optJobRet);
std::unique_ptr<JobExecutor> je =
JobExecutorFactory::createJobExecutor(optJob, kvStore_, adminClient_);
// Only balance has been recovered
// Only balance would change
if (optJob.getStatus() == cpp2::JobStatus::RUNNING && je->isMetaJob()) {
jds.emplace_back(optJob);
jds.emplace_back(std::move(optJob));
Copy link
Contributor

@critical27 critical27 Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this would break the rule below?

for a balance job:
(1) if there's a newer finished balance job, the stopped or failed balance job can't be recovered.
(2) only the lasted stopped or failed balance job could be recovered

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the guarantee is at jobmanager.cpp:832

} else if (optJob.getStatus() == cpp2::JobStatus::QUEUE) {
auto mutexIter = muJobFinished_.find(optJob.getSpace());
if (mutexIter == muJobFinished_.end()) {
mutexIter =
muJobFinished_.emplace(optJob.getSpace(), std::make_unique<std::recursive_mutex>())
.first;
}
std::lock_guard<std::recursive_mutex> lk(*(mutexIter->second));
auto spaceId = optJob.getSpace();
auto jobId = optJob.getJobId();
enqueue(spaceId, jobId, JbOp::ADD, optJob.getJobType());
inFlightJobs_.emplace(std::move(jobId), std::move(optJob));
}
}
}
for (auto& jd : jds) {
jd.setStatus(cpp2::JobStatus::QUEUE, true);
jd.setStatus(cpp2::JobStatus::FAILED, true);
Copy link
Contributor

@critical27 critical27 Aug 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you prefer FAILED instead of QUEUED?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the rule 1

auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand Down Expand Up @@ -453,7 +465,13 @@ ErrorOr<nebula::cpp2::ErrorCode, std::list<TaskDescription>> JobManager::getAllT
return taskDescriptions;
}

nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient* client) {
nebula::cpp2::ErrorCode JobManager::addJob(JobDescription jobDesc, AdminClient* client) {
auto mutexIter = muJobFinished_.find(jobDesc.getSpace());
if (mutexIter == muJobFinished_.end()) {
mutexIter =
muJobFinished_.emplace(jobDesc.getSpace(), std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(mutexIter->second));
auto spaceId = jobDesc.getSpace();
auto jobId = jobDesc.getJobId();
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
Expand All @@ -466,7 +484,7 @@ nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient*
auto rc = save(jobKey, jobVal);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
enqueue(spaceId, jobId, JbOp::ADD, jobDesc.getJobType());
inFlightJobs_.emplace(jobId, jobDesc);
inFlightJobs_.emplace(std::move(jobId), std::move(jobDesc));
} else {
LOG(INFO) << "Add Job Failed";
if (rc != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
Expand Down Expand Up @@ -645,8 +663,7 @@ nebula::cpp2::ErrorCode JobManager::checkNeedRecoverJobExist(GraphSpaceID spaceI
auto type = std::get<0>(tup);
auto status = std::get<2>(tup);
if (type == cpp2::JobType::DATA_BALANCE || type == cpp2::JobType::ZONE_BALANCE) {
// QUEUE: The job has not been executed, the machine restarted
if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::QUEUE) {
if (status == cpp2::JobStatus::FAILED) {
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
return nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER;
}
}
Expand Down Expand Up @@ -709,187 +726,131 @@ nebula::cpp2::ErrorCode JobManager::stopJob(GraphSpaceID spaceId, JobID jobId) {

ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
GraphSpaceID spaceId, AdminClient* client, const std::vector<int32_t>& jobIds) {
int32_t recoveredJobNum = 0;
std::vector<std::pair<std::string, std::string>> jobKVs;
auto muIter = muJobFinished_.find(spaceId);
if (muIter == muJobFinished_.end()) {
muIter = muJobFinished_.emplace(spaceId, std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(muIter->second));
std::set<JobID> jobIdSet(jobIds.begin(), jobIds.end());
std::map<JobID, JobDescription> allJobs;
adminClient_ = client;
if (jobIds.empty()) {
std::unique_ptr<kvstore::KVIterator> iter;
auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
std::unique_ptr<kvstore::KVIterator> iter;
auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
for (; iter->valid(); iter->next()) {
if (!MetaKeyUtils::isJobKey(iter->key())) {
continue;
}
for (; iter->valid(); iter->next()) {
if (!MetaKeyUtils::isJobKey(iter->key())) {
auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val());
if (!nebula::ok(optJobRet)) {
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
LOG(INFO) << "make job description failed, "
<< apache::thrift::util::enumNameSafe(nebula::error(optJobRet));
return nebula::error(optJobRet);
}
auto optJob = nebula::value(optJobRet);
auto id = optJob.getJobId();
allJobs.emplace(id, std::move(optJob));
}
std::set<JobID> jobsMaybeRecover;
for (auto& [id, job] : allJobs) {
auto status = job.getStatus();
if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) {
jobsMaybeRecover.emplace(id);
}
}
std::set<JobID>::reverse_iterator lastBalaceJobRecoverIt = jobsMaybeRecover.rend();
for (auto it = jobsMaybeRecover.rbegin(); it != jobsMaybeRecover.rend(); it++) {
auto jobType = allJobs[*it].getJobType();
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
lastBalaceJobRecoverIt = it;
break;
}
}
int32_t recoveredJobNum = 0;
auto finalyRecover = [&]() -> nebula::cpp2::ErrorCode {
for (auto& jobId : jobsMaybeRecover) {
if (!jobIdSet.empty() && !jobIdSet.count(jobId)) {
continue;
}
jobKVs.emplace_back(std::make_pair(iter->key(), iter->val()));
}
} else {
std::vector<std::string> jobKeys;
jobKeys.reserve(jobIds.size());
std::vector<std::pair<std::string, std::string>> totalJobKVs;
for (int jobId : jobIds) {
jobKeys.emplace_back(MetaKeyUtils::jobKey(spaceId, jobId));
}
std::vector<std::string> jobVals;
auto retCode = kvStore_->multiGet(kDefaultSpaceId, kDefaultPartId, jobKeys, &jobVals);
if (retCode.first != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode.first);
return retCode.first;
}
for (size_t i = 0; i < jobKeys.size(); i++) {
totalJobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i]));
}

// For DATA_BALANCE and ZONE_BALANCE job, jobs with STOPPED, FAILED, QUEUE status
// !!! The following situations can be recovered, only for jobs of the same type
// of DATA_BALANCE or ZONE_BALANCE。
// QUEUE: The job has not been executed, then the machine restarted.
// FAILED:
// The failed job will be recovered.
// FAILED and QUEUE jobs will not exist at the same time.
// STOPPED:
// If only one stopped jobId is specified, No FINISHED job or FAILED job of the
// same type after this job.
// If multiple jobs of the same type are specified, only starttime latest jobId
// will can be recovered, no FINISHED job or FAILED job of the same type after this latest job.
// The same type of STOPPED job exists in the following form, sorted by starttime:
// STOPPED job1, FAILED job2
// recover job job1 failed
// recover job job2 success
// STOPPED job1, FINISHED job2, STOPPED job3
// recover job job1 failed
// recover job job3 success
// recover job job1,job3 Only job3 can recover
std::unordered_map<cpp2::JobType, std::tuple<JobID, int64_t, cpp2::JobStatus>> dupResult;
std::unordered_map<JobID, std::pair<std::string, std::string>> dupkeyVal;

for (auto& jobkv : totalJobKVs) {
auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second);
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
auto jobStatus = optJob.getStatus();
auto jobId = optJob.getJobId();
auto jobType = optJob.getJobType();
auto jobStartTime = optJob.getStartTime();
if (jobStatus != cpp2::JobStatus::QUEUE && jobStatus != cpp2::JobStatus::FAILED &&
jobStatus != cpp2::JobStatus::STOPPED) {
continue;
}

// handle DATA_BALANCE and ZONE_BALANCE
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
// FAILED and QUEUE jobs will not exist at the same time.
if (jobStatus == cpp2::JobStatus::FAILED || jobStatus == cpp2::JobStatus::QUEUE) {
dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus);
dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second));
continue;
}

// current recover job status is stopped
auto findJobIter = dupResult.find(jobType);
if (findJobIter != dupResult.end()) {
auto oldJobInfo = findJobIter->second;
if (std::get<2>(oldJobInfo) != cpp2::JobStatus::STOPPED) {
continue;
}
}

// For a stopped job, check whether there is the same type of finished or
// failed job after it.
std::unique_ptr<kvstore::KVIterator> iter;
auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
auto code = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(code);
return code;
}

bool findRest = false;
for (; iter->valid(); iter->next()) {
if (!MetaKeyUtils::isJobKey(iter->key())) {
continue;
}

// eliminate oneself
auto keyPair = MetaKeyUtils::parseJobKey(iter->key());
auto destJobId = keyPair.second;
if (destJobId == jobId) {
continue;
}
auto tup = MetaKeyUtils::parseJobVal(iter->val());
auto destJobType = std::get<0>(tup);
auto destJobStatus = std::get<2>(tup);
auto destJobStartTime = std::get<3>(tup);
if (jobType == destJobType) {
// There is a specific type of failed job that does not allow recovery for the type of
// stopped job
if (destJobStatus == cpp2::JobStatus::FAILED) {
LOG(ERROR) << "There is a specific type of failed job that does not allow recovery "
"for the type of stopped job";
findRest = true;
break;
} else if (destJobStatus == cpp2::JobStatus::FINISHED) {
// Compare the start time of the job
if (destJobStartTime > jobStartTime) {
findRest = true;
break;
}
}
}
}
if (!findRest) {
auto findStoppedJobIter = dupResult.find(jobType);
if (findStoppedJobIter != dupResult.end()) {
// update stopped job
auto oldJobInfo = findStoppedJobIter->second;
auto oldJobStartTime = std::get<1>(oldJobInfo);
if (jobStartTime > oldJobStartTime) {
auto oldJobId = std::get<0>(oldJobInfo);
dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus);
dupkeyVal.erase(oldJobId);
dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second));
}
} else {
// insert
dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus);
dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second));
}
}
auto& job = allJobs[jobId];
JobID jid;
bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid);
if (!jobExist) {
job.setStatus(cpp2::JobStatus::QUEUE, true);
auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId);
auto jobVal = MetaKeyUtils::jobVal(job.getJobType(),
job.getParas(),
job.getStatus(),
job.getStartTime(),
job.getStopTime(),
job.getErrorCode());
auto ret = save(jobKey, jobVal);
if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) {
enqueue(spaceId, jobId, JbOp::RECOVER, job.getJobType());
inFlightJobs_.emplace(jobId, job);
} else {
jobKVs.emplace_back(std::make_pair(jobkv.first, jobkv.second));
LOG(INFO) << "Add Job Failed";
if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
ret = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE;
}
return ret;
}
++recoveredJobNum;
}
}
for (auto& key : dupResult) {
auto jId = std::get<0>(key.second);
jobKVs.emplace_back(dupkeyVal[jId]);
return nebula::cpp2::ErrorCode::SUCCEEDED;
};
nebula::cpp2::ErrorCode rc = nebula::cpp2::ErrorCode::SUCCEEDED;
if (lastBalaceJobRecoverIt == jobsMaybeRecover.rend()) {
LOG(INFO) << "no balance jobs, do recover happily";
rc = finalyRecover();
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
return recoveredJobNum;
} else {
return rc;
}
}
JobID lastBalanceJobFinished = -1;
for (auto it = allJobs.rbegin(); it != allJobs.rend(); it++) {
auto jobType = it->second.getJobType();
if ((jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) &&
it->second.getStatus() == cpp2::JobStatus::FINISHED) {
lastBalanceJobFinished = it->first;
}
}

for (auto& jobkv : jobKVs) {
auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second);
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
if (optJob.getStatus() == cpp2::JobStatus::QUEUE ||
(jobIds.size() && (optJob.getStatus() == cpp2::JobStatus::FAILED ||
optJob.getStatus() == cpp2::JobStatus::STOPPED))) {
// Check if the job exists
JobID jId = 0;
auto jobExist =
checkOnRunningJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId);
if (!jobExist) {
auto jobId = optJob.getJobId();
enqueue(spaceId, jobId, JbOp::RECOVER, optJob.getJobType());
inFlightJobs_.emplace(jobId, optJob);
++recoveredJobNum;
}
for (auto it = jobsMaybeRecover.begin(); it != jobsMaybeRecover.end();) {
if (*it == *lastBalaceJobRecoverIt) {
break;
}
auto jobType = allJobs[*it].getJobType();
if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) {
if (jobIdSet.empty() || jobIdSet.count(*it)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " when there's a newer balance job " << *lastBalaceJobRecoverIt
<< " stopped or failed";
}
it = jobsMaybeRecover.erase(it);
} else {
it++;
}
}
if (*lastBalaceJobRecoverIt < lastBalanceJobFinished) {
if (jobIdSet.empty() || jobIdSet.count(*lastBalaceJobRecoverIt)) {
LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt
<< " that before a finished balance job " << lastBalanceJobFinished;
}
jobsMaybeRecover.erase(*lastBalaceJobRecoverIt);
}
rc = finalyRecover();
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
return recoveredJobNum;
} else {
return rc;
}
return recoveredJobNum;
}

nebula::cpp2::ErrorCode JobManager::save(const std::string& k, const std::string& v) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class JobManager : public boost::noncopyable, public nebula::cpp::NonMovable {
* @param client
* @return nebula::cpp2::ErrorCode
*/
nebula::cpp2::ErrorCode addJob(JobDescription& jobDesc, AdminClient* client);
nebula::cpp2::ErrorCode addJob(JobDescription jobDesc, AdminClient* client);

/**
* @brief The same job in inFlightJobs_.
Expand Down
Loading