Skip to content

Commit

Permalink
Enhancement: "show jobs" only show space related (#2872)
Browse files Browse the repository at this point in the history
* Enhancement:change "show jobs" behavior to only print space related jobs.(#2815)

* "show jobs" will still remove expired jobs from all spaces
* you need to choose a space before running "show job(s)"

* FixReview: handle "stop job"&"recover job"
* can only stop jobs in the chosen space
* only recovers jobs in the chosen space

* Fix&Add tck

* Fix review & format

Co-authored-by: Doodle <[email protected]>
  • Loading branch information
SuperYoko and critical27 authored Sep 24, 2021
1 parent 0ae8210 commit 3941945
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 24 deletions.
2 changes: 2 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not in chosen space!");
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down
3 changes: 2 additions & 1 deletion src/graph/validator/AdminJobValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ Status AdminJobValidator::validateImpl() {
}
}
}
} else {
sentence_->addPara(qctx()->rctx()->session()->space().name);
}

return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminCmd::COMPACT:
case meta::cpp2::AdminCmd::FLUSH:
return true;
// TODO: Also space related, but not available in CreateJobExcutor now.
case meta::cpp2::AdminCmd::DATA_BALANCE:
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
Expand All @@ -49,7 +50,7 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminJobOp::SHOW:
case meta::cpp2::AdminJobOp::STOP:
case meta::cpp2::AdminJobOp::RECOVER:
return false;
return true;
}
return false;
}
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ enum ErrorCode {
E_BALANCER_FAILURE = -2047,
E_JOB_NOT_FINISHED = -2048,
E_TASK_REPORT_OUT_DATE = -2049,
E_JOB_NOT_IN_SPACE = -2050,
E_INVALID_JOB = -2065,

// Backup Failure
Expand Down
19 changes: 10 additions & 9 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW_All: {
auto ret = jobMgr->showJobs();
auto ret = jobMgr->showJobs(req.get_paras().back());
if (nebula::ok(ret)) {
result.set_job_desc(nebula::value(ret));
} else {
Expand All @@ -73,8 +73,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW: {
if (req.get_paras().empty()) {
LOG(ERROR) << "Parameter should be not empty";
static const size_t kShowArgsNum = 2;
if (req.get_paras().size() != kShowArgsNum) {
LOG(ERROR) << "Parameter number not matched";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
Expand All @@ -85,8 +86,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto ret = jobMgr->showJob(iJob);
auto ret = jobMgr->showJob(iJob, req.get_paras().back());
if (nebula::ok(ret)) {
result.set_job_desc(std::vector<cpp2::JobDesc>{nebula::value(ret).first});
result.set_task_desc(nebula::value(ret).second);
Expand All @@ -96,8 +96,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::STOP: {
if (req.get_paras().empty()) {
LOG(ERROR) << "Parameter should be not empty";
static const size_t kStopJobArgsNum = 2;
if (req.get_paras().size() != kStopJobArgsNum) {
LOG(ERROR) << "Parameter number not matched";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
Expand All @@ -107,11 +108,11 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
errorCode = jobMgr->stopJob(iJob);
errorCode = jobMgr->stopJob(iJob, req.get_paras().back());
break;
}
case nebula::meta::cpp2::AdminJobOp::RECOVER: {
auto ret = jobMgr->recoverJob();
auto ret = jobMgr->recoverJob(req.get_paras().back());
if (nebula::ok(ret)) {
result.set_recovered_job_num(nebula::value(ret));
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class JobDescription {
FRIEND_TEST(JobManagerTest, loadJobDescription);
FRIEND_TEST(JobManagerTest, showJobs);
FRIEND_TEST(JobManagerTest, showJob);
FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace);
FRIEND_TEST(JobManagerTest, showJobInOtherSpace);
FRIEND_TEST(JobManagerTest, backupJob);
FRIEND_TEST(JobManagerTest, recoverJob);
FRIEND_TEST(GetStatsTest, StatsJob);
Expand Down
32 changes: 28 additions & 4 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "common/http/HttpClient.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/common_types.h"
#include "kvstore/Common.h"
#include "kvstore/KVIterator.h"
#include "meta/MetaServiceUtils.h"
Expand Down Expand Up @@ -375,7 +376,8 @@ void JobManager::enqueue(const JobID& jobId, const cpp2::AdminCmd& cmd) {
}
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJobs() {
ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJobs(
const std::string& spaceName) {
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -404,6 +406,9 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJob
expiredJobKeys.emplace_back(jobKey);
continue;
}
if (jobDesc.get_paras().back() != spaceName) {
continue;
}
ret.emplace_back(jobDesc);
} else { // iter-key() is a TaskKey
TaskDescription task(jobKey, iter->val());
Expand Down Expand Up @@ -477,7 +482,7 @@ bool JobManager::checkJobExist(const cpp2::AdminCmd& cmd,
}

ErrorOr<nebula::cpp2::ErrorCode, std::pair<cpp2::JobDesc, std::vector<cpp2::TaskDesc>>>
JobManager::showJob(JobID iJob) {
JobManager::showJob(JobID iJob, const std::string& spaceName) {
auto jobKey = JobDescription::makeJobKey(iJob);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
Expand All @@ -498,6 +503,10 @@ JobManager::showJob(JobID iJob) {
return nebula::error(optJobRet);
}
auto optJob = nebula::value(optJobRet);
if (optJob.getParas().back() != spaceName) {
LOG(WARNING) << "Show job " << iJob << " not in current space " << spaceName;
return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE;
}
ret.first = optJob.toJobDesc();
} else {
TaskDescription td(jKey, iter->val());
Expand All @@ -507,15 +516,27 @@ JobManager::showJob(JobID iJob) {
return ret;
}

nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob) {
/**
 * @brief Stop job `iJob`, but only if it belongs to the space named `spaceName`.
 *
 * The job description is loaded from the kv store and the last entry of its
 * parameter list is compared against `spaceName`.
 *
 * @param iJob      id of the job to stop
 * @param spaceName name of the space the caller has currently chosen
 * @return the error reported by JobDescription::loadJobDescription if the job
 *         cannot be loaded; E_JOB_NOT_IN_SPACE if the job has no parameters or
 *         its last parameter differs from `spaceName`; otherwise the result of
 *         jobFinished(iJob, STOPPED).
 */
nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob, const std::string& spaceName) {
  LOG(INFO) << "try to stop job " << iJob;
  auto optJobDescRet = JobDescription::loadJobDescription(iJob, kvStore_);
  if (!nebula::ok(optJobDescRet)) {
    auto retCode = nebula::error(optJobDescRet);
    LOG(WARNING) << "LoadJobDesc failed, jobId " << iJob
                 << " error: " << apache::thrift::util::enumNameSafe(retCode);
    return retCode;
  }
  auto optJobDesc = nebula::value(optJobDescRet);
  const auto& jobParas = optJobDesc.getParas();
  // Calling back() on an empty vector is undefined behaviour. A job that carries
  // no parameters cannot be attributed to the chosen space, so it is rejected
  // the same way as a job that belongs to another space.
  if (jobParas.empty() || jobParas.back() != spaceName) {
    LOG(WARNING) << "Stop job " << iJob << " not in space " << spaceName;
    return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE;
  }
  return jobFinished(iJob, cpp2::JobStatus::STOPPED);
}

/*
* Return: recovered job num.
* */
ErrorOr<nebula::cpp2::ErrorCode, JobID> JobManager::recoverJob() {
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(const std::string& spaceName) {
int32_t recoveredJobNum = 0;
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter);
Expand All @@ -531,6 +552,9 @@ ErrorOr<nebula::cpp2::ErrorCode, JobID> JobManager::recoverJob() {
auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val());
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
if (optJob.getParas().back() != spaceName) {
continue;
}
if (optJob.getStatus() == cpp2::JobStatus::QUEUE) {
// Check if the job exists
JobID jId = 0;
Expand Down
12 changes: 8 additions & 4 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
FRIEND_TEST(JobManagerTest, JobDeduplication);
FRIEND_TEST(JobManagerTest, loadJobDescription);
FRIEND_TEST(JobManagerTest, showJobs);
FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace);
FRIEND_TEST(JobManagerTest, showJob);
FRIEND_TEST(JobManagerTest, showJobInOtherSpace);
FRIEND_TEST(JobManagerTest, recoverJob);
FRIEND_TEST(JobManagerTest, AddRebuildTagIndexJob);
FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob);
Expand Down Expand Up @@ -69,14 +71,16 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
*/
bool checkJobExist(const cpp2::AdminCmd& cmd, const std::vector<std::string>& paras, JobID& iJob);

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> showJobs();
ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> showJobs(
const std::string& spaceName);

ErrorOr<nebula::cpp2::ErrorCode, std::pair<cpp2::JobDesc, std::vector<cpp2::TaskDesc>>> showJob(
JobID iJob);
JobID iJob, const std::string& spaceName);

nebula::cpp2::ErrorCode stopJob(JobID iJob);
nebula::cpp2::ErrorCode stopJob(JobID iJob, const std::string& spaceName);

ErrorOr<nebula::cpp2::ErrorCode, JobID> recoverJob();
// return error/recovered job num
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> recoverJob(const std::string& spaceName);

/**
* @brief persist job executed result, and do the cleanup
Expand Down
68 changes: 64 additions & 4 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ TEST_F(JobManagerTest, showJobs) {
jd2.setStatus(cpp2::JobStatus::FAILED);
jobMgr->addJob(jd2, adminClient_.get());

auto statusOrShowResult = jobMgr->showJobs();
auto statusOrShowResult = jobMgr->showJobs(paras1.back());
LOG(INFO) << "after show jobs";
ASSERT_TRUE(nebula::ok(statusOrShowResult));

Expand All @@ -273,6 +273,34 @@ TEST_F(JobManagerTest, showJobs) {
ASSERT_EQ(jobs[0].get_stop_time(), jd2.stopTime_);
}

// Registers one job for "test_space" and one for "test_space2", then checks that
// showJobs() called with the second space name returns exactly the second job.
TEST_F(JobManagerTest, showJobsFromMultiSpace) {
  // COMPACT job owned by the first space, driven RUNNING -> FINISHED.
  std::vector<std::string> firstSpaceParas{"test_space"};
  JobDescription compactJob(1, cpp2::AdminCmd::COMPACT, firstSpaceParas);
  compactJob.setStatus(cpp2::JobStatus::RUNNING);
  compactJob.setStatus(cpp2::JobStatus::FINISHED);
  jobMgr->addJob(compactJob, adminClient_.get());

  // FLUSH job owned by the second space, driven RUNNING -> FAILED.
  std::vector<std::string> secondSpaceParas{"test_space2"};
  JobDescription flushJob(2, cpp2::AdminCmd::FLUSH, secondSpaceParas);
  flushJob.setStatus(cpp2::JobStatus::RUNNING);
  flushJob.setStatus(cpp2::JobStatus::FAILED);
  jobMgr->addJob(flushJob, adminClient_.get());

  // Query with the second space's name only.
  auto showRet = jobMgr->showJobs(secondSpaceParas.back());
  LOG(INFO) << "after show jobs";
  ASSERT_TRUE(nebula::ok(showRet));

  auto& shownJobs = nebula::value(showRet);
  ASSERT_EQ(shownJobs.size(), 1);

  // The single returned entry must mirror the FLUSH job field by field.
  auto& shown = shownJobs[0];
  ASSERT_EQ(shown.get_id(), flushJob.id_);
  ASSERT_EQ(shown.get_cmd(), cpp2::AdminCmd::FLUSH);
  ASSERT_EQ(shown.get_paras()[0], "test_space2");
  ASSERT_EQ(shown.get_status(), cpp2::JobStatus::FAILED);
  ASSERT_EQ(shown.get_start_time(), flushJob.startTime_);
  ASSERT_EQ(shown.get_stop_time(), flushJob.stopTime_);
}

// Test helper: wraps an IP string in a HostAddr whose second constructor
// argument is fixed to 0 (presumably the port -- confirm against HostAddr).
HostAddr toHost(std::string strIp) {
  HostAddr addr(strIp, 0);
  return addr;
}

TEST_F(JobManagerTest, showJob) {
Expand Down Expand Up @@ -300,7 +328,7 @@ TEST_F(JobManagerTest, showJob) {
jobMgr->save(td2.taskKey(), td2.taskVal());

LOG(INFO) << "before jobMgr->showJob";
auto showResult = jobMgr->showJob(iJob);
auto showResult = jobMgr->showJob(iJob, paras.back());
LOG(INFO) << "after jobMgr->showJob";
ASSERT_TRUE(nebula::ok(showResult));
auto& jobs = nebula::value(showResult).first;
Expand Down Expand Up @@ -328,16 +356,48 @@ TEST_F(JobManagerTest, showJob) {
ASSERT_EQ(tasks[1].get_stop_time(), td2.stopTime_);
}

// showJob() must refuse to return a job when the space chosen by the caller is
// not the space the job was created for.
TEST_F(JobManagerTest, showJobInOtherSpace) {
  std::vector<std::string> paras{"test_space"};

  // A finished COMPACT job that belongs to "test_space".
  JobDescription jd(1, cpp2::AdminCmd::COMPACT, paras);
  jd.setStatus(cpp2::JobStatus::RUNNING);
  jd.setStatus(cpp2::JobStatus::FINISHED);
  jobMgr->addJob(jd, adminClient_.get());

  int32_t iJob = jd.id_;
  int32_t task1 = 0;
  auto host1 = toHost("127.0.0.1");

  // Two tasks are stored under the job so that the lookup has task entries too.
  TaskDescription td1(iJob, task1, host1);
  td1.setStatus(cpp2::JobStatus::RUNNING);
  td1.setStatus(cpp2::JobStatus::FINISHED);
  jobMgr->save(td1.taskKey(), td1.taskVal());

  int32_t task2 = 1;
  auto host2 = toHost("127.0.0.1");
  TaskDescription td2(iJob, task2, host2);
  td2.setStatus(cpp2::JobStatus::RUNNING);
  td2.setStatus(cpp2::JobStatus::FAILED);
  jobMgr->save(td2.taskKey(), td2.taskVal());

  LOG(INFO) << "before jobMgr->showJob";
  // Ask for the job from a space that owns no jobs at all.
  std::string chosenSpace = "spaceWithNoJob";
  auto showResult = jobMgr->showJob(iJob, chosenSpace);
  LOG(INFO) << "after jobMgr->showJob";
  ASSERT_FALSE(nebula::ok(showResult));
  // Pin the exact failure reason so that an unrelated error (for example a
  // kv store failure) cannot make this test pass by accident.
  ASSERT_EQ(nebula::error(showResult), nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE);
}

TEST_F(JobManagerTest, recoverJob) {
// set status to prevent running the job since AdminClient is an injector
jobMgr->status_ = JobManager::JbmgrStatus::NOT_START;
auto spaceName = "test_space";
int32_t nJob = 3;
for (auto i = 0; i != nJob; ++i) {
JobDescription jd(i, cpp2::AdminCmd::FLUSH, {"test_space"});
JobDescription jd(i, cpp2::AdminCmd::FLUSH, {spaceName});
jobMgr->save(jd.jobKey(), jd.jobVal());
}

auto nJobRecovered = jobMgr->recoverJob();
auto nJobRecovered = jobMgr->recoverJob(spaceName);
ASSERT_EQ(nebula::value(nJobRecovered), 1);
}

Expand Down
Loading

0 comments on commit 3941945

Please sign in to comment.