Skip to content

Commit

Permalink
[Enhancement] Support disable table base compaction by time ranges (#…
Browse files Browse the repository at this point in the history
…50120)

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo authored Oct 11, 2024
1 parent f56c992 commit 1c8e4b9
Show file tree
Hide file tree
Showing 32 changed files with 778 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/agent/agent_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ using CloneAgentTaskRequest = AgentTaskRequestWithReqBody<TCloneReq>;
using StorageMediumMigrateTaskRequest = AgentTaskRequestWithReqBody<TStorageMediumMigrateReq>;
using CheckConsistencyTaskRequest = AgentTaskRequestWithReqBody<TCheckConsistencyReq>;
using CompactionTaskRequest = AgentTaskRequestWithReqBody<TCompactionReq>;
using CompactionControlTaskRequest = AgentTaskRequestWithReqBody<TCompactionControlReq>;
using UploadAgentTaskRequest = AgentTaskRequestWithReqBody<TUploadReq>;
using DownloadAgentTaskRequest = AgentTaskRequestWithReqBody<TDownloadReq>;
using SnapshotAgentTaskRequest = AgentTaskRequestWithReqBody<TSnapshotRequest>;
Expand Down
13 changes: 13 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class AgentServer::Impl {
std::unique_ptr<ThreadPool> _thread_pool_storage_medium_migrate;
std::unique_ptr<ThreadPool> _thread_pool_check_consistency;
std::unique_ptr<ThreadPool> _thread_pool_compaction;
std::unique_ptr<ThreadPool> _thread_pool_compaction_control;
std::unique_ptr<ThreadPool> _thread_pool_update_schema;

std::unique_ptr<ThreadPool> _thread_pool_upload;
Expand Down Expand Up @@ -214,6 +215,9 @@ void AgentServer::Impl::init_or_die() {
BUILD_DYNAMIC_TASK_THREAD_POOL("manual_compaction", 0, 1, std::numeric_limits<int>::max(),
_thread_pool_compaction);

BUILD_DYNAMIC_TASK_THREAD_POOL("compaction_control", 0, 1, std::numeric_limits<int>::max(),
_thread_pool_compaction_control);

BUILD_DYNAMIC_TASK_THREAD_POOL("update_schema", 0, config::update_schema_worker_count,
std::numeric_limits<int>::max(), _thread_pool_update_schema);

Expand Down Expand Up @@ -296,6 +300,7 @@ void AgentServer::Impl::stop() {
_thread_pool_storage_medium_migrate->shutdown();
_thread_pool_check_consistency->shutdown();
_thread_pool_compaction->shutdown();
_thread_pool_compaction_control->shutdown();
_thread_pool_update_schema->shutdown();
_thread_pool_upload->shutdown();
_thread_pool_download->shutdown();
Expand Down Expand Up @@ -368,6 +373,7 @@ void AgentServer::Impl::submit_tasks(TAgentResult& agent_result, const std::vect
HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE, storage_medium_migrate_req);
HANDLE_TYPE(TTaskType::CHECK_CONSISTENCY, check_consistency_req);
HANDLE_TYPE(TTaskType::COMPACTION, compaction_req);
HANDLE_TYPE(TTaskType::COMPACTION_CONTROL, compaction_control_req);
HANDLE_TYPE(TTaskType::UPLOAD, upload_req);
HANDLE_TYPE(TTaskType::UPDATE_SCHEMA, update_schema_req);
HANDLE_TYPE(TTaskType::DOWNLOAD, download_req);
Expand Down Expand Up @@ -480,6 +486,10 @@ void AgentServer::Impl::submit_tasks(TAgentResult& agent_result, const std::vect
HANDLE_TASK(TTaskType::COMPACTION, all_tasks, run_compaction_task, CompactionTaskRequest, compaction_req,
_exec_env);
break;
case TTaskType::COMPACTION_CONTROL:
HANDLE_TASK(TTaskType::COMPACTION_CONTROL, all_tasks, run_compaction_control_task,
CompactionControlTaskRequest, compaction_control_req, _exec_env);
break;
case TTaskType::UPDATE_SCHEMA:
HANDLE_TASK(TTaskType::UPDATE_SCHEMA, all_tasks, run_update_schema_task, UpdateSchemaTaskRequest,
update_schema_req, _exec_env);
Expand Down Expand Up @@ -675,6 +685,9 @@ ThreadPool* AgentServer::Impl::get_thread_pool(int type) const {
case TTaskType::COMPACTION:
ret = _thread_pool_compaction.get();
break;
case TTaskType::COMPACTION_CONTROL:
ret = _thread_pool_compaction_control.get();
break;
case TTaskType::UPDATE_SCHEMA:
ret = _thread_pool_update_schema.get();
break;
Expand Down
24 changes: 24 additions & 0 deletions be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "storage/task/engine_alter_tablet_task.h"
#include "storage/task/engine_checksum_task.h"
#include "storage/task/engine_clone_task.h"
#include "storage/task/engine_compaction_control_task.h"
#include "storage/task/engine_manual_compaction_task.h"
#include "storage/task/engine_storage_migration_task.h"
#include "storage/txn_manager.h"
Expand Down Expand Up @@ -551,6 +552,29 @@ void run_compaction_task(const std::shared_ptr<CompactionTaskRequest>& agent_tas
remove_task_info(agent_task_req->task_type, agent_task_req->signature);
}

void run_compaction_control_task(const std::shared_ptr<CompactionControlTaskRequest>& agent_task_req,
ExecEnv* exec_env) {
const TCompactionControlReq& compaction_req = agent_task_req->task_req;
TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
TStatus task_status;

EngineCompactionControlTask engine_task(compaction_req.table_to_disable_deadline);
(void)StorageEngine::instance()->execute_task(&engine_task);

task_status.__set_status_code(status_code);
task_status.__set_error_msgs(error_msgs);

TFinishTaskRequest finish_task_request;
finish_task_request.__set_backend(BackendOptions::get_localBackend());
finish_task_request.__set_task_type(agent_task_req->task_type);
finish_task_request.__set_signature(agent_task_req->signature);
finish_task_request.__set_task_status(task_status);

finish_task(finish_task_request);
remove_task_info(agent_task_req->task_type, agent_task_req->signature);
}

void run_update_schema_task(const std::shared_ptr<UpdateSchemaTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const TUpdateSchemaReq& update_schema_req = agent_task_req->task_req;
TStatusCode::type status_code = TStatusCode::OK;
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/agent_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ void run_storage_medium_migrate_task(const std::shared_ptr<StorageMediumMigrateT
ExecEnv* exec_env);
void run_check_consistency_task(const std::shared_ptr<CheckConsistencyTaskRequest>& agent_task_req, ExecEnv* exec_env);
void run_compaction_task(const std::shared_ptr<CompactionTaskRequest>& agent_task_req, ExecEnv* exec_env);
void run_compaction_control_task(const std::shared_ptr<CompactionControlTaskRequest>& agent_task_req,
ExecEnv* exec_env);
void run_update_schema_task(const std::shared_ptr<UpdateSchemaTaskRequest>& agent_task_req, ExecEnv* exec_env);
void run_upload_task(const std::shared_ptr<UploadAgentTaskRequest>& agent_task_req, ExecEnv* exec_env);
void run_download_task(const std::shared_ptr<DownloadAgentTaskRequest>& agent_task_req, ExecEnv* exec_env);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ set(STORAGE_FILES
task/engine_checksum_task.cpp
task/engine_clone_task.cpp
task/engine_manual_compaction_task.cpp
task/engine_compaction_control_task.cpp
task/engine_storage_migration_task.cpp
task/engine_alter_tablet_task.cpp
aggregate_iterator.cpp
Expand Down
20 changes: 20 additions & 0 deletions be/src/storage/compaction_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>
#include <thread>

#include "compaction_manager.h"
#include "storage/data_dir.h"
#include "util/starrocks_metrics.h"
#include "util/thread.h"
Expand Down Expand Up @@ -211,6 +212,19 @@ bool CompactionManager::_check_precondition(const CompactionCandidate& candidate
return false;
}

// check if the table base compaction is disabled
if (candidate.type == CompactionType::BASE_COMPACTION &&
_table_to_disable_deadline_map.find(tablet->tablet_meta()->table_id()) !=
_table_to_disable_deadline_map.end()) {
int64_t deadline = _table_to_disable_deadline_map[tablet->tablet_meta()->table_id()];
if (deadline > 0 && UnixSeconds() < deadline) {
VLOG(2) << "skip tablet:" << tablet->tablet_id() << " because table is disabled";
return false;
} else {
_table_to_disable_deadline_map.erase(tablet->tablet_meta()->table_id());
}
}

int64_t last_failure_ts = 0;
DataDir* data_dir = tablet->data_dir();
if (candidate.type == CUMULATIVE_COMPACTION) {
Expand Down Expand Up @@ -597,4 +611,10 @@ int CompactionManager::get_waiting_task_num() {
return _compaction_candidates.size();
}

void CompactionManager::disable_table_compaction(int64_t table_id, int64_t deadline) {
std::lock_guard lg(_candidates_mutex);
VLOG(2) << "disable table compaction, table_id:" << table_id << ", deadline:" << deadline;
_table_to_disable_deadline_map[table_id] = deadline;
}

} // namespace starrocks
3 changes: 3 additions & 0 deletions be/src/storage/compaction_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class CompactionManager {

int get_waiting_task_num();

void disable_table_compaction(int64_t table_id, int64_t deadline);

private:
CompactionManager(const CompactionManager& compaction_manager) = delete;
CompactionManager(CompactionManager&& compaction_manager) = delete;
Expand All @@ -153,6 +155,7 @@ class CompactionManager {
std::unordered_map<DataDir*, uint16_t> _data_dir_to_cumulative_task_num_map;
std::unordered_map<DataDir*, uint16_t> _data_dir_to_base_task_num_map;
std::unordered_map<CompactionType, uint16_t> _type_to_task_num_map;
std::unordered_map<int64_t, int64_t> _table_to_disable_deadline_map;
std::unique_ptr<ThreadPool> _update_candidate_pool;
std::mutex _dispatch_mutex;
std::thread _dispatch_update_candidate_thread;
Expand Down
37 changes: 37 additions & 0 deletions be/src/storage/task/engine_compaction_control_task.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "storage/task/engine_compaction_control_task.h"

#include <memory>

#include "runtime/exec_env.h"
#include "storage/compaction_manager.h"
#include "storage/olap_define.h"
#include "storage/storage_engine.h"

namespace starrocks {

EngineCompactionControlTask::EngineCompactionControlTask(const std::map<TTableId, int64_t>& table_to_disable_deadline)
: _table_to_disable_deadline(std::move(table_to_disable_deadline)) {}

Status EngineCompactionControlTask::execute() {
CompactionManager* compaction_manager = StorageEngine::instance()->compaction_manager();
for (auto& entry : _table_to_disable_deadline) {
compaction_manager->disable_table_compaction(entry.first, entry.second);
}
return Status::OK();
}

} // namespace starrocks
37 changes: 37 additions & 0 deletions be/src/storage/task/engine_compaction_control_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "gen_cpp/AgentService_types.h"
#include "storage/olap_define.h"
#include "storage/task/engine_task.h"

namespace starrocks {

// base class for storage engine
// add "Engine" as task prefix to prevent duplicate name with agent task
class EngineCompactionControlTask : public EngineTask {
public:
Status execute() override;

EngineCompactionControlTask(const std::map<TTableId, int64_t>& table_to_disable_deadline);

~EngineCompactionControlTask() override = default;

private:
std::map<TTableId, int64_t> _table_to_disable_deadline;
};

} // namespace starrocks
54 changes: 54 additions & 0 deletions be/test/storage/compaction_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,60 @@ TEST_F(CompactionManagerTest, test_candidates_exceede) {
}
}

TEST_F(CompactionManagerTest, test_disable_compaction) {
std::vector<CompactionCandidate> candidates;
DataDir data_dir("./data_dir");
for (int i = 0; i < 10; i++) {
TabletSharedPtr tablet = std::make_shared<Tablet>();
TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
tablet_meta->set_tablet_id(i);
tablet->set_tablet_meta(tablet_meta);
tablet->set_data_dir(&data_dir);
tablet->set_tablet_state(TABLET_RUNNING);

CompactionCandidate candidate;
candidate.tablet = tablet;
candidate.score = i;
candidates.push_back(candidate);
}

std::random_device rd;
std::mt19937 g(rd());
std::shuffle(candidates.begin(), candidates.end(), g);

_engine->compaction_manager()->update_candidates(candidates);

{
ASSERT_EQ(10, _engine->compaction_manager()->candidates_size());

int64_t valid_condidates = 0;
while (true) {
CompactionCandidate candidate;
auto valid = _engine->compaction_manager()->pick_candidate(&candidate);
if (!valid) {
break;
}
++valid_condidates;
}
ASSERT_EQ(10, valid_condidates);
}

_engine->compaction_manager()->disable_table_compaction(0, UnixSeconds() + 3600);

{
int64_t valid_condidates = 0;
while (true) {
CompactionCandidate candidate;
auto valid = _engine->compaction_manager()->pick_candidate(&candidate);
if (!valid) {
break;
}
++valid_condidates;
}
ASSERT_EQ(0, valid_condidates);
}
}

class MockCompactionTask : public CompactionTask {
public:
MockCompactionTask() : CompactionTask(HORIZONTAL_COMPACTION) {}
Expand Down
7 changes: 7 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,13 @@ under the License.
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>

</dependencies>

<build>
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/alter/AlterJobExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,16 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE)) {
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.ENABLE_LOAD_PROFILE);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BASE_COMPACTION_FORBIDDEN_TIME_RANGES)) {
try {
GlobalStateMgr.getCurrentState().getCompactionControlScheduler().updateTableForbiddenTimeRanges(
table.getId(), properties.get(PropertyAnalyzer.PROPERTIES_BASE_COMPACTION_FORBIDDEN_TIME_RANGES));
schemaChangeHandler.updateTableMeta(db, tableName.getTbl(), properties,
TTabletMetaType.BASE_COMPACTION_FORBIDDEN_TIME_RANGES);
} catch (Exception e) {
LOG.warn("Failed to update base compaction forbidden time ranges: ", e);
throw new DdlException("Failed to update base compaction forbidden time ranges: " + e.getMessage());
}
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,12 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (enableLoadProfile == olapTable.enableLoadProfile()) {
return;
}
} else if (metaType == TTabletMetaType.BASE_COMPACTION_FORBIDDEN_TIME_RANGES) {
String baseCompactionForbiddenTimeRanges = properties.get(
PropertyAnalyzer.PROPERTIES_BASE_COMPACTION_FORBIDDEN_TIME_RANGES);
if (baseCompactionForbiddenTimeRanges.equals(olapTable.getBaseCompactionForbiddenTimeRanges())) {
return;
}
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
int primaryIndexCacheExpireSec = Integer.parseInt(properties.get(
PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
Expand Down
23 changes: 23 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2571,6 +2571,13 @@ public Long getMutableBucketNum() {
return (long) 0;
}

public String getBaseCompactionForbiddenTimeRanges() {
if (tableProperty != null) {
return tableProperty.getBaseCompactionForbiddenTimeRanges();
}
return "";
}

public void setAutomaticBucketSize(long bucketSize) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
Expand Down Expand Up @@ -2608,6 +2615,16 @@ public void setEnableLoadProfile(boolean enableLoadProfile) {
tableProperty.buildEnableLoadProfile();
}

public void setBaseCompactionForbiddenTimeRanges(String baseCompactionForbiddenTimeRanges) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty
.modifyTableProperties(PropertyAnalyzer.PROPERTIES_BASE_COMPACTION_FORBIDDEN_TIME_RANGES,
baseCompactionForbiddenTimeRanges);
tableProperty.buildBaseCompactionForbiddenTimeRanges();
}

public TWriteQuorumType writeQuorum() {
if (tableProperty != null) {
return tableProperty.writeQuorum();
Expand Down Expand Up @@ -3267,6 +3284,12 @@ public Map<String, String> getProperties() {
properties.put(PropertyAnalyzer.PROPERTIES_ENABLE_LOAD_PROFILE, "true");
}

// base compaction forbidden time ranges
if (!getBaseCompactionForbiddenTimeRanges().isEmpty()) {
properties.put(PropertyAnalyzer.PROPERTIES_BASE_COMPACTION_FORBIDDEN_TIME_RANGES,
getBaseCompactionForbiddenTimeRanges());
}

// locations
Multimap<String, String> locationsMap = getLocation();
if (locationsMap != null) {
Expand Down
Loading

0 comments on commit 1c8e4b9

Please sign in to comment.