Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: "show jobs" only show space related #2872

Merged
merged 7 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
return Status::Error("Job not in chosen space!");
case nebula::cpp2::ErrorCode::E_BACKUP_EMPTY_TABLE:
return Status::Error("Backup empty table!");
case nebula::cpp2::ErrorCode::E_BACKUP_TABLE_FAILED:
Expand Down
3 changes: 2 additions & 1 deletion src/graph/validator/AdminJobValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ Status AdminJobValidator::validateImpl() {
}
}
}
} else {
sentence_->addPara(qctx()->rctx()->session()->space().name);
}

return Status::OK();
}

Expand Down
3 changes: 2 additions & 1 deletion src/graph/validator/AdminJobValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminCmd::COMPACT:
case meta::cpp2::AdminCmd::FLUSH:
return true;
// TODO: Also space related, but not available in CreateJobExcutor now.
case meta::cpp2::AdminCmd::DATA_BALANCE:
case meta::cpp2::AdminCmd::DOWNLOAD:
case meta::cpp2::AdminCmd::INGEST:
Expand All @@ -49,7 +50,7 @@ class AdminJobValidator final : public Validator {
case meta::cpp2::AdminJobOp::SHOW:
case meta::cpp2::AdminJobOp::STOP:
case meta::cpp2::AdminJobOp::RECOVER:
return false;
return true;
}
return false;
}
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ enum ErrorCode {
E_BALANCER_FAILURE = -2047,
E_JOB_NOT_FINISHED = -2048,
E_TASK_REPORT_OUT_DATE = -2049,
E_JOB_NOT_IN_SPACE = -2050,
E_INVALID_JOB = -2065,

// Backup Failure
Expand Down
19 changes: 10 additions & 9 deletions src/meta/processors/job/AdminJobProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW_All: {
auto ret = jobMgr->showJobs();
auto ret = jobMgr->showJobs(req.get_paras().back());
if (nebula::ok(ret)) {
result.set_job_desc(nebula::value(ret));
} else {
Expand All @@ -73,8 +73,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::SHOW: {
if (req.get_paras().empty()) {
LOG(ERROR) << "Parameter should be not empty";
static const size_t kShowArgsNum = 2;
if (req.get_paras().size() != kShowArgsNum) {
LOG(ERROR) << "Parameter number not matched";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
Expand All @@ -85,8 +86,7 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}

auto ret = jobMgr->showJob(iJob);
auto ret = jobMgr->showJob(iJob, req.get_paras().back());
if (nebula::ok(ret)) {
result.set_job_desc(std::vector<cpp2::JobDesc>{nebula::value(ret).first});
result.set_task_desc(nebula::value(ret).second);
Expand All @@ -96,8 +96,9 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
break;
}
case nebula::meta::cpp2::AdminJobOp::STOP: {
if (req.get_paras().empty()) {
LOG(ERROR) << "Parameter should be not empty";
static const size_t kStopJobArgsNum = 2;
if (req.get_paras().size() != kStopJobArgsNum) {
LOG(ERROR) << "Parameter number not matched";
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
Expand All @@ -107,11 +108,11 @@ void AdminJobProcessor::process(const cpp2::AdminJobReq& req) {
errorCode = nebula::cpp2::ErrorCode::E_INVALID_PARM;
break;
}
errorCode = jobMgr->stopJob(iJob);
errorCode = jobMgr->stopJob(iJob, req.get_paras().back());
break;
}
case nebula::meta::cpp2::AdminJobOp::RECOVER: {
auto ret = jobMgr->recoverJob();
auto ret = jobMgr->recoverJob(req.get_paras().back());
if (nebula::ok(ret)) {
result.set_recovered_job_num(nebula::value(ret));
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/processors/job/JobDescription.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class JobDescription {
FRIEND_TEST(JobManagerTest, loadJobDescription);
FRIEND_TEST(JobManagerTest, showJobs);
FRIEND_TEST(JobManagerTest, showJob);
FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace);
FRIEND_TEST(JobManagerTest, showJobInOtherSpace);
FRIEND_TEST(JobManagerTest, backupJob);
FRIEND_TEST(JobManagerTest, recoverJob);
FRIEND_TEST(GetStatsTest, StatsJob);
Expand Down
32 changes: 28 additions & 4 deletions src/meta/processors/job/JobManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "common/http/HttpClient.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/common_types.h"
#include "kvstore/Common.h"
#include "kvstore/KVIterator.h"
#include "meta/MetaServiceUtils.h"
Expand Down Expand Up @@ -375,7 +376,8 @@ void JobManager::enqueue(const JobID& jobId, const cpp2::AdminCmd& cmd) {
}
}

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJobs() {
ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJobs(
const std::string& spaceName) {
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter);
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -404,6 +406,9 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> JobManager::showJob
expiredJobKeys.emplace_back(jobKey);
continue;
}
if (jobDesc.get_paras().back() != spaceName) {
continue;
}
ret.emplace_back(jobDesc);
} else { // iter-key() is a TaskKey
TaskDescription task(jobKey, iter->val());
Expand Down Expand Up @@ -477,7 +482,7 @@ bool JobManager::checkJobExist(const cpp2::AdminCmd& cmd,
}

ErrorOr<nebula::cpp2::ErrorCode, std::pair<cpp2::JobDesc, std::vector<cpp2::TaskDesc>>>
JobManager::showJob(JobID iJob) {
JobManager::showJob(JobID iJob, const std::string& spaceName) {
auto jobKey = JobDescription::makeJobKey(iJob);
std::unique_ptr<kvstore::KVIterator> iter;
auto rc = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, jobKey, &iter);
Expand All @@ -498,6 +503,10 @@ JobManager::showJob(JobID iJob) {
return nebula::error(optJobRet);
}
auto optJob = nebula::value(optJobRet);
if (optJob.getParas().back() != spaceName) {
LOG(WARNING) << "Show job " << iJob << " not in current space " << spaceName;
return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE;
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
}
ret.first = optJob.toJobDesc();
} else {
TaskDescription td(jKey, iter->val());
Expand All @@ -507,15 +516,27 @@ JobManager::showJob(JobID iJob) {
return ret;
}

nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob) {
nebula::cpp2::ErrorCode JobManager::stopJob(JobID iJob, const std::string& spaceName) {
LOG(INFO) << "try to stop job " << iJob;
auto optJobDescRet = JobDescription::loadJobDescription(iJob, kvStore_);
if (!nebula::ok(optJobDescRet)) {
auto retCode = nebula::error(optJobDescRet);
LOG(WARNING) << "LoadJobDesc failed, jobId " << iJob
<< " error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
auto optJobDesc = nebula::value(optJobDescRet);
if (optJobDesc.getParas().back() != spaceName) {
LOG(WARNING) << "Stop job " << iJob << " not in space " << spaceName;
return nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE;
}
return jobFinished(iJob, cpp2::JobStatus::STOPPED);
}

/*
* Return: recovered job num.
* */
ErrorOr<nebula::cpp2::ErrorCode, JobID> JobManager::recoverJob() {
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> JobManager::recoverJob(const std::string& spaceName) {
int32_t recoveredJobNum = 0;
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kvStore_->prefix(kDefaultSpaceId, kDefaultPartId, JobUtil::jobPrefix(), &iter);
Expand All @@ -531,6 +552,9 @@ ErrorOr<nebula::cpp2::ErrorCode, JobID> JobManager::recoverJob() {
auto optJobRet = JobDescription::makeJobDescription(iter->key(), iter->val());
if (nebula::ok(optJobRet)) {
auto optJob = nebula::value(optJobRet);
if (optJob.getParas().back() != spaceName) {
continue;
}
if (optJob.getStatus() == cpp2::JobStatus::QUEUE) {
// Check if the job exists
JobID jId = 0;
Expand Down
12 changes: 8 additions & 4 deletions src/meta/processors/job/JobManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
FRIEND_TEST(JobManagerTest, JobDeduplication);
FRIEND_TEST(JobManagerTest, loadJobDescription);
FRIEND_TEST(JobManagerTest, showJobs);
FRIEND_TEST(JobManagerTest, showJobsFromMultiSpace);
FRIEND_TEST(JobManagerTest, showJob);
FRIEND_TEST(JobManagerTest, showJobInOtherSpace);
FRIEND_TEST(JobManagerTest, recoverJob);
FRIEND_TEST(JobManagerTest, AddRebuildTagIndexJob);
FRIEND_TEST(JobManagerTest, AddRebuildEdgeIndexJob);
Expand Down Expand Up @@ -69,14 +71,16 @@ class JobManager : public nebula::cpp::NonCopyable, public nebula::cpp::NonMovab
*/
bool checkJobExist(const cpp2::AdminCmd& cmd, const std::vector<std::string>& paras, JobID& iJob);

ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> showJobs();
ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::JobDesc>> showJobs(
const std::string& spaceName);

ErrorOr<nebula::cpp2::ErrorCode, std::pair<cpp2::JobDesc, std::vector<cpp2::TaskDesc>>> showJob(
JobID iJob);
JobID iJob, const std::string& spaceName);

nebula::cpp2::ErrorCode stopJob(JobID iJob);
nebula::cpp2::ErrorCode stopJob(JobID iJob, const std::string& spaceName);

ErrorOr<nebula::cpp2::ErrorCode, JobID> recoverJob();
// return error/recovered job num
ErrorOr<nebula::cpp2::ErrorCode, uint32_t> recoverJob(const std::string& spaceName);

/**
* @brief persist job executed result, and do the cleanup
Expand Down
68 changes: 64 additions & 4 deletions src/meta/test/JobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ TEST_F(JobManagerTest, showJobs) {
jd2.setStatus(cpp2::JobStatus::FAILED);
jobMgr->addJob(jd2, adminClient_.get());

auto statusOrShowResult = jobMgr->showJobs();
auto statusOrShowResult = jobMgr->showJobs(paras1.back());
LOG(INFO) << "after show jobs";
ASSERT_TRUE(nebula::ok(statusOrShowResult));

Expand All @@ -273,6 +273,34 @@ TEST_F(JobManagerTest, showJobs) {
ASSERT_EQ(jobs[0].get_stop_time(), jd2.stopTime_);
}

TEST_F(JobManagerTest, showJobsFromMultiSpace) {
std::vector<std::string> paras1{"test_space"};
JobDescription jd1(1, cpp2::AdminCmd::COMPACT, paras1);
jd1.setStatus(cpp2::JobStatus::RUNNING);
jd1.setStatus(cpp2::JobStatus::FINISHED);
jobMgr->addJob(jd1, adminClient_.get());

std::vector<std::string> paras2{"test_space2"};
JobDescription jd2(2, cpp2::AdminCmd::FLUSH, paras2);
jd2.setStatus(cpp2::JobStatus::RUNNING);
jd2.setStatus(cpp2::JobStatus::FAILED);
jobMgr->addJob(jd2, adminClient_.get());

auto statusOrShowResult = jobMgr->showJobs(paras2.back());
LOG(INFO) << "after show jobs";
ASSERT_TRUE(nebula::ok(statusOrShowResult));

auto& jobs = nebula::value(statusOrShowResult);
ASSERT_EQ(jobs.size(), 1);

ASSERT_EQ(jobs[0].get_id(), jd2.id_);
ASSERT_EQ(jobs[0].get_cmd(), cpp2::AdminCmd::FLUSH);
ASSERT_EQ(jobs[0].get_paras()[0], "test_space2");
ASSERT_EQ(jobs[0].get_status(), cpp2::JobStatus::FAILED);
ASSERT_EQ(jobs[0].get_start_time(), jd2.startTime_);
ASSERT_EQ(jobs[0].get_stop_time(), jd2.stopTime_);
}

HostAddr toHost(std::string strIp) { return HostAddr(strIp, 0); }

TEST_F(JobManagerTest, showJob) {
Expand Down Expand Up @@ -300,7 +328,7 @@ TEST_F(JobManagerTest, showJob) {
jobMgr->save(td2.taskKey(), td2.taskVal());

LOG(INFO) << "before jobMgr->showJob";
auto showResult = jobMgr->showJob(iJob);
auto showResult = jobMgr->showJob(iJob, paras.back());
LOG(INFO) << "after jobMgr->showJob";
ASSERT_TRUE(nebula::ok(showResult));
auto& jobs = nebula::value(showResult).first;
Expand Down Expand Up @@ -328,16 +356,48 @@ TEST_F(JobManagerTest, showJob) {
ASSERT_EQ(tasks[1].get_stop_time(), td2.stopTime_);
}

TEST_F(JobManagerTest, showJobInOtherSpace) {
std::vector<std::string> paras{"test_space"};

JobDescription jd(1, cpp2::AdminCmd::COMPACT, paras);
jd.setStatus(cpp2::JobStatus::RUNNING);
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
jd.setStatus(cpp2::JobStatus::FINISHED);
jobMgr->addJob(jd, adminClient_.get());

int32_t iJob = jd.id_;
int32_t task1 = 0;
auto host1 = toHost("127.0.0.1");

TaskDescription td1(iJob, task1, host1);
td1.setStatus(cpp2::JobStatus::RUNNING);
SuperYoko marked this conversation as resolved.
Show resolved Hide resolved
td1.setStatus(cpp2::JobStatus::FINISHED);
jobMgr->save(td1.taskKey(), td1.taskVal());

int32_t task2 = 1;
auto host2 = toHost("127.0.0.1");
TaskDescription td2(iJob, task2, host2);
td2.setStatus(cpp2::JobStatus::RUNNING);
td2.setStatus(cpp2::JobStatus::FAILED);
jobMgr->save(td2.taskKey(), td2.taskVal());

LOG(INFO) << "before jobMgr->showJob";
std::string chosenSpace = "spaceWithNoJob";
auto showResult = jobMgr->showJob(iJob, chosenSpace);
LOG(INFO) << "after jobMgr->showJob";
ASSERT_TRUE(!nebula::ok(showResult));
}

TEST_F(JobManagerTest, recoverJob) {
// set status to prevent running the job since AdminClient is a injector
jobMgr->status_ = JobManager::JbmgrStatus::NOT_START;
auto spaceName = "test_space";
int32_t nJob = 3;
for (auto i = 0; i != nJob; ++i) {
JobDescription jd(i, cpp2::AdminCmd::FLUSH, {"test_space"});
JobDescription jd(i, cpp2::AdminCmd::FLUSH, {spaceName});
jobMgr->save(jd.jobKey(), jd.jobVal());
}

auto nJobRecovered = jobMgr->recoverJob();
auto nJobRecovered = jobMgr->recoverJob(spaceName);
ASSERT_EQ(nebula::value(nJobRecovered), 1);
}

Expand Down
Loading