diff --git a/cases/plan/cmd.yaml b/cases/plan/cmd.yaml index 50b5fa94343..58eb872268f 100644 --- a/cases/plan/cmd.yaml +++ b/cases/plan/cmd.yaml @@ -649,6 +649,22 @@ cases: +-cmd_type: drop function +-if_exists: true +-args: [func1] + - id: truncate_stmt + desc: truncate + sql: TRUNCATE TABLE t1; + expect: + node_tree_str: | + +-node[CMD] + +-cmd_type: truncate table + +-args: [t1] + - id: truncate_stmt_db + desc: truncate + sql: TRUNCATE TABLE db1.t1; + expect: + node_tree_str: | + +-node[CMD] + +-cmd_type: truncate table + +-args: [db1, t1] - id: exit_stmt desc: exit statement sql: EXIT; diff --git a/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md b/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md new file mode 100644 index 00000000000..3bd9360d920 --- /dev/null +++ b/docs/en/reference/sql/ddl/TRUNCATE_TABLE_STATEMENT.md @@ -0,0 +1,16 @@ +# TRUNCATE TABLE + +``` +TRUNCATE TABLE table_name +``` + +`TRUNCATE TABLE` statement is used to clear the specified table. + +## Example: clear t1 + +```sql +TRUNCATE TABLE t1; +-- Truncate table t1? yes/no +-- yes +-- SUCCEED +``` \ No newline at end of file diff --git a/docs/en/reference/sql/ddl/index.rst b/docs/en/reference/sql/ddl/index.rst index dbc94cc1f3d..bff9db48fb0 100644 --- a/docs/en/reference/sql/ddl/index.rst +++ b/docs/en/reference/sql/ddl/index.rst @@ -24,3 +24,4 @@ Data Definition Statement (DDL) SHOW_FUNCTIONS DROP_FUNCTION SHOW_CREATE_TABLE_STATEMENT + TRUNCATE_TABLE_STATEMENT diff --git a/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md b/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md new file mode 100644 index 00000000000..8ffb623f26f --- /dev/null +++ b/docs/zh/openmldb_sql/ddl/TRUNCATE_TABLE_STATEMENT.md @@ -0,0 +1,16 @@ +# TRUNCATE TABLE + +``` +TRUNCATE TABLE table_name +``` + +`TRUNCATE TABLE`语句用清空指定的表。 + +## Example: 清空t1表 + +```sql +TRUNCATE TABLE t1; +-- Truncate table t1? yes/no +-- yes +-- SUCCEED +``` \ No newline at end of file diff --git a/docs/zh/openmldb_sql/ddl/index.rst b/docs/zh/openmldb_sql/ddl/index.rst index efd36734261..9e420def154 100644 --- a/docs/zh/openmldb_sql/ddl/index.rst +++ b/docs/zh/openmldb_sql/ddl/index.rst @@ -24,3 +24,4 @@ SHOW_FUNCTIONS DROP_FUNCTION SHOW_CREATE_TABLE_STATEMENT + TRUNCATE_TABLE_STATEMENT \ No newline at end of file diff --git a/hybridse/include/node/node_enum.h b/hybridse/include/node/node_enum.h index fc1dde18b07..baa3bdb2afe 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -285,6 +285,7 @@ enum CmdType { kCmdDropFunction, kCmdShowJobLog, kCmdShowCreateTable, + kCmdTruncate, kCmdFake, // not a real cmd, for testing purpose only kLastCmd = kCmdFake, }; diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index 3847366c148..a0e8e0bec8f 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -76,6 +76,7 @@ static absl::flat_hash_map CreateCmdTypeNamesMap() { {CmdType::kCmdDropFunction, "drop function"}, {CmdType::kCmdShowFunctions, "show functions"}, {CmdType::kCmdShowJobLog, "show joblog"}, + {CmdType::kCmdTruncate, "truncate table"}, }; for (auto kind = 0; kind < CmdType::kLastCmd; ++kind) { DCHECK(map.find(static_cast(kind)) != map.end()); diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index f2fa6fad4e2..2592c19fb99 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -611,6 +611,16 @@ base::Status ConvertStatement(const zetasql::ASTStatement* statement, node::Node *output = node; break; } + case zetasql::AST_TRUNCATE_STATEMENT: { + const zetasql::ASTTruncateStatement* truncate_statement = + statement->GetAsOrNull(); + std::vector names; + CHECK_STATUS(AstPathExpressionToStringList(truncate_statement->target_path(), names)); + auto node = + dynamic_cast(node_manager->MakeCmdNode(node::CmdType::kCmdTruncate, names)); + *output = node; + break; + } case zetasql::AST_DROP_FUNCTION_STATEMENT: { const zetasql::ASTDropFunctionStatement* drop_fun_statement = statement->GetAsOrNull(); diff --git a/src/base/status.h b/src/base/status.h index 4a4eb867724..a6854e287b6 100644 --- a/src/base/status.h +++ b/src/base/status.h @@ -93,6 +93,7 @@ enum ReturnCode { kExceedMaxMemory = 160, kInvalidArgs = 161, kCheckIndexFailed = 162, + kCatalogUpdateFailed = 163, kNameserverIsNotLeader = 300, kAutoFailoverIsEnabled = 301, kEndpointIsNotExist = 302, @@ -127,7 +128,10 @@ enum ReturnCode { kCheckParameterFailed = 331, kCreateProcedureFailedOnTablet = 332, kCreateFunctionFailedOnTablet = 333, - kOPAlreadyExists = 317, + kOPAlreadyExists = 334, + kOffsetMismatch = 335, + kGetTabletFailed = 336, + kTruncateTableFailed = 337, kReplicaClusterAliasDuplicate = 400, kConnectRelicaClusterZkFailed = 401, kNotSameReplicaName = 402, diff --git a/src/catalog/tablet_catalog.cc b/src/catalog/tablet_catalog.cc index cdf979167fc..233077f32fb 100644 --- a/src/catalog/tablet_catalog.cc +++ b/src/catalog/tablet_catalog.cc @@ -213,7 +213,7 @@ void TabletTableHandler::AddTable(std::shared_ptr<::openmldb::storage::Table> ta do { old_tables = std::atomic_load_explicit(&tables_, std::memory_order_acquire); new_tables = std::make_shared(*old_tables); - new_tables->emplace(table->GetPid(), table); + new_tables->insert_or_assign(table->GetPid(), table); } while (!atomic_compare_exchange_weak(&tables_, &old_tables, new_tables)); } diff --git a/src/client/ns_client.cc b/src/client/ns_client.cc index 1475d634bd0..2b3a4a4ad45 100644 --- a/src/client/ns_client.cc +++ b/src/client/ns_client.cc @@ -297,6 +297,19 @@ bool NsClient::DropTable(const std::string& db, const std::string& name, std::st return false; } +base::Status NsClient::TruncateTable(const std::string& db, const std::string& name) { + ::openmldb::nameserver::TruncateTableRequest request; + request.set_name(name); + request.set_db(db); + ::openmldb::nameserver::TruncateTableResponse response; + bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::TruncateTable, &request, &response, + FLAGS_request_timeout_ms, 1); + if (ok && response.code() == 0) { + return {}; + } + return {response.code(), response.msg()}; +} + bool NsClient::SyncTable(const std::string& name, const std::string& cluster_alias, uint32_t pid, std::string& msg) { ::openmldb::nameserver::SyncTableRequest request; request.set_name(name); diff --git a/src/client/ns_client.h b/src/client/ns_client.h index 503885dce48..467219f4ec8 100644 --- a/src/client/ns_client.h +++ b/src/client/ns_client.h @@ -112,6 +112,8 @@ class NsClient : public Client { bool DropTable(const std::string& db, const std::string& name, std::string& msg); // NOLINT + base::Status TruncateTable(const std::string& db, const std::string& name); + bool SyncTable(const std::string& name, const std::string& cluster_alias, uint32_t pid, std::string& msg); // NOLINT diff --git a/src/client/tablet_client.cc b/src/client/tablet_client.cc index 938a1b747d7..3a51a7e8f94 100644 --- a/src/client/tablet_client.cc +++ b/src/client/tablet_client.cc @@ -153,6 +153,20 @@ bool TabletClient::SQLBatchRequestQuery(const std::string& db, const std::string return true; } +base::Status TabletClient::TruncateTable(uint32_t tid, uint32_t pid) { + ::openmldb::api::TruncateTableRequest request; + ::openmldb::api::TruncateTableResponse response; + request.set_tid(tid); + request.set_pid(pid); + if (!client_.SendRequest(&::openmldb::api::TabletServer_Stub::TruncateTable, &request, &response, + FLAGS_request_timeout_ms, 1)) { + return {base::ReturnCode::kRPCError, "send request failed!"}; + } else if (response.code() == 0) { + return {}; + } + return {response.code(), response.msg()}; +} + base::Status TabletClient::CreateTable(const ::openmldb::api::TableMeta& table_meta) { ::openmldb::api::CreateTableRequest request; ::openmldb::api::TableMeta* table_meta_ptr = request.mutable_table_meta(); diff --git a/src/client/tablet_client.h b/src/client/tablet_client.h index f9dfd897361..ec3ab346cc7 100644 --- a/src/client/tablet_client.h +++ b/src/client/tablet_client.h @@ -56,6 +56,8 @@ class TabletClient : public Client { base::Status CreateTable(const ::openmldb::api::TableMeta& table_meta); + base::Status TruncateTable(uint32_t tid, uint32_t pid); + bool UpdateTableMetaForAddField(uint32_t tid, const std::vector& cols, const openmldb::common::VersionPair& pair, std::string& msg); // NOLINT diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 8f17d276be6..a1637e369d9 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -1078,6 +1078,47 @@ TEST_P(DBSDKTest, DeployWithBias) { ASSERT_TRUE(cs->GetNsClient()->DropDatabase(db, msg)); } +TEST_P(DBSDKTest, Truncate) { + auto cli = GetParam(); + sr = cli->sr; + std::string db_name = "test2"; + std::string table_name = "test1"; + std::string ddl = "create table test1 (c1 string, c2 int, c3 bigint, INDEX(KEY=c1, ts=c3));"; + ProcessSQLs(sr, { + "set @@execute_mode = 'online'", + absl::StrCat("create database ", db_name, ";"), + absl::StrCat("use ", db_name, ";"), + ddl, + }); + hybridse::sdk::Status status; + sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status); + ASSERT_TRUE(status.IsOK()) << status.ToString(); + auto res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 0); + for (int i = 0; i < 10; i++) { + std::string key = absl::StrCat("key", i); + for (int j = 0; j < 10; j++) { + uint64_t ts = 1000 + j; + sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('", key, "', 11, ", ts, ");"), &status); + } + } + + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 100); + sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status); + ASSERT_TRUE(status.IsOK()) << status.ToString(); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 0); + sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('aa', 11, 100);"), &status); + res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status); + ASSERT_EQ(res->Size(), 1); + ProcessSQLs(sr, { + absl::StrCat("use ", db_name, ";"), + absl::StrCat("drop table ", table_name), + absl::StrCat("drop database ", db_name), + }); +} + TEST_P(DBSDKTest, DeletetRange) { auto cli = GetParam(); sr = cli->sr; diff --git a/src/nameserver/name_server_impl.cc b/src/nameserver/name_server_impl.cc index c76054d622b..35d11f4d6ec 100644 --- a/src/nameserver/name_server_impl.cc +++ b/src/nameserver/name_server_impl.cc @@ -3716,6 +3716,69 @@ void NameServerImpl::CreateTable(RpcController* controller, const CreateTableReq } } +void NameServerImpl::TruncateTable(RpcController* controller, const TruncateTableRequest* request, + TruncateTableResponse* response, Closure* done) { + brpc::ClosureGuard done_guard(done); + const std::string& db = request->db(); + const std::string& name = request->name(); + std::shared_ptr<::openmldb::nameserver::TableInfo> table_info; + { + std::lock_guard lock(mu_); + if (!GetTableInfoUnlock(request->name(), request->db(), &table_info)) { + PDLOG(WARNING, "table[%s] does not exist in db [%s]", name.c_str(), db.c_str()); + response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); + response->set_msg("table does not exist"); + return; + } + if (IsExistActiveOp(db, name)) { + PDLOG(WARNING, "there is active op. db [%s] name [%s]", db.c_str(), name.c_str()); + response->set_code(::openmldb::base::ReturnCode::kOPAlreadyExists); + response->set_msg("there is active op"); + return; + } + } + uint32_t tid = table_info->tid(); + for (const auto& partition : table_info->table_partition()) { + uint32_t offset = 0; + for (const auto& partition_meta : partition.partition_meta()) { + if (partition_meta.offset() != offset) { + if (offset == 0) { + offset = partition_meta.offset(); + } else { + PDLOG(WARNING, "table[%s] partition [%d] offset mismatch", name.c_str(), partition.pid()); + response->set_code(::openmldb::base::ReturnCode::kOffsetMismatch); + response->set_msg("partition offset mismatch"); + return; + } + } + } + } + for (const auto& partition : table_info->table_partition()) { + uint32_t pid = partition.pid(); + for (const auto& partition_meta : partition.partition_meta()) { + const auto& endpoint = partition_meta.endpoint(); + auto tablet_ptr = GetTablet(endpoint); + if (!tablet_ptr) { + PDLOG(WARNING, "endpoint[%s] can not find client", endpoint.c_str()); + response->set_code(::openmldb::base::ReturnCode::kGetTabletFailed); + response->set_msg("fail to get client, endpint " + endpoint); + return; + } + auto status = tablet_ptr->client_->TruncateTable(tid, pid); + if (!status.OK()) { + PDLOG(WARNING, "truncate failed, tid[%u] pid[%u] endpoint[%s] msg [%s]", + tid, pid, endpoint.c_str(), status.GetMsg().c_str()); + response->set_code(::openmldb::base::ReturnCode::kTruncateTableFailed); + response->set_msg(status.GetMsg()); + return; + } + } + } + PDLOG(INFO, "truncate success, db[%s] name[%s]", db.c_str(), name.c_str()); + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); +} + bool NameServerImpl::SaveTableInfo(std::shared_ptr table_info) { std::string table_value; table_info->SerializeToString(&table_value); @@ -10571,5 +10634,26 @@ bool NameServerImpl::IsExistActiveOp(const std::string& db, const std::string& n return false; } +bool NameServerImpl::IsExistActiveOp(const std::string& db, const std::string& name) { + for (const auto& op_list : task_vec_) { + if (op_list.empty()) { + continue; + } + for (const auto& op_data : op_list) { + if (!db.empty() && op_data->op_info_.db() != db) { + continue; + } + if (!name.empty() && op_data->op_info_.name() != name) { + continue; + } + if (op_data->op_info_.task_status() == api::TaskStatus::kInited || + op_data->op_info_.task_status() == api::TaskStatus::kDoing) { + return true; + } + } + } + return false; +} + } // namespace nameserver } // namespace openmldb diff --git a/src/nameserver/name_server_impl.h b/src/nameserver/name_server_impl.h index b9755c4aa1c..593c0bb536f 100644 --- a/src/nameserver/name_server_impl.h +++ b/src/nameserver/name_server_impl.h @@ -165,6 +165,9 @@ class NameServerImpl : public NameServer { void DropTable(RpcController* controller, const DropTableRequest* request, GeneralResponse* response, Closure* done); + void TruncateTable(RpcController* controller, const TruncateTableRequest* request, + TruncateTableResponse* response, Closure* done); + void AddTableField(RpcController* controller, const AddTableFieldRequest* request, GeneralResponse* response, Closure* done); @@ -688,6 +691,7 @@ class NameServerImpl : public NameServer { bool IsExistDataBase(const std::string& db); bool IsExistActiveOp(const std::string& db, const std::string& name, api::OPType op_type); + bool IsExistActiveOp(const std::string& db, const std::string& name); private: std::mutex mu_; diff --git a/src/proto/name_server.proto b/src/proto/name_server.proto index 08383b4f7c0..219b83a0b73 100755 --- a/src/proto/name_server.proto +++ b/src/proto/name_server.proto @@ -121,6 +121,16 @@ message DropTableRequest { optional string db = 4 [default = ""]; } +message TruncateTableRequest { + optional string name = 1; + optional string db = 2; +} + +message TruncateTableResponse { + optional int32 code = 1; + optional string msg = 2; +} + message LoadTableRequest { optional string name = 1; optional string endpoint = 2; @@ -531,6 +541,7 @@ message DeploySQLResponse { service NameServer { rpc CreateTable(CreateTableRequest) returns (GeneralResponse); rpc DropTable(DropTableRequest) returns (GeneralResponse); + rpc TruncateTable(TruncateTableRequest) returns (TruncateTableResponse); rpc ShowTablet(ShowTabletRequest) returns (ShowTabletResponse); rpc ShowTable(ShowTableRequest) returns (ShowTableResponse); rpc MakeSnapshotNS(MakeSnapshotNSRequest) returns (GeneralResponse); diff --git a/src/proto/tablet.proto b/src/proto/tablet.proto index 0938c9d965c..ee0ec5beae1 100755 --- a/src/proto/tablet.proto +++ b/src/proto/tablet.proto @@ -363,6 +363,16 @@ message DropTableResponse { optional string msg = 2; } +message TruncateTableRequest { + optional int32 tid = 1; + optional int32 pid = 2; +} + +message TruncateTableResponse { + optional int32 code = 1; + optional string msg = 2; +} + message GetTableSchemaRequest { optional int32 tid = 1; optional int32 pid = 2; @@ -905,6 +915,7 @@ service TabletServer { rpc CreateTable(CreateTableRequest) returns (CreateTableResponse); rpc LoadTable(LoadTableRequest) returns (GeneralResponse); rpc DropTable(DropTableRequest) returns (DropTableResponse); + rpc TruncateTable(TruncateTableRequest) returns (TruncateTableResponse); rpc GetTableStatus(GetTableStatusRequest) returns (GetTableStatusResponse); rpc GetTableSchema(GetTableSchemaRequest) returns (GetTableSchemaResponse); rpc GetTableFollower(GetTableFollowerRequest) returns (GetTableFollowerResponse); diff --git a/src/sdk/interactive.h b/src/sdk/interactive.h new file mode 100644 index 00000000000..c4480da9bc7 --- /dev/null +++ b/src/sdk/interactive.h @@ -0,0 +1,144 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_SDK_INTERACTIVE_H_ +#define SRC_SDK_INTERACTIVE_H_ + +#include +#include + +#include "base/status.h" + +namespace openmldb { +namespace sdk { + +inline const std::string DROP_TABLE_MSG = + "DROP TABLE is a dangerous operation. Once deleted, it is very difficult to recover. \n" + "You may also note that: \n" + "- If a snapshot of a partition is being generated while dropping a table, " + "the partition will not be deleted successfully.\n" + "- By default, the deleted data is moved to the folder `recycle`.\n" + "Please refer to this link for more details: " + base::NOTICE_URL; + +inline const std::string DROP_DEPLOYMENT_MSG = + "- DROP DEPLOYMENT will not delete the index that is created automatically.\n" + "- DROP DEPLOYMENT will not delete data in the pre-aggregation table in the long window setting."; + +inline const std::string DROP_INDEX_MSG = + "DROP INDEX is a dangerous operation. Once deleted, it is very difficult to recover.\n" + "You may also note that: \n" + "- You have to wait for 2 garbage collection intervals (gc_interval) to create the same index.\n" + "- The index will not be deleted immediately, " + "it remains until after 2 garbage collection intervals.\n" + "Please refer to the doc for more details: " + base::NOTICE_URL; + +inline const std::string DROP_FUNCTION_MSG = + "This will lead to execution failure or system crash " + "if any active deployment is using the function."; + +enum class CmdType { + kDrop = 1, + kTruncate = 2, +}; + +enum class TargetType { + kTable = 1, + kDeployment = 2, + kIndex = 3, + kFunction = 4, + kProcedure = 5, +}; + +class InteractiveValidator { + public: + InteractiveValidator() = default; + explicit InteractiveValidator(bool interactive) : interactive_(interactive) {} + + bool Interactive() { return interactive_; } + void SetInteractive(bool interactive) { interactive_ = interactive; } + + bool Check(CmdType cmd_type, TargetType target, const std::string& name) { + if (!interactive_) { + return true; + } + std::string msg; + if (cmd_type == CmdType::kDrop) { + switch (target) { + case TargetType::kTable: + msg = DROP_TABLE_MSG; + break; + case TargetType::kDeployment: + msg = DROP_DEPLOYMENT_MSG; + break; + case TargetType::kIndex: + msg = DROP_INDEX_MSG; + break; + case TargetType::kFunction: + msg = DROP_FUNCTION_MSG; + break; + default: + break; + } + } + if (!msg.empty()) { + printf("%s\n", msg.c_str()); + } + std::string cmd_str = CmdType2Str(cmd_type); + std::string target_str = TargetType2Str(target); + printf("%s %s %s? yes/no\n", cmd_str.c_str(), target_str.c_str(), name.c_str()); + std::string input; + std::cin >> input; + std::transform(input.begin(), input.end(), input.begin(), ::tolower); + if (input != "yes") { + printf("'%s %s' cmd is canceled!\n", cmd_str.c_str(), name.c_str()); + return false; + } + return true; + } + + private: + std::string CmdType2Str(CmdType type) { + if (type == CmdType::kDrop) { + return "Drop"; + } else { + return "Truncate"; + } + } + + std::string TargetType2Str(TargetType type) { + switch (type) { + case TargetType::kTable: + return "table"; + case TargetType::kDeployment: + return "deployment"; + case TargetType::kIndex: + return "index"; + case TargetType::kFunction: + return "function"; + default: + return ""; + } + return ""; + } + + private: + bool interactive_ = false; +}; + +} // namespace sdk +} // namespace openmldb + +#endif // SRC_SDK_INTERACTIVE_H_ diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index 2556eac681e..d838870d65b 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -210,7 +210,7 @@ class BatchQueryFutureImpl : public QueryFuture { SQLClusterRouter::SQLClusterRouter(const SQLRouterOptions& options) : options_(std::make_shared(options)), is_cluster_mode_(true), - interactive_(false), + interactive_validator_(), cluster_sdk_(nullptr), mu_(), rand_(::baidu::common::timer::now_time()) {} @@ -218,7 +218,7 @@ SQLClusterRouter::SQLClusterRouter(const SQLRouterOptions& options) SQLClusterRouter::SQLClusterRouter(const StandaloneOptions& options) : options_(std::make_shared(options)), is_cluster_mode_(false), - interactive_(false), + interactive_validator_(), cluster_sdk_(nullptr), mu_(), rand_(::baidu::common::timer::now_time()) {} @@ -226,7 +226,7 @@ SQLClusterRouter::SQLClusterRouter(const StandaloneOptions& options) SQLClusterRouter::SQLClusterRouter(DBSDK* sdk) : options_(), is_cluster_mode_(sdk->IsClusterMode()), - interactive_(false), + interactive_validator_(), cluster_sdk_(sdk), mu_(), rand_(::baidu::common::timer::now_time()) { @@ -1818,7 +1818,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } case hybridse::node::kCmdDropFunction: { std::string name = cmd_node->GetArgs()[0]; - if (!CheckAnswerIfInteractive("function", name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kFunction, name)) { return {}; } auto base_status = ns_ptr->DropFunction(name, cmd_node->IsIfExists()); @@ -1874,7 +1874,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h return {}; } std::string sp_name = cmd_node->GetArgs()[0]; - if (!CheckAnswerIfInteractive("procedure", sp_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kProcedure, sp_name)) { return {}; } if (ns_ptr->DropProcedure(db, sp_name, msg)) { @@ -1944,7 +1944,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, sp ? "not a deployment" : "deployment not found"}; return {}; } - if (!CheckAnswerIfInteractive("deployment", deploy_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kDeployment, deploy_name)) { return {}; } if (ns_ptr->DropProcedure(db_name, deploy_name, msg)) { @@ -2021,15 +2021,11 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {}; std::string db_name = db; std::string table_name; - if (cmd_node->GetArgs().size() == 2) { - db_name = cmd_node->GetArgs()[0]; - table_name = cmd_node->GetArgs()[1]; - } else if (cmd_node->GetArgs().size() == 1) { - table_name = cmd_node->GetArgs()[0]; - } else { - *status = {StatusCode::kCmdError, "Invalid Cmd Args size"}; + if (!ParseNamesFromArgs(db, cmd_node->GetArgs(), &db_name, &table_name).IsOK()) { + *status = {StatusCode::kCmdError, msg}; + return {}; } - if (!CheckAnswerIfInteractive("table", table_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kTable, table_name)) { return {}; } if (DropTable(db_name, table_name, cmd_node->IsIfExists(), status)) { @@ -2037,6 +2033,23 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h } return {}; } + case hybridse::node::kCmdTruncate: { + *status = {}; + std::string db_name; + std::string table_name; + if (!ParseNamesFromArgs(db, cmd_node->GetArgs(), &db_name, &table_name).IsOK()) { + *status = {StatusCode::kCmdError, msg}; + return {}; + } + if (!interactive_validator_.Check(CmdType::kTruncate, TargetType::kTable, table_name)) { + return {}; + } + auto base_status = ns_ptr->TruncateTable(db_name, table_name); + if (!base_status.OK()) { + *status = {StatusCode::kCmdError, base_status.GetMsg()}; + } + return {}; + } case hybridse::node::kCmdDropIndex: { std::string db_name = db; std::string table_name; @@ -2052,7 +2065,7 @@ std::shared_ptr SQLClusterRouter::HandleSQLCmd(const h *status = {StatusCode::kCmdError, "Invalid Cmd Args size"}; return {}; } - if (!CheckAnswerIfInteractive("index", index_name + " on " + table_name)) { + if (!interactive_validator_.Check(CmdType::kDrop, TargetType::kIndex, index_name + " on " + table_name)) { return {}; } ret = ns_ptr->DeleteIndex(db_name, table_name, index_name, msg); @@ -2114,7 +2127,7 @@ base::Status SQLClusterRouter::HandleSQLCreateTable(hybridse::node::CreatePlanNo if (!ns_ptr->CreateTable(table_info, create_node->GetIfNotExist(), msg)) { return base::Status(base::ReturnCode::kSQLCmdRunError, msg); } - if (interactive_ && table_info.column_key_size() == 0) { + if (interactive_validator_.Interactive() && table_info.column_key_size() == 0) { return base::Status{base::ReturnCode::kOk, "As there is no index specified, a default index type `absolute 0` will be created. " "The data attached to the index will never expire to be deleted. " @@ -3078,10 +3091,8 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod } ::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& db, - const std::vector& args, std::string* db_name, - std::string* name) { + const std::vector& args, std::string* db_name, std::string* name) { if (args.size() == 1) { - // only sp name, no db_name if (db.empty()) { return {StatusCode::kCmdError, "Please enter database first"}; } @@ -3096,44 +3107,6 @@ ::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& return {}; } -bool SQLClusterRouter::CheckAnswerIfInteractive(const std::string& drop_type, const std::string& name) { - if (interactive_) { - std::string msg; - if (drop_type == "table") { - msg = "DROP TABLE is a dangerous operation. Once deleted, it is very difficult to recover. \n" - "You may also note that: \n" - "- If a snapshot of a partition is being generated while dropping a table, " - "the partition will not be deleted successfully.\n" - "- By default, the deleted data is moved to the folder `recycle`.\n" - "Please refer to this link for more details: " + base::NOTICE_URL; - } else if (drop_type == "deployment") { - msg = "- DROP DEPLOYMENT will not delete the index that is created automatically.\n" - "- DROP DEPLOYMENT will not delete data in the pre-aggregation table in the long window setting."; - } else if (drop_type == "index") { - msg = "DROP INDEX is a dangerous operation. Once deleted, it is very difficult to recover.\n" - "You may also note that: \n" - "- You have to wait for 2 garbage collection intervals (gc_interval) to create the same index.\n" - "- The index will not be deleted immediately, " - "it remains until after 2 garbage collection intervals.\n" - "Please refer to the doc for more details: " + base::NOTICE_URL; - } else if (drop_type == "function") { - msg = "This will lead to execution failure or system crash if any active deployment is using the function."; - } - if (!msg.empty()) { - printf("%s\n", msg.c_str()); - } - printf("Drop %s %s? yes/no\n", drop_type.c_str(), name.c_str()); - std::string input; - std::cin >> input; - std::transform(input.begin(), input.end(), input.begin(), ::tolower); - if (input != "yes") { - printf("'Drop %s' cmd is canceled!\n", name.c_str()); - return false; - } - } - return true; -} - std::string SQLClusterRouter::GetDatabase() { std::lock_guard<::openmldb::base::SpinMutex> lock(mu_); return db_; @@ -3144,7 +3117,7 @@ void SQLClusterRouter::SetDatabase(const std::string& db) { db_ = db; } -void SQLClusterRouter::SetInteractive(bool value) { interactive_ = value; } +void SQLClusterRouter::SetInteractive(bool value) { interactive_validator_.SetInteractive(value); } ::openmldb::base::Status SQLClusterRouter::SaveResultSet(const std::string& file_path, const std::shared_ptr& options_map, diff --git a/src/sdk/sql_cluster_router.h b/src/sdk/sql_cluster_router.h index d2e6b52b790..f5661c9a1bb 100644 --- a/src/sdk/sql_cluster_router.h +++ b/src/sdk/sql_cluster_router.h @@ -34,6 +34,7 @@ #include "nameserver/system_table.h" #include "sdk/db_sdk.h" #include "sdk/file_option_parser.h" +#include "sdk/interactive.h" #include "sdk/sql_cache.h" #include "sdk/sql_router.h" #include "sdk/table_reader_impl.h" @@ -421,7 +422,7 @@ class SQLClusterRouter : public SQLRouter { std::string db_; std::map session_variables_; bool is_cluster_mode_; - bool interactive_; + InteractiveValidator interactive_validator_; DBSDK* cluster_sdk_; std::map>>> input_lru_cache_; diff --git a/src/storage/aggregator.h b/src/storage/aggregator.h index 035b126518a..373e23633cd 100644 --- a/src/storage/aggregator.h +++ b/src/storage/aggregator.h @@ -157,6 +157,8 @@ class Aggregator { // set the filter column info that not initialized in constructor bool SetFilter(absl::string_view filter_col); + std::shared_ptr GetAggTable() { return aggr_table_; } + protected: codec::Schema base_table_schema_; diff --git a/src/storage/disk_table.cc b/src/storage/disk_table.cc index 8484eee1315..b41c9f8fd3c 100644 --- a/src/storage/disk_table.cc +++ b/src/storage/disk_table.cc @@ -363,6 +363,48 @@ bool DiskTable::Get(uint32_t idx, const std::string& pk, uint64_t ts, std::strin bool DiskTable::Get(const std::string& pk, uint64_t ts, std::string& value) { return Get(0, pk, ts, value); } +base::Status DiskTable::Truncate() { + const rocksdb::Snapshot* snapshot = db_->GetSnapshot(); + absl::Cleanup release_snapshot = [this, snapshot] { this->db_->ReleaseSnapshot(snapshot); }; + rocksdb::ReadOptions ro = rocksdb::ReadOptions(); + ro.snapshot = snapshot; + ro.prefix_same_as_start = true; + ro.pin_data = true; + rocksdb::WriteBatch batch; + for (const auto& inner_index : *(table_index_.GetAllInnerIndex())) { + uint32_t idx = inner_index->GetId(); + std::unique_ptr it(db_->NewIterator(ro, cf_hs_[idx + 1])); + it->SeekToFirst(); + if (it->Valid()) { + std::string start_key(it->key().data(), it->key().size()); + it->SeekToLast(); + if (it->Valid()) { + rocksdb::Slice cur_pk; + uint64_t ts = 0; + uint32_t ts_idx = 0; + const auto& indexs = inner_index->GetIndex(); + std::string end_key; + if (indexs.size() > 1) { + ParseKeyAndTs(true, it->key(), &cur_pk, &ts, &ts_idx); + end_key = CombineKeyTs(cur_pk, 0, ts_idx); + } else { + ParseKeyAndTs(false, it->key(), &cur_pk, &ts, &ts_idx); + end_key = CombineKeyTs(cur_pk, 0); + } + PDLOG(INFO, "delete range. start key %s end key %s inner idx %u tid %u pid %u", + start_key.c_str(), end_key.c_str(), idx, id_, pid_); + batch.DeleteRange(cf_hs_[idx + 1], rocksdb::Slice(start_key), rocksdb::Slice(end_key)); + } + } + } + rocksdb::Status s = db_->Write(write_opts_, &batch); + if (!s.ok()) { + PDLOG(WARNING, "delete failed, tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); + return {-1, s.ToString()}; + } + return {}; +} + void DiskTable::SchedGc() { GcHead(); UpdateTTL(); diff --git a/src/storage/disk_table.h b/src/storage/disk_table.h index 8c2c5d3a71a..be549d0c2cd 100644 --- a/src/storage/disk_table.h +++ b/src/storage/disk_table.h @@ -181,6 +181,8 @@ class DiskTable : public Table { bool Delete(const ::openmldb::api::LogEntry& entry) override; + base::Status Truncate(); + bool Delete(uint32_t idx, const std::string& pk, const std::optional& start_ts, const std::optional& end_ts) override; diff --git a/src/storage/mem_table_snapshot.cc b/src/storage/mem_table_snapshot.cc index 3eaacd3e2ac..2b085df59be 100644 --- a/src/storage/mem_table_snapshot.cc +++ b/src/storage/mem_table_snapshot.cc @@ -1041,5 +1041,39 @@ ::openmldb::base::Status MemTableSnapshot::ExtractIndexData(const std::shared_pt return status; } +int MemTableSnapshot::Truncate(uint64_t offset, uint64_t term) { + if (making_snapshot_.load(std::memory_order_acquire)) { + PDLOG(INFO, "snapshot is doing now!"); + return -1; + } + if (offset < offset_) { + PDLOG(WARNING, "end_offset %lu less than offset_ %lu, do nothing", offset, offset_); + return -1; + } + making_snapshot_.store(true, std::memory_order_release); + absl::Cleanup clean = [this] { + this->making_snapshot_.store(false, std::memory_order_release); + this->delete_collector_.Clear(); + }; + MemSnapshotMeta snapshot_meta(GenSnapshotName(), snapshot_path_, FLAGS_snapshot_compression); + snapshot_meta.term = term; + snapshot_meta.count = 0; + snapshot_meta.offset = offset; + auto wh = ::openmldb::log::CreateWriteHandle(FLAGS_snapshot_compression, + snapshot_meta.snapshot_name, snapshot_meta.tmp_file_path); + if (!wh) { + PDLOG(WARNING, "fail to create file %s", snapshot_meta.tmp_file_path.c_str()); + return -1; + } + wh->EndLog(); + wh.reset(); + auto status = WriteSnapshot(snapshot_meta); + if (!status.OK()) { + PDLOG(WARNING, "write snapshot failed. tid %u pid %u msg is %s ", tid_, pid_, status.GetMsg().c_str()); + return -1; + } + return 0; +} + } // namespace storage } // namespace openmldb diff --git a/src/storage/mem_table_snapshot.h b/src/storage/mem_table_snapshot.h index caad8fd182c..54da3a8c8c4 100644 --- a/src/storage/mem_table_snapshot.h +++ b/src/storage/mem_table_snapshot.h @@ -192,6 +192,8 @@ class MemTableSnapshot : public Snapshot { int CheckDeleteAndUpdate(std::shared_ptr
table, ::openmldb::api::LogEntry* new_entry); + int Truncate(uint64_t offset, uint64_t term); + private: // load single snapshot to table void RecoverSingleSnapshot(const std::string& path, std::shared_ptr
table, std::atomic* g_succ_cnt, diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 16958e3aeb2..ee0150d4548 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -2412,7 +2412,7 @@ void TabletImpl::SetExpire(RpcController* controller, const ::openmldb::api::Set } void TabletImpl::MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_offset, - std::shared_ptr<::openmldb::api::TaskInfo> task) { + std::shared_ptr<::openmldb::api::TaskInfo> task, bool is_force) { PDLOG(INFO, "MakeSnapshotInternal begin, tid[%u] pid[%u]", tid, pid); std::shared_ptr
table; std::shared_ptr snapshot; @@ -2455,7 +2455,7 @@ void TabletImpl::MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_o uint64_t cur_offset = replicator->GetOffset(); uint64_t snapshot_offset = snapshot->GetOffset(); int ret = 0; - if (cur_offset < snapshot_offset + FLAGS_make_snapshot_threshold_offset && end_offset == 0) { + if (!is_force && cur_offset < snapshot_offset + FLAGS_make_snapshot_threshold_offset && end_offset == 0) { PDLOG(INFO, "offset can't reach the threshold. tid[%u] pid[%u] " "cur_offset[%lu], snapshot_offset[%lu] end_offset[%lu]", @@ -2528,7 +2528,7 @@ void TabletImpl::MakeSnapshot(RpcController* controller, const ::openmldb::api:: break; } } - snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, offset, task_ptr)); + snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, offset, task_ptr, false)); response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); return; @@ -2562,7 +2562,7 @@ void TabletImpl::SchedMakeSnapshot() { } for (auto iter = table_set.begin(); iter != table_set.end(); ++iter) { PDLOG(INFO, "start make snapshot tid[%u] pid[%u]", iter->first, iter->second); - MakeSnapshotInternal(iter->first, iter->second, 0, std::shared_ptr<::openmldb::api::TaskInfo>()); + MakeSnapshotInternal(iter->first, iter->second, 0, std::shared_ptr<::openmldb::api::TaskInfo>(), false); } // delay task one hour later avoid execute more than one time snapshot_pool_.DelayTask(FLAGS_make_snapshot_check_interval + 60 * 60 * 1000, @@ -3389,6 +3389,110 @@ void TabletImpl::CreateTable(RpcController* controller, const ::openmldb::api::C response->set_msg("ok"); } +void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api::TruncateTableRequest* request, + ::openmldb::api::TruncateTableResponse* response, Closure* done) { + brpc::ClosureGuard done_guard(done); + uint32_t tid = request->tid(); + uint32_t pid = request->pid(); + if (auto status = TruncateTableInternal(tid, pid); !status.OK()) { + base::SetResponseStatus(status, response); + return; + } + auto aggrs = GetAggregators(tid, pid); + if (aggrs) { + for (const auto& aggr : *aggrs) { + auto agg_table = aggr->GetAggTable(); + if (!agg_table) { + PDLOG(WARNING, "aggrate table does not exist. tid[%u] pid[%u] index pos[%u]", + tid, pid, aggr->GetIndexPos()); + response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist); + response->set_msg("aggrate table does not exist"); + return; + } + uint32_t agg_tid = agg_table->GetId(); + uint32_t agg_pid = agg_table->GetPid(); + if (auto status = TruncateTableInternal(agg_tid, agg_pid); !status.OK()) { + PDLOG(WARNING, "truncate aggrate table failed. tid[%u] pid[%u] index pos[%u]", + agg_tid, agg_pid, aggr->GetIndexPos()); + base::SetResponseStatus(status, response); + return; + } + PDLOG(INFO, "truncate aggrate table success. tid[%u] pid[%u] index pos[%u]", + agg_tid, agg_pid, aggr->GetIndexPos()); + } + } + response->set_code(::openmldb::base::ReturnCode::kOk); + response->set_msg("ok"); +} + +base::Status TabletImpl::TruncateTableInternal(uint32_t tid, uint32_t pid) { + std::shared_ptr
table; + std::shared_ptr snapshot; + std::shared_ptr replicator; + { + std::lock_guard spin_lock(spin_mutex_); + table = GetTableUnLock(tid, pid); + if (!table) { + DEBUGLOG("table does not exist. tid %u pid %u", tid, pid); + return {::openmldb::base::ReturnCode::kTableIsNotExist, "table not found"}; + } + snapshot = GetSnapshotUnLock(tid, pid); + if (!snapshot) { + PDLOG(WARNING, "snapshot does not exist. tid[%u] pid[%u]", tid, pid); + return {::openmldb::base::ReturnCode::kSnapshotIsNotExist, "snapshot not found"}; + } + replicator = GetReplicatorUnLock(tid, pid); + if (!replicator) { + PDLOG(WARNING, "replicator does not exist. tid[%u] pid[%u]", tid, pid); + return {::openmldb::base::ReturnCode::kReplicatorIsNotExist, "replicator not found"}; + } + } + if (replicator->GetOffset() == 0) { + PDLOG(INFO, "table is empty, truncate success. tid[%u] pid[%u]", tid, pid); + return {}; + } + if (table->GetTableStat() == ::openmldb::storage::kMakingSnapshot) { + PDLOG(WARNING, "making snapshot task is running now. tid[%u] pid[%u]", tid, pid); + return {::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot, "table status is kMakingSnapshot"}; + } else if (table->GetTableStat() == ::openmldb::storage::kLoading) { + PDLOG(WARNING, "table is loading now. tid[%u] pid[%u]", tid, pid); + return {::openmldb::base::ReturnCode::kTableIsLoading, "table is loading data"}; + } + if (table->GetStorageMode() == openmldb::common::kMemory) { + auto table_meta = table->GetTableMeta(); + std::shared_ptr
new_table; + new_table = std::make_shared(*table_meta); + if (!new_table->Init()) { + PDLOG(WARNING, "fail to init table. tid %u, pid %u", tid, pid); + return {::openmldb::base::ReturnCode::kTableMetaIsIllegal, "fail to init table"}; + } + new_table->SetTableStat(::openmldb::storage::kNormal); + { + std::lock_guard spin_lock(spin_mutex_); + tables_[tid].insert_or_assign(pid, new_table); + } + auto mem_snapshot = std::dynamic_pointer_cast(snapshot); + mem_snapshot->Truncate(replicator->GetOffset(), replicator->GetLeaderTerm()); + if (table_meta->mode() == ::openmldb::api::TableMode::kTableLeader) { + if (catalog_->AddTable(*table_meta, new_table)) { + LOG(INFO) << "add table " << table_meta->name() << " to catalog with db " << table_meta->db(); + } else { + LOG(WARNING) << "fail to add table " << table_meta->name() + << " to catalog with db " << table_meta->db(); + return {::openmldb::base::ReturnCode::kCatalogUpdateFailed, "fail to update catalog"}; + } + } + } else { + auto disk_table = std::dynamic_pointer_cast(table); + if (auto status = disk_table->Truncate(); !status.OK()) { + return {::openmldb::base::ReturnCode::kTruncateTableFailed, status.GetMsg()}; + } + snapshot_pool_.AddTask(boost::bind(&TabletImpl::MakeSnapshotInternal, this, tid, pid, 0, nullptr, true)); + } + PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid); + return {}; +} + void TabletImpl::ExecuteGc(RpcController* controller, const ::openmldb::api::ExecuteGcRequest* request, ::openmldb::api::GeneralResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); @@ -3798,13 +3902,11 @@ int TabletImpl::CreateTableInternal(const ::openmldb::api::TableMeta* table_meta return -1; } std::string table_db_path = GetDBPath(db_root_path, tid, pid); - Table* table_ptr; if (table_meta->storage_mode() == openmldb::common::kMemory) { - table_ptr = new MemTable(*table_meta); + table = std::make_shared(*table_meta); } else { - table_ptr = new DiskTable(*table_meta, table_db_path); + table = std::make_shared(*table_meta, table_db_path); } - table.reset(table_ptr); if (!table->Init()) { PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid()); diff --git a/src/tablet/tablet_impl.h b/src/tablet/tablet_impl.h index 7207b3ab8bd..83135ad72e6 100644 --- a/src/tablet/tablet_impl.h +++ b/src/tablet/tablet_impl.h @@ -109,6 +109,9 @@ class TabletImpl : public ::openmldb::api::TabletServer { void DropTable(RpcController* controller, const ::openmldb::api::DropTableRequest* request, ::openmldb::api::DropTableResponse* response, Closure* done); + void TruncateTable(RpcController* controller, const ::openmldb::api::TruncateTableRequest* request, + ::openmldb::api::TruncateTableResponse* response, Closure* done); + void Refresh(RpcController* controller, const ::openmldb::api::RefreshRequest* request, ::openmldb::api::GeneralResponse* response, Closure* done); @@ -308,7 +311,7 @@ class TabletImpl : public ::openmldb::api::TabletServer { int CreateTableInternal(const ::openmldb::api::TableMeta* table_meta, std::string& msg); // NOLINT void MakeSnapshotInternal(uint32_t tid, uint32_t pid, uint64_t end_offset, - std::shared_ptr<::openmldb::api::TaskInfo> task); + std::shared_ptr<::openmldb::api::TaskInfo> task, bool is_force); void SendSnapshotInternal(const std::string& endpoint, uint32_t tid, uint32_t pid, uint32_t remote_tid, std::shared_ptr<::openmldb::api::TaskInfo> task); @@ -327,6 +330,8 @@ class TabletImpl : public ::openmldb::api::TabletServer { uint32_t partition_num, uint64_t last_time, std::shared_ptr<::openmldb::api::TaskInfo> task); + base::Status TruncateTableInternal(uint32_t tid, uint32_t pid); + void ExtractIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table, std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot, const std::vector<::openmldb::common::ColumnKey>& column_key, diff --git a/src/tablet/tablet_impl_test.cc b/src/tablet/tablet_impl_test.cc index 0780e05af69..1a2de9e66d8 100644 --- a/src/tablet/tablet_impl_test.cc +++ b/src/tablet/tablet_impl_test.cc @@ -1423,6 +1423,42 @@ TEST_P(TabletImplTest, ScanWithLatestN) { ASSERT_FALSE(kv_it.Valid()); } +TEST_P(TabletImplTest, Truncate) { + ::openmldb::common::StorageMode storage_mode = GetParam(); + TabletImpl tablet; + uint32_t id = counter++; + tablet.Init(""); + ASSERT_EQ(0, CreateDefaultTable("db0", "t0", id, 1, 0, 0, kAbsoluteTime, storage_mode, &tablet)); + MockClosure closure; + for (int ts = 100; ts < 200; ts++) { + ::openmldb::api::PutRequest prequest; + PackDefaultDimension("test1", &prequest); + prequest.set_time(ts); + prequest.set_value(::openmldb::test::EncodeKV("test1", "test" + std::to_string(ts))); + prequest.set_tid(id); + prequest.set_pid(1); + ::openmldb::api::PutResponse presponse; + tablet.Put(NULL, &prequest, &presponse, &closure); + ASSERT_EQ(0, presponse.code()); + } + ::openmldb::api::TraverseRequest sr; + sr.set_tid(id); + sr.set_pid(1); + sr.set_limit(1000); + auto srp = std::make_shared<::openmldb::api::TraverseResponse>(); + tablet.Traverse(NULL, &sr, srp.get(), &closure); + ASSERT_EQ(0, srp->code()); + ASSERT_EQ(100, (signed)srp->count()); + ::openmldb::api::TruncateTableRequest tr; + tr.set_tid(id); + tr.set_pid(1); + auto trp = std::make_shared<::openmldb::api::TruncateTableResponse>(); + tablet.TruncateTable(NULL, &tr, trp.get(), &closure); + ASSERT_EQ(0, trp->code()); + tablet.Traverse(NULL, &sr, srp.get(), &closure); + ASSERT_EQ(0, srp->code()); + ASSERT_EQ(0, (signed)srp->count()); +} TEST_P(TabletImplTest, Traverse) { ::openmldb::common::StorageMode storage_mode = GetParam();