diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 5fbb6ca61ce..95bd7d0bd75 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -948,7 +948,7 @@ Status MetaClient::handleResponse(const RESP& resp) { case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE: return Status::Error("Job not existed in chosen space!"); case nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER: - return Status::Error("Need to recover failed data balance job firstly!"); + return Status::Error("Need to recover or stop failed data balance job firstly!"); case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE: return Status::Error("Backup empty table!"); case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED: diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 09c9cde2e76..fcff47fe313 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -134,7 +134,7 @@ nebula::cpp2::ErrorCode AdminJobProcessor::addJobProcess(const cpp2::AdminJobReq auto retCode = jobMgr_->checkNeedRecoverJobExist(spaceId_, type); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "There is a failed data balance job, need to recover it firstly!"; + LOG(INFO) << "There is a failed data balance job, need to recover or stop it firstly!"; return retCode; } diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index d28684a908a..3bd3919bf67 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -90,7 +90,7 @@ nebula::cpp2::ErrorCode JobManager::handleRemainingJobs() { } } for (auto& jd : jds) { - jd.setStatus(cpp2::JobStatus::QUEUE, true); + jd.setStatus(cpp2::JobStatus::FAILED, true); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -454,6 +454,12 @@ ErrorOr> JobManager::getAllT } nebula::cpp2::ErrorCode JobManager::addJob(JobDescription& jobDesc, AdminClient* client) { + auto mutexIter = muJobFinished_.find(jobDesc.getSpace()); + if (mutexIter == muJobFinished_.end()) { + mutexIter = + muJobFinished_.emplace(jobDesc.getSpace(), std::make_unique()).first; + } + std::lock_guard lk(*(mutexIter->second)); auto spaceId = jobDesc.getSpace(); auto jobId = jobDesc.getJobId(); auto jobKey = MetaKeyUtils::jobKey(spaceId, jobId); @@ -646,7 +652,7 @@ nebula::cpp2::ErrorCode JobManager::checkNeedRecoverJobExist(GraphSpaceID spaceI auto status = std::get<2>(tup); if (type == cpp2::JobType::DATA_BALANCE || type == cpp2::JobType::ZONE_BALANCE) { // QUEUE: The job has not been executed, the machine restarted - if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::QUEUE) { + if (status == cpp2::JobStatus::FAILED) { return nebula::cpp2::ErrorCode::E_JOB_NEED_RECOVER; } } @@ -709,187 +715,130 @@ nebula::cpp2::ErrorCode JobManager::stopJob(GraphSpaceID spaceId, JobID jobId) { ErrorOr JobManager::recoverJob( GraphSpaceID spaceId, AdminClient* client, const std::vector& jobIds) { - int32_t recoveredJobNum = 0; - std::vector> jobKVs; + auto muIter = muJobFinished_.find(spaceId); + if (muIter == muJobFinished_.end()) { + muIter = muJobFinished_.emplace(spaceId, std::make_unique()).first; + } + std::lock_guard lk(*(muIter->second)); + std::set jobIdSet(jobIds.begin(), jobIds.end()); + std::map allJobs; adminClient_ = client; - if (jobIds.empty()) { - std::unique_ptr iter; - auto jobPre = MetaKeyUtils::jobPrefix(spaceId); - auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter); - if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); - return retCode; + std::unique_ptr iter; + auto jobPre = MetaKeyUtils::jobPrefix(spaceId); + auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter); + if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { + LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + for (; iter->valid(); iter->next()) { + if (!MetaKeyUtils::isJobKey(iter->key())) { + continue; } - for (; iter->valid(); iter->next()) { - if (!MetaKeyUtils::isJobKey(iter->key())) { + auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val()); + if (!nebula::ok(optJobRet)) { + LOG(INFO) << "make job description failed, " + << apache::thrift::util::enumNameSafe(nebula::error(optJobRet)); + } + auto optJob = nebula::value(optJobRet); + auto id = optJob.getJobId(); + allJobs.emplace(id, std::move(optJob)); + } + std::set jobsMaybeRecover; + for (auto& [id, job] : allJobs) { + auto status = job.getStatus(); + if (status == cpp2::JobStatus::FAILED || status == cpp2::JobStatus::STOPPED) { + jobsMaybeRecover.emplace(id); + } + } + std::set::reverse_iterator lastBalaceJobRecoverIt = jobsMaybeRecover.rend(); + for (auto it = jobsMaybeRecover.rbegin(); it != jobsMaybeRecover.rend(); it++) { + auto jobType = allJobs[*it].getJobType(); + if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { + lastBalaceJobRecoverIt = it; + break; + } + } + int32_t recoveredJobNum = 0; + auto finalyRecover = [&]() -> nebula::cpp2::ErrorCode { + for (auto& jobId : jobsMaybeRecover) { + if (!jobIdSet.empty() && !jobIdSet.count(jobId)) { continue; } - jobKVs.emplace_back(std::make_pair(iter->key(), iter->val())); - } - } else { - std::vector jobKeys; - jobKeys.reserve(jobIds.size()); - std::vector> totalJobKVs; - for (int jobId : jobIds) { - jobKeys.emplace_back(MetaKeyUtils::jobKey(spaceId, jobId)); - } - std::vector jobVals; - auto retCode = kvStore_->multiGet(kDefaultSpaceId, kDefaultPartId, jobKeys, &jobVals); - if (retCode.first != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Can't find jobs, error: " << apache::thrift::util::enumNameSafe(retCode.first); - return retCode.first; - } - for (size_t i = 0; i < jobKeys.size(); i++) { - totalJobKVs.emplace_back(std::make_pair(jobKeys[i], jobVals[i])); - } - - // For DATA_BALANCE and ZONE_BALANCE job, jobs with STOPPED, FAILED, QUEUE status - // !!! The following situations can be recovered, only for jobs of the same type - // of DATA_BALANCE or ZONE_BALANCE。 - // QUEUE: The job has not been executed, then the machine restarted. - // FAILED: - // The failed job will be recovered. - // FAILED and QUEUE jobs will not exist at the same time. - // STOPPED: - // If only one stopped jobId is specified, No FINISHED job or FAILED job of the - // same type after this job. - // If multiple jobs of the same type are specified, only starttime latest jobId - // will can be recovered, no FINISHED job or FAILED job of the same type after this latest job. - // The same type of STOPPED job exists in the following form, sorted by starttime: - // STOPPED job1, FAILED job2 - // recover job job1 failed - // recover job job2 success - // STOPPED job1, FINISHED job2, STOPPED job3 - // recover job job1 failed - // recover job job3 success - // recover job job1,job3 Only job3 can recover - std::unordered_map> dupResult; - std::unordered_map> dupkeyVal; - - for (auto& jobkv : totalJobKVs) { - auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second); - if (nebula::ok(optJobRet)) { - auto optJob = nebula::value(optJobRet); - auto jobStatus = optJob.getStatus(); - auto jobId = optJob.getJobId(); - auto jobType = optJob.getJobType(); - auto jobStartTime = optJob.getStartTime(); - if (jobStatus != cpp2::JobStatus::QUEUE && jobStatus != cpp2::JobStatus::FAILED && - jobStatus != cpp2::JobStatus::STOPPED) { - continue; - } - - // handle DATA_BALANCE and ZONE_BALANCE - if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { - // FAILED and QUEUE jobs will not exist at the same time. - if (jobStatus == cpp2::JobStatus::FAILED || jobStatus == cpp2::JobStatus::QUEUE) { - dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); - dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); - continue; - } - - // current recover job status is stopped - auto findJobIter = dupResult.find(jobType); - if (findJobIter != dupResult.end()) { - auto oldJobInfo = findJobIter->second; - if (std::get<2>(oldJobInfo) != cpp2::JobStatus::STOPPED) { - continue; - } - } - - // For a stopped job, check whether there is the same type of finished or - // failed job after it. - std::unique_ptr iter; - auto jobPre = MetaKeyUtils::jobPrefix(spaceId); - auto code = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobPre, &iter); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(INFO) << "Fetch jobs failed, error: " << apache::thrift::util::enumNameSafe(code); - return code; - } - - bool findRest = false; - for (; iter->valid(); iter->next()) { - if (!MetaKeyUtils::isJobKey(iter->key())) { - continue; - } - - // eliminate oneself - auto keyPair = MetaKeyUtils::parseJobKey(iter->key()); - auto destJobId = keyPair.second; - if (destJobId == jobId) { - continue; - } - auto tup = MetaKeyUtils::parseJobVal(iter->val()); - auto destJobType = std::get<0>(tup); - auto destJobStatus = std::get<2>(tup); - auto destJobStartTime = std::get<3>(tup); - if (jobType == destJobType) { - // There is a specific type of failed job that does not allow recovery for the type of - // stopped job - if (destJobStatus == cpp2::JobStatus::FAILED) { - LOG(ERROR) << "There is a specific type of failed job that does not allow recovery " - "for the type of stopped job"; - findRest = true; - break; - } else if (destJobStatus == cpp2::JobStatus::FINISHED) { - // Compare the start time of the job - if (destJobStartTime > jobStartTime) { - findRest = true; - break; - } - } - } - } - if (!findRest) { - auto findStoppedJobIter = dupResult.find(jobType); - if (findStoppedJobIter != dupResult.end()) { - // update stopped job - auto oldJobInfo = findStoppedJobIter->second; - auto oldJobStartTime = std::get<1>(oldJobInfo); - if (jobStartTime > oldJobStartTime) { - auto oldJobId = std::get<0>(oldJobInfo); - dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); - dupkeyVal.erase(oldJobId); - dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); - } - } else { - // insert - dupResult[jobType] = std::make_tuple(jobId, jobStartTime, jobStatus); - dupkeyVal.emplace(jobId, std::make_pair(jobkv.first, jobkv.second)); - } - } + auto& job = allJobs[jobId]; + JobID jid; + bool jobExist = checkOnRunningJobExist(spaceId, job.getJobType(), job.getParas(), jid); + if (!jobExist) { + job.setStatus(cpp2::JobStatus::QUEUE, true); + auto jobKey = MetaKeyUtils::jobKey(job.getSpace(), jobId); + auto jobVal = MetaKeyUtils::jobVal(job.getJobType(), + job.getParas(), + job.getStatus(), + job.getStartTime(), + job.getStopTime(), + job.getErrorCode()); + auto ret = save(jobKey, jobVal); + if (ret == nebula::cpp2::ErrorCode::SUCCEEDED) { + enqueue(spaceId, jobId, JbOp::RECOVER, job.getJobType()); + inFlightJobs_.emplace(jobId, job); } else { - jobKVs.emplace_back(std::make_pair(jobkv.first, jobkv.second)); + LOG(INFO) << "Add Job Failed"; + if (ret != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) { + ret = nebula::cpp2::ErrorCode::E_ADD_JOB_FAILURE; + } + return ret; } + ++recoveredJobNum; } } - for (auto& key : dupResult) { - auto jId = std::get<0>(key.second); - jobKVs.emplace_back(dupkeyVal[jId]); + return nebula::cpp2::ErrorCode::SUCCEEDED; + }; + nebula::cpp2::ErrorCode rc = nebula::cpp2::ErrorCode::SUCCEEDED; + if (lastBalaceJobRecoverIt == jobsMaybeRecover.rend()) { + LOG(INFO) << "no balance jobs, do recover happily"; + rc = finalyRecover(); + if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { + return recoveredJobNum; + } else { + return rc; + } + } + JobID lastBalanceJobFinished = -1; + for (auto it = allJobs.rbegin(); it != allJobs.rend(); it++) { + auto jobType = it->second.getJobType(); + if ((jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) && + it->second.getStatus() == cpp2::JobStatus::FINISHED) { + lastBalanceJobFinished = it->first; } } - - for (auto& jobkv : jobKVs) { - auto optJobRet = JobDescription::makeJobDescription(jobkv.first, jobkv.second); - if (nebula::ok(optJobRet)) { - auto optJob = nebula::value(optJobRet); - if (optJob.getStatus() == cpp2::JobStatus::QUEUE || - (jobIds.size() && (optJob.getStatus() == cpp2::JobStatus::FAILED || - optJob.getStatus() == cpp2::JobStatus::STOPPED))) { - // Check if the job exists - JobID jId = 0; - auto jobExist = - checkOnRunningJobExist(spaceId, optJob.getJobType(), optJob.getParas(), jId); - if (!jobExist) { - auto jobId = optJob.getJobId(); - enqueue(spaceId, jobId, JbOp::RECOVER, optJob.getJobType()); - inFlightJobs_.emplace(jobId, optJob); - ++recoveredJobNum; - } + for (auto it = jobsMaybeRecover.begin(); it != jobsMaybeRecover.end();) { + if (*it == *lastBalaceJobRecoverIt) { + break; + } + auto jobType = allJobs[*it].getJobType(); + if (jobType == cpp2::JobType::DATA_BALANCE || jobType == cpp2::JobType::ZONE_BALANCE) { + if (jobIdSet.empty() || jobIdSet.count(*it)) { + LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt + << " when there's a newer balance job " << *lastBalaceJobRecoverIt + << " stopped or failed"; } + it = jobsMaybeRecover.erase(it); + } else { + it++; + } + } + if (*lastBalaceJobRecoverIt < lastBalanceJobFinished) { + if (jobIdSet.empty() || jobIdSet.count(*lastBalaceJobRecoverIt)) { + LOG(INFO) << "can't recover a balance job " << *lastBalaceJobRecoverIt + << " that before a finished balance job " << lastBalanceJobFinished; } + jobsMaybeRecover.erase(*lastBalaceJobRecoverIt); + } + rc = finalyRecover(); + if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) { + return recoveredJobNum; + } else { + return rc; } - return recoveredJobNum; } nebula::cpp2::ErrorCode JobManager::save(const std::string& k, const std::string& v) { diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index ecf1d47d7d7..fd6a46cf6e4 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -551,8 +551,33 @@ TEST_F(JobManagerTest, RecoverJob) { jobMgr->bgThread_.join(); GraphSpaceID spaceId = 1; int32_t nJob = 3; - for (auto jobId = 0; jobId < nJob; ++jobId) { + for (auto jobId = 1; jobId <= nJob; ++jobId) { JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); + jd.setStatus(cpp2::JobStatus::STOPPED); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) { + JobDescription jd(spaceId, jobId, cpp2::JobType::COMPACT); + jd.setStatus(cpp2::JobStatus::STOPPED); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + for (auto jobId = nJob + 4; jobId <= nJob + 6; ++jobId) { + JobDescription jd(spaceId, jobId, cpp2::JobType::REBUILD_TAG_INDEX); + jd.setStatus(cpp2::JobStatus::STOPPED); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -562,15 +587,13 @@ TEST_F(JobManagerTest, RecoverJob) { jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); - ASSERT_EQ(nebula::value(nJobRecovered), 1); + ASSERT_EQ(nebula::value(nJobRecovered), 3); } // case 2 - // For the same type of job, if there are stopped jobs and failed jobs in turn - // recover stopped job failed - // recover failed job succeeded + // For the balance job, if there are stopped jobs and failed jobs in turn + // only recover the last balance job { std::unique_ptr> jobMgr = getJobManager(); // set status to prevent running the job since AdminClient is a injector @@ -578,35 +601,56 @@ TEST_F(JobManagerTest, RecoverJob) { jobMgr->bgThread_.join(); GraphSpaceID spaceId = 1; int32_t nJob = 3; - for (auto jobId = 0; jobId < nJob; ++jobId) { - cpp2::JobStatus jobStatus; - if (jobId == 2) { - jobStatus = cpp2::JobStatus::FAILED; - } else { - jobStatus = cpp2::JobStatus::STOPPED; - } - JobDescription jd(spaceId, jobId, cpp2::JobType::DATA_BALANCE, {}, jobStatus); + for (auto jobId = 1; jobId <= nJob; ++jobId) { + cpp2::JobStatus jobStatus = cpp2::JobStatus::STOPPED; + JobDescription jd(spaceId, jobId, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), jd.getStatus(), - jobId + 100000, - jobId + 110000, + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) { + cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; + JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + { + cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; + JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); ASSERT_EQ(nebula::value(nJobRecovered), 0); - nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {2}); + ASSERT_EQ(nebula::value(nJobRecovered), 0); + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3}); + ASSERT_EQ(nebula::value(nJobRecovered), 0); + + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {7}); ASSERT_EQ(nebula::value(nJobRecovered), 1); } // case 3 - // For the same type of job, if there are stopped jobs and finished jobs, stopped in turn - // recover stopped job befor finished job failed - // recover stopped job after finished job succeeded + // For the balance jobs, if there is a newer balance job, the failed or stopped jobs can't be + // recovered { std::unique_ptr> jobMgr = getJobManager(); // set status to prevent running the job since AdminClient is a injector @@ -614,29 +658,64 @@ TEST_F(JobManagerTest, RecoverJob) { jobMgr->bgThread_.join(); GraphSpaceID spaceId = 1; int32_t nJob = 4; - for (auto jobId = 0; jobId < nJob; ++jobId) { + for (auto jobId = 1; jobId <= nJob; ++jobId) { cpp2::JobStatus jobStatus; - if (jobId == 2) { - jobStatus = cpp2::JobStatus::FINISHED; + if (jobId / 2) { + jobStatus = cpp2::JobStatus::FAILED; } else { jobStatus = cpp2::JobStatus::STOPPED; } - JobDescription jd(spaceId, jobId, cpp2::JobType::DATA_BALANCE, {}, jobStatus); + JobDescription jd(spaceId, jobId, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), jd.getStatus(), - jobId + 100000, - jobId + 110000, + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) { + cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; + JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + { + cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED; + JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), + jd.getErrorCode()); + jobMgr->save(jobKey, jobVal); + } + { + cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED; + JobDescription jd(spaceId, 8, cpp2::JobType::COMPACT, {}, jobStatus); + auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); + auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), + jd.getParas(), + jd.getStatus(), + jd.getStartTime(), + jd.getStopTime(), jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); ASSERT_EQ(nebula::value(nJobRecovered), 0); - nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3}); - ASSERT_EQ(nebula::value(nJobRecovered), 1); + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); + ASSERT_EQ(nebula::value(nJobRecovered), 2); } }