Skip to content

Commit

Permalink
refactor recover job
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Aug 23, 2022
1 parent 7f185c9 commit 552b729
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 202 deletions.
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ Status MetaClient::handleResponse(const RESP& resp) {
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not existed in chosen space!");
case nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER:
return Status::Error("Need to recover failed data balance job firstly!");
return Status::Error("Need to recover or stop failed data balance job firstly!");
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq

auto retCode = jobMgr_->checkNeedRecoverJobExist(spaceId_, type);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "There is a failed data balance job, need to recover it firstly!";
LOG(INFO) << "There is a failed data balance job, need to recover or stop it firstly!";
return retCode;
}

Expand Down
289 changes: 119 additions & 170 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() {
}
}
for (auto& jd : jds) {
jd.setStatus(cpp2::JobStatus::QUEUE, true);
jd.setStatus(cpp2::JobStatus::FAILED, true);
auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId());
auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(),
jd.getParas(),
Expand Down Expand Up @@ -454,6 +454,12 @@ ErrorOr<nebula::cpp2::ErrorCode, std::list<TaskDescription>> JobManager::getAllT
}

nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient* client) {
auto mutexIter = muJobFinished_.find(jobDesc.getSpace());
if (mutexIter == muJobFinished_.end()) {
mutexIter =
muJobFinished_.emplace(jobDesc.getSpace(), std::make_unique<std::recursive_mutex>()).first;
}
std::lock_guard<std::recursive_mutex> lk(*(mutexIter->second));
auto spaceId = jobDesc.getSpace();
auto jobId = jobDesc.getJobId();
auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId);
Expand Down Expand Up @@ -646,7 +652,7 @@ nebula::cpp2::ErrorCode JobManager::checkNeedRecoverJobExist(GraphSpaceID spaceI
auto status = std::get<2>(tup);
if (type == cpp2::JobType::DATA_BALANCE || type == cpp2::JobType::ZONE_BALANCE) {
// QUEUE: The job has not been executed, the machine restarted
if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::QUEUE) {
if (status == cpp2::JobStatus::FAILED) {
return nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER;
}
}
Expand Down Expand Up @@ -709,187 +715,130 @@ nebula::cpp2::ErrorCode JobManager::stopJob(GraphSpaceID spaceId, JobID jobId) {

/**
 * @brief Put FAILED or STOPPED jobs of a space back into the job queue.
 *
 * All jobs stored under the space's job prefix are loaded, and every job whose
 * status is FAILED or STOPPED becomes a recover candidate. DATA_BALANCE and
 * ZONE_BALANCE candidates are filtered further:
 *   - only the balance candidate with the largest job id may be recovered;
 *     older balance candidates are dropped;
 *   - that candidate is dropped too if a FINISHED balance job with a larger
 *     job id exists.
 * Each remaining candidate (restricted to `jobIds` when that list is not
 * empty) is saved with status QUEUE and enqueued with JbOp::RECOVER, unless
 * checkOnRunningJobExist() reports an equivalent job.
 *
 * @param spaceId Space whose jobs should be recovered.
 * @param client  Admin client stored into adminClient_.
 * @param jobIds  Job ids to recover; an empty vector means "every candidate".
 * @return Number of jobs that were re-queued, or the error code of the failed
 *         kvstore scan / save.
 */
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(
    GraphSpaceID spaceId, AdminClient* client, const std::vector<int32_t>& jobIds) {
  // Hold the per-space mutex for the whole recovery; create it on first use.
  auto muIter = muJobFinished_.find(spaceId);
  if (muIter == muJobFinished_.end()) {
    muIter = muJobFinished_.emplace(spaceId, std::make_unique<std::recursive_mutex>()).first;
  }
  std::lock_guard<std::recursive_mutex> lk(*(muIter->second));
  adminClient_ = client;

  const std::set<JobID> jobIdSet(jobIds.begin(), jobIds.end());

  // Load every job of the space, ordered by job id.
  std::unique_ptr<kvstore::KVIterator> iter;
  auto jobPre = MetaKeyUtils::jobPrefix(spaceId);
  auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter);
  if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
    LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode);
    return retCode;
  }
  std::map<JobID, JobDescription> allJobs;
  for (; iter->valid(); iter->next()) {
    if (!MetaKeyUtils::isJobKey(iter->key())) {
      continue;
    }
    auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val());
    if (!nebula::ok(optJobRet)) {
      LOG(INFO) << "make job description failed, "
                << apache::thrift::util::enumNameSafe(nebula::error(optJobRet));
      // The result holds an error, not a value: skip this record instead of
      // reading a JobDescription that does not exist.
      continue;
    }
    auto optJob = nebula::value(optJobRet);
    auto id = optJob.getJobId();
    allJobs.emplace(id, std::move(optJob));
  }

  auto isBalanceType = [](cpp2::JobType jobType) {
    return jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE;
  };

  // Candidates: every FAILED or STOPPED job.
  std::set<JobID> jobsMaybeRecover;
  for (auto& [id, job] : allJobs) {
    auto status = job.getStatus();
    if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) {
      jobsMaybeRecover.emplace(id);
    }
  }

  // Newest (largest id) balance job among the candidates, if any.
  bool hasBalanceCandidate = false;
  JobID lastBalanceJobRecover = -1;
  for (auto it = jobsMaybeRecover.rbegin(); it != jobsMaybeRecover.rend(); ++it) {
    if (isBalanceType(allJobs.at(*it).getJobType())) {
      lastBalanceJobRecover = *it;
      hasBalanceCandidate = true;
      break;
    }
  }

  if (!hasBalanceCandidate) {
    LOG(INFO) << "no balance jobs, do recover happily";
  } else {
    // Newest FINISHED balance job. Stop at the first hit of the reverse scan,
    // otherwise the oldest finished balance job would be remembered instead.
    JobID lastBalanceJobFinished = -1;
    for (auto it = allJobs.rbegin(); it != allJobs.rend(); ++it) {
      if (isBalanceType(it->second.getJobType()) &&
          it->second.getStatus() == cpp2::JobStatus::FINISHED) {
        lastBalanceJobFinished = it->first;
        break;
      }
    }

    // Drop every balance candidate older than the newest balance candidate.
    for (auto it = jobsMaybeRecover.begin();
         it != jobsMaybeRecover.end() && *it != lastBalanceJobRecover;) {
      if (isBalanceType(allJobs.at(*it).getJobType())) {
        // Only log for jobs the caller actually asked to recover.
        if (jobIdSet.empty() || jobIdSet.count(*it)) {
          LOG(INFO) << "can't recover a balance job " << *it
                    << " when there's a newer balance job " << lastBalanceJobRecover
                    << " stopped or failed";
        }
        it = jobsMaybeRecover.erase(it);
      } else {
        ++it;
      }
    }

    // A balance job older than a finished balance job can't be recovered.
    if (lastBalanceJobRecover < lastBalanceJobFinished) {
      if (jobIdSet.empty() || jobIdSet.count(lastBalanceJobRecover)) {
        LOG(INFO) << "can't recover a balance job " << lastBalanceJobRecover
                  << " that before a finished balance job " << lastBalanceJobFinished;
      }
      jobsMaybeRecover.erase(lastBalanceJobRecover);
    }
  }

  // Persist each remaining candidate as QUEUE and enqueue it for recovery.
  int32_t recoveredJobNum = 0;
  for (auto& jobId : jobsMaybeRecover) {
    if (!jobIdSet.empty() && !jobIdSet.count(jobId)) {
      continue;
    }
    auto& job = allJobs.at(jobId);
    JobID jid = 0;
    bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid);
    if (jobExist) {
      continue;
    }
    job.setStatus(cpp2::JobStatus::QUEUE, true);
    auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId);
    auto jobVal = MetaKeyUtils::jobVal(job.getJobType(),
                                       job.getParas(),
                                       job.getStatus(),
                                       job.getStartTime(),
                                       job.getStopTime(),
                                       job.getErrorCode());
    auto ret = save(jobKey, jobVal);
    if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
      LOG(INFO) << "Add Job Failed";
      // Leader changes are reported as-is; anything else becomes E_ADD_JOB_FAILURE.
      if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
        ret = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE;
      }
      return ret;
    }
    enqueue(spaceId, jobId, JbOp::RECOVER, job.getJobType());
    inFlightJobs_.emplace(jobId, job);
    ++recoveredJobNum;
  }
  return recoveredJobNum;
}

nebula::cpp2::ErrorCode JobManager::save(const std::string& k, const std::string& v) {
Expand Down
Loading

0 comments on commit 552b729

Please sign in to comment.