Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: select from JOB_INFO should always in online mode #3963

Merged
merged 2 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object JobInfoManager {
}

def getAllJobs(): List[JobInfo] = {
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)
// TODO: Reorder in output, use orderby desc if SQL supported
resultSetToJobs(rs).sortWith(_.getId > _.getId)
Expand All @@ -82,7 +82,7 @@ object JobInfoManager {
def getUnfinishedJobs(): List[JobInfo] = {
// TODO: Now we can not add index for `state` and run sql with
// s"SELECT * FROM $tableName WHERE state NOT IN (${JobInfo.FINAL_STATE.mkString(",")})"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

val jobs = mutable.ArrayBuffer[JobInfo]()
Expand All @@ -99,7 +99,7 @@ object JobInfoManager {
}

def stopJob(jobId: Int): JobInfo = {
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

val jobInfo = if (rs.getFetchSize == 0) {
Expand Down Expand Up @@ -131,7 +131,7 @@ object JobInfoManager {

def getJob(jobId: Int): Option[JobInfo] = {
// TODO: Require to get only one row, https://github.com/4paradigm/OpenMLDB/issues/704
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId"
val sql = s"SELECT * FROM $JOB_INFO_TABLE_NAME WHERE id = $jobId CONFIG (execute_mode = 'online')"
val rs = sqlExecutor.executeSQL(INTERNAL_DB_NAME, sql)

if (rs.getFetchSize == 0) {
Expand Down
10 changes: 5 additions & 5 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@
std::string meta_table = openmldb::nameserver::PRE_AGG_META_NAME;
std::string select_aggr_info =
absl::StrCat("select aggr_db, aggr_table from ", meta_db, ".", meta_table, " where base_table = '",
table_info->name(), "' and base_db='", table_info->db(), "';");
table_info->name(), "' and base_db='", table_info->db(), "' CONFIG (execute_mode = 'online');");
auto rs = ExecuteSQL("", select_aggr_info, true, true, 0, status);
WARN_NOT_OK_AND_RET(status, "get aggr info failed", false);
if (rs->Size() > 0) {
Expand Down Expand Up @@ -5143,7 +5143,7 @@
std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(int job_id,
::hybridse::sdk::Status* status) {
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO WHERE id = " + std::to_string(job_id);
std::string sql = absl::Substitute("SELECT * FROM JOB_INFO WHERE id = $0 CONFIG (execute_mode = 'online')", job_id);

Check warning on line 5146 in src/sdk/sql_cluster_router.cc

View check run for this annotation

Codecov / codecov/patch

src/sdk/sql_cluster_router.cc#L5146

Added line #L5146 was not covered by tests

auto rs = ExecuteSQLParameterized(db, sql, {}, status);
if (!status->IsOK()) {
Expand All @@ -5164,7 +5164,7 @@

std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::GetJobResultSet(::hybridse::sdk::Status* status) {
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO";
std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online')";

Check warning on line 5167 in src/sdk/sql_cluster_router.cc

View check run for this annotation

Codecov / codecov/patch

src/sdk/sql_cluster_router.cc#L5167

Added line #L5167 was not covered by tests
auto rs = ExecuteSQLParameterized(db, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), status);
if (!status->IsOK()) {
return {};
Expand All @@ -5187,7 +5187,7 @@
return this->GetJobResultSet(job_id, status);
}
std::string db = openmldb::nameserver::INTERNAL_DB;
std::string sql = "SELECT * FROM JOB_INFO;";
std::string sql = "SELECT * FROM JOB_INFO CONFIG (execute_mode = 'online');";

Check warning on line 5190 in src/sdk/sql_cluster_router.cc

View check run for this annotation

Codecov / codecov/patch

src/sdk/sql_cluster_router.cc#L5190

Added line #L5190 was not covered by tests
auto rs = ExecuteSQLParameterized(db, sql, {}, status);
if (!status->IsOK()) {
return {};
Expand Down Expand Up @@ -5226,7 +5226,7 @@
}

absl::StatusOr<bool> SQLClusterRouter::GetUser(const std::string& name, UserInfo* user_info) {
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME);
std::string sql = absl::StrCat("select * from ", nameserver::USER_INFO_NAME, " CONFIG (execute_mode = 'online')");
hybridse::sdk::Status status;
auto rs =
ExecuteSQLParameterized(nameserver::INTERNAL_DB, sql, std::shared_ptr<openmldb::sdk::SQLRequestRow>(), &status);
Expand Down
Loading