From 3941945221d5822cbfbae2c913f5628c6cba6b64 Mon Sep 17 00:00:00 2001 From: Alex Xing <90179377+SuperYoko@users.noreply.github.com> Date: Fri, 24 Sep 2021 22:44:23 +0800 Subject: [PATCH] Enhancement: "show jobs" only show space related (#2872) * Enhancement:change "show jobs" behavior to only print space related jobs.(#2815) * show jobs will still remove all space expired jobs * you will need choose one spaces before show job(s) * FixReview: handle "stop job"&"recover job" * only can stop jobs in chosen space * only recover jobs in chosen space * Fix&Add tck * Fix review & format Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- src/clients/meta/MetaClient.cpp | 2 + src/graph/validator/AdminJobValidator.cpp | 3 +- src/graph/validator/AdminJobValidator.h | 3 +- src/interface/common.thrift | 1 + src/meta/processors/job/AdminJobProcessor.cpp | 19 +++-- src/meta/processors/job/JobDescription.h | 2 + src/meta/processors/job/JobManager.cpp | 32 ++++++- src/meta/processors/job/JobManager.h | 12 ++- src/meta/test/JobManagerTest.cpp | 68 ++++++++++++++- tests/tck/job/Job.feature | 84 ++++++++++++++++++- 10 files changed, 202 insertions(+), 24 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 87e276e5409..55612eb64d9 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -798,6 +798,8 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("Failed to get meta dir!"); case nebula::cpp2::ErrorCode::E_INVALID_JOB: return Status::Error("No valid job!"); + case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE: + return Status::Error("Job not in chosen space!"); case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE: return Status::Error("Backup empty table!"); case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED: diff --git a/src/graph/validator/AdminJobValidator.cpp b/src/graph/validator/AdminJobValidator.cpp index e174699b6d6..c2d9cfd7793 100644 --- a/src/graph/validator/AdminJobValidator.cpp +++ b/src/graph/validator/AdminJobValidator.cpp @@ -49,8 +49,9 @@ Status AdminJobValidator::validateImpl() { } } } + } else { + sentence_->addPara(qctx()->rctx()->session()->space().name); } - return Status::OK(); } diff --git a/src/graph/validator/AdminJobValidator.h b/src/graph/validator/AdminJobValidator.h index 37a650f6c1d..ea96710e10a 100644 --- a/src/graph/validator/AdminJobValidator.h +++ b/src/graph/validator/AdminJobValidator.h @@ -38,6 +38,7 @@ class AdminJobValidator final : public Validator { case meta::cpp2::AdminCmd::COMPACT: case meta::cpp2::AdminCmd::FLUSH: return true; + // TODO: Also space related, but not available in CreateJobExcutor now. case meta::cpp2::AdminCmd::DATA_BALANCE: case meta::cpp2::AdminCmd::DOWNLOAD: case meta::cpp2::AdminCmd::INGEST: @@ -49,7 +50,7 @@ class AdminJobValidator final : public Validator { case meta::cpp2::AdminJobOp::SHOW: case meta::cpp2::AdminJobOp::STOP: case meta::cpp2::AdminJobOp::RECOVER: - return false; + return true; } return false; } diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 6f05fb32fbf..001ae49950d 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -324,6 +324,7 @@ enum ErrorCode { E_BALANCER_FAILURE = -2047, E_JOB_NOT_FINISHED = -2048, E_TASK_REPORT_OUT_DATE = -2049, + E_JOB_NOT_IN_SPACE = -2050, E_INVALID_JOB = -2065, // Backup Failure diff --git a/src/meta/processors/job/AdminJobProcessor.cpp b/src/meta/processors/job/AdminJobProcessor.cpp index 9e9595fbd6f..efb2e8eb1eb 100644 --- a/src/meta/processors/job/AdminJobProcessor.cpp +++ b/src/meta/processors/job/AdminJobProcessor.cpp @@ -64,7 +64,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } case nebula::meta::cpp2::AdminJobOp::SHOW_All: { - auto ret = jobMgr->showJobs(); + auto ret = jobMgr->showJobs(req.get_paras().back()); if (nebula::ok(ret)) { result.set_job_desc(nebula::value(ret)); } else { @@ -73,8 +73,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } case nebula::meta::cpp2::AdminJobOp::SHOW: { - if (req.get_paras().empty()) { - LOG(ERROR) << "Parameter should be not empty"; + static const size_t kShowArgsNum = 2; + if (req.get_paras().size() != kShowArgsNum) { + LOG(ERROR) << "Parameter number not matched"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -85,8 +86,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } - - auto ret = jobMgr->showJob(iJob); + auto ret = jobMgr->showJob(iJob, req.get_paras().back()); if (nebula::ok(ret)) { result.set_job_desc(std::vector{nebula::value(ret).first}); result.set_task_desc(nebula::value(ret).second); @@ -96,8 +96,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { break; } case nebula::meta::cpp2::AdminJobOp::STOP: { - if (req.get_paras().empty()) { - LOG(ERROR) << "Parameter should be not empty"; + static const size_t kStopJobArgsNum = 2; + if (req.get_paras().size() != kStopJobArgsNum) { + LOG(ERROR) << "Parameter number not matched"; errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } @@ -107,11 +108,11 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) { errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM; break; } - errorCode = jobMgr->stopJob(iJob); + errorCode = jobMgr->stopJob(iJob, req.get_paras().back()); break; } case nebula::meta::cpp2::AdminJobOp::RECOVER: { - auto ret = jobMgr->recoverJob(); + auto ret = jobMgr->recoverJob(req.get_paras().back()); if (nebula::ok(ret)) { result.set_recovered_job_num(nebula::value(ret)); } else { diff --git a/src/meta/processors/job/JobDescription.h b/src/meta/processors/job/JobDescription.h index b6f538b7ea1..746e8d69e1a 100644 --- a/src/meta/processors/job/JobDescription.h +++ b/src/meta/processors/job/JobDescription.h @@ -27,6 +27,8 @@ class JobDescription { FRIEND_TEST(JobManagerTest, loadJobDescription); FRIEND_TEST(JobManagerTest, showJobs); FRIEND_TEST(JobManagerTest, showJob); + FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace); + FRIEND_TEST(JobManagerTest, showJobInOtherSpace); FRIEND_TEST(JobManagerTest, backupJob); FRIEND_TEST(JobManagerTest, recoverJob); FRIEND_TEST(GetStatsTest, StatsJob); diff --git a/src/meta/processors/job/JobManager.cpp b/src/meta/processors/job/JobManager.cpp index 94d21f5cc94..b083d16a155 100644 --- a/src/meta/processors/job/JobManager.cpp +++ b/src/meta/processors/job/JobManager.cpp @@ -14,6 +14,7 @@ #include "common/http/HttpClient.h" #include "common/time/WallClock.h" +#include "interface/gen-cpp2/common_types.h" #include "kvstore/Common.h" #include "kvstore/KVIterator.h" #include "meta/MetaServiceUtils.h" @@ -375,7 +376,8 @@ void JobManager::enqueue(const JobID& jobId, const cpp2::AdminCmd& cmd) { } } -ErrorOr> JobManager::showJobs() { +ErrorOr> JobManager::showJobs( + const std::string& spaceName) { std::unique_ptr iter; auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { @@ -404,6 +406,9 @@ ErrorOr> JobManager::showJob expiredJobKeys.emplace_back(jobKey); continue; } + if (jobDesc.get_paras().back() != spaceName) { + continue; + } ret.emplace_back(jobDesc); } else { // iter-key() is a TaskKey TaskDescription task(jobKey, iter->val()); @@ -477,7 +482,7 @@ bool JobManager::checkJobExist(const cpp2::AdminCmd& cmd, } ErrorOr>> -JobManager::showJob(JobID iJob) { +JobManager::showJob(JobID iJob, const std::string& spaceName) { auto jobKey = JobDescription::makeJobKey(iJob); std::unique_ptr iter; auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter); @@ -498,6 +503,10 @@ JobManager::showJob(JobID iJob) { return nebula::error(optJobRet); } auto optJob = nebula::value(optJobRet); + if (optJob.getParas().back() != spaceName) { + LOG(WARNING) << "Show job " << iJob << " not in current space " << spaceName; + return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE; + } ret.first = optJob.toJobDesc(); } else { TaskDescription td(jKey, iter->val()); @@ -507,15 +516,27 @@ JobManager::showJob(JobID iJob) { return ret; } -nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob) { +nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob, const std::string& spaceName) { LOG(INFO) << "try to stop job " << iJob; + auto optJobDescRet = JobDescription::loadJobDescription(iJob, kvStore_); + if (!nebula::ok(optJobDescRet)) { + auto retCode = nebula::error(optJobDescRet); + LOG(WARNING) << "LoadJobDesc failed, jobId " << iJob + << " error: " << apache::thrift::util::enumNameSafe(retCode); + return retCode; + } + auto optJobDesc = nebula::value(optJobDescRet); + if (optJobDesc.getParas().back() != spaceName) { + LOG(WARNING) << "Stop job " << iJob << " not in space " << spaceName; + return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE; + } return jobFinished(iJob, cpp2::JobStatus::STOPPED); } /* * Return: recovered job num. * */ -ErrorOr JobManager::recoverJob() { +ErrorOr JobManager::recoverJob(const std::string& spaceName) { int32_t recoveredJobNum = 0; std::unique_ptr iter; auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter); @@ -531,6 +552,9 @@ ErrorOr JobManager::recoverJob() { auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val()); if (nebula::ok(optJobRet)) { auto optJob = nebula::value(optJobRet); + if (optJob.getParas().back() != spaceName) { + continue; + } if (optJob.getStatus() == cpp2::JobStatus::QUEUE) { // Check if the job exists JobID jId = 0; diff --git a/src/meta/processors/job/JobManager.h b/src/meta/processors/job/JobManager.h index 378c8d01a5e..90bdfd695e2 100644 --- a/src/meta/processors/job/JobManager.h +++ b/src/meta/processors/job/JobManager.h @@ -36,7 +36,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab FRIEND_TEST(JobManagerTest, JobDeduplication); FRIEND_TEST(JobManagerTest, loadJobDescription); FRIEND_TEST(JobManagerTest, showJobs); + FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace); FRIEND_TEST(JobManagerTest, showJob); + FRIEND_TEST(JobManagerTest, showJobInOtherSpace); FRIEND_TEST(JobManagerTest, recoverJob); FRIEND_TEST(JobManagerTest, AddRebuildTagIndexJob); FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob); @@ -69,14 +71,16 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab */ bool checkJobExist(const cpp2::AdminCmd& cmd, const std::vector& paras, JobID& iJob); - ErrorOr> showJobs(); + ErrorOr> showJobs( + const std::string& spaceName); ErrorOr>> showJob( - JobID iJob); + JobID iJob, const std::string& spaceName); - nebula::cpp2::ErrorCode stopJob(JobID iJob); + nebula::cpp2::ErrorCode stopJob(JobID iJob, const std::string& spaceName); - ErrorOr recoverJob(); + // return error/recovered job num + ErrorOr recoverJob(const std::string& spaceName); /** * @brief persist job executed result, and do the cleanup diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 0364772c875..f2832d8ac3d 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -253,7 +253,7 @@ TEST_F(JobManagerTest, showJobs) { jd2.setStatus(cpp2::JobStatus::FAILED); jobMgr->addJob(jd2, adminClient_.get()); - auto statusOrShowResult = jobMgr->showJobs(); + auto statusOrShowResult = jobMgr->showJobs(paras1.back()); LOG(INFO) << "after show jobs"; ASSERT_TRUE(nebula::ok(statusOrShowResult)); @@ -273,6 +273,34 @@ TEST_F(JobManagerTest, showJobs) { ASSERT_EQ(jobs[0].get_stop_time(), jd2.stopTime_); } +TEST_F(JobManagerTest, showJobsFromMultiSpace) { + std::vector paras1{"test_space"}; + JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1); + jd1.setStatus(cpp2::JobStatus::RUNNING); + jd1.setStatus(cpp2::JobStatus::FINISHED); + jobMgr->addJob(jd1, adminClient_.get()); + + std::vector paras2{"test_space2"}; + JobDescription jd2(2, cpp2::AdminCmd::FLUSH, paras2); + jd2.setStatus(cpp2::JobStatus::RUNNING); + jd2.setStatus(cpp2::JobStatus::FAILED); + jobMgr->addJob(jd2, adminClient_.get()); + + auto statusOrShowResult = jobMgr->showJobs(paras2.back()); + LOG(INFO) << "after show jobs"; + ASSERT_TRUE(nebula::ok(statusOrShowResult)); + + auto& jobs = nebula::value(statusOrShowResult); + ASSERT_EQ(jobs.size(), 1); + + ASSERT_EQ(jobs[0].get_id(), jd2.id_); + ASSERT_EQ(jobs[0].get_cmd(), cpp2::AdminCmd::FLUSH); + ASSERT_EQ(jobs[0].get_paras()[0], "test_space2"); + ASSERT_EQ(jobs[0].get_status(), cpp2::JobStatus::FAILED); + ASSERT_EQ(jobs[0].get_start_time(), jd2.startTime_); + ASSERT_EQ(jobs[0].get_stop_time(), jd2.stopTime_); +} + HostAddr toHost(std::string strIp) { return HostAddr(strIp, 0); } TEST_F(JobManagerTest, showJob) { @@ -300,7 +328,7 @@ TEST_F(JobManagerTest, showJob) { jobMgr->save(td2.taskKey(), td2.taskVal()); LOG(INFO) << "before jobMgr->showJob"; - auto showResult = jobMgr->showJob(iJob); + auto showResult = jobMgr->showJob(iJob, paras.back()); LOG(INFO) << "after jobMgr->showJob"; ASSERT_TRUE(nebula::ok(showResult)); auto& jobs = nebula::value(showResult).first; @@ -328,16 +356,48 @@ TEST_F(JobManagerTest, showJob) { ASSERT_EQ(tasks[1].get_stop_time(), td2.stopTime_); } +TEST_F(JobManagerTest, showJobInOtherSpace) { + std::vector paras{"test_space"}; + + JobDescription jd(1, cpp2::AdminCmd::COMPACT, paras); + jd.setStatus(cpp2::JobStatus::RUNNING); + jd.setStatus(cpp2::JobStatus::FINISHED); + jobMgr->addJob(jd, adminClient_.get()); + + int32_t iJob = jd.id_; + int32_t task1 = 0; + auto host1 = toHost("127.0.0.1"); + + TaskDescription td1(iJob, task1, host1); + td1.setStatus(cpp2::JobStatus::RUNNING); + td1.setStatus(cpp2::JobStatus::FINISHED); + jobMgr->save(td1.taskKey(), td1.taskVal()); + + int32_t task2 = 1; + auto host2 = toHost("127.0.0.1"); + TaskDescription td2(iJob, task2, host2); + td2.setStatus(cpp2::JobStatus::RUNNING); + td2.setStatus(cpp2::JobStatus::FAILED); + jobMgr->save(td2.taskKey(), td2.taskVal()); + + LOG(INFO) << "before jobMgr->showJob"; + std::string chosenSpace = "spaceWithNoJob"; + auto showResult = jobMgr->showJob(iJob, chosenSpace); + LOG(INFO) << "after jobMgr->showJob"; + ASSERT_TRUE(!nebula::ok(showResult)); +} + TEST_F(JobManagerTest, recoverJob) { // set status to prevent running the job since AdminClient is a injector jobMgr->status_ = JobManager::JbmgrStatus::NOT_START; + auto spaceName = "test_space"; int32_t nJob = 3; for (auto i = 0; i != nJob; ++i) { - JobDescription jd(i, cpp2::AdminCmd::FLUSH, {"test_space"}); + JobDescription jd(i, cpp2::AdminCmd::FLUSH, {spaceName}); jobMgr->save(jd.jobKey(), jd.jobVal()); } - auto nJobRecovered = jobMgr->recoverJob(); + auto nJobRecovered = jobMgr->recoverJob(spaceName); ASSERT_EQ(nebula::value(nJobRecovered), 1); } diff --git a/tests/tck/job/Job.feature b/tests/tck/job/Job.feature index fb129036420..bace1f4a71a 100644 --- a/tests/tck/job/Job.feature +++ b/tests/tck/job/Job.feature @@ -32,8 +32,30 @@ Feature: Submit job space requirements """ Then a SemanticError should be raised at runtime: - Scenario: Not existed job + Scenario: Operate job require space: Given an empty graph + When executing query: + """ + SHOW JOB 123456; + """ + Then a SemanticError should be raised at runtime: Space was not chosen. + When executing query: + """ + STOP JOB 123456; + """ + Then a SemanticError should be raised at runtime: Space was not chosen. + When executing query: + """ + RECOVER JOB; + """ + Then a SemanticError should be raised at runtime: Space was not chosen. + + Scenario: Not exist job + Given create a space with following options: + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(20) | + And wait 6 seconds When executing query: """ SHOW JOB 123456; @@ -98,6 +120,66 @@ Feature: Submit job space requirements """ Then an ExecutionError should be raised at runtime: Save job failure! + Scenario: Submit and show jobs in other space + Given create a space with following options: + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(20) | + And wait 6 seconds + When executing query: + """ + SUBMIT JOB COMPACT; + """ + Then the result should be, in any order: + | New Job Id | + | /\d+/ | + And wait 1 seconds + When executing query: + """ + SUBMIT JOB FLUSH; + """ + Then the result should be, in any order: + | New Job Id | + | /\d+/ | + And wait 1 seconds + When executing query: + """ + SUBMIT JOB STATS; + """ + Then the result should be, in any order: + | New Job Id | + | /\d+/ | + And wait 10 seconds + When executing query: + """ + SHOW JOBS; + """ + Then the result should be, the first 3 records in order, and register Job Id as a list named job_id: + | Job Id | Command | Status | Start Time | Stop Time | + | /\d+/ | "STATS" | "FINISHED" | /\w+/ | /\w+/ | + | /\d+/ | "FLUSH" | "FINISHED" | /\w+/ | /\w+/ | + | /\d+/ | "COMPACT" | "FINISHED" | /\w+/ | /\w+/ | + Given create a space with following options: + | partition_num | 9 | + | replica_factor | 1 | + | vid_type | FIXED_STRING(20) | + When executing query: + """ + SHOW JOBS; + """ + Then the result should be, in order: + | Job Id | Command | Status | Start Time | Stop Time | + When executing query, fill replace holders with element index of 0 in job_id: + """ + SHOW JOB {}; + """ + Then an ExecutionError should be raised at runtime:Job not in chosen space! + When executing query, fill replace holders with element index of 0 in job_id: + """ + STOP JOB {}; + """ + Then an ExecutionError should be raised at runtime:Job not in chosen space! + # This is skipped becuase it is hard to simulate the situation # When executing query: # """