diff --git a/dbms/src/Debug/MockSchemaSyncer.cpp b/dbms/src/Debug/MockSchemaSyncer.cpp index 9addcfcc148..ef0f2966da4 100644 --- a/dbms/src/Debug/MockSchemaSyncer.cpp +++ b/dbms/src/Debug/MockSchemaSyncer.cpp @@ -261,13 +261,8 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table) auto & tmt_context = context.getTMTContext(); /// Get table schema json. - auto table_id = table->id(); - - String table_info_json = table->table_info.serialize(false); - - LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json); - - TableInfo table_info(table_info_json, false); + TableInfo table_info = table->table_info; + auto table_id = table_info.id; auto storage = tmt_context.getStorages().get(table_id); diff --git a/dbms/src/Debug/dbgFuncSchema.cpp b/dbms/src/Debug/dbgFuncSchema.cpp index dc45d067e76..5431f9baaad 100644 --- a/dbms/src/Debug/dbgFuncSchema.cpp +++ b/dbms/src/Debug/dbgFuncSchema.cpp @@ -24,9 +24,15 @@ void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInv bool enable = safeGet(typeid_cast(*args[0]).value) == "true"; if (enable) - context.initializeSchemaSyncService(); + { + if (!context.getSchemaSyncService()) + context.initializeSchemaSyncService(); + } else - context.getSchemaSyncService().reset(); + { + if (context.getSchemaSyncService()) + context.getSchemaSyncService().reset(); + } std::stringstream ss; ss << "schema sync service " << (enable ? "enabled" : "disabled"); diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index f7f599da4d3..afe6faf59c6 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -1464,11 +1463,9 @@ void Context::initializeSchemaSyncService() shared->schema_sync_service = std::make_shared(*this); } -SchemaSyncServicePtr Context::getSchemaSyncService() +SchemaSyncServicePtr & Context::getSchemaSyncService() { auto lock = getLock(); - if (!shared->schema_sync_service) - throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR); return shared->schema_sync_service; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index aad36ff4e9c..61d97e7fefa 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -371,7 +371,7 @@ class Context RaftService & getRaftService(); void initializeSchemaSyncService(); - SchemaSyncServicePtr getSchemaSyncService(); + SchemaSyncServicePtr & getSchemaSyncService(); void initializePartPathSelector(const std::vector & all_path); PartPathSelector & getPartPathSelector(); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index b9d883756e4..95e911ad04c 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -318,6 +318,25 @@ void StorageMergeTree::alter( const String & database_name, const String & table_name, const Context & context) +{ + alterInternal(params, database_name, table_name, std::nullopt, context); +} + +void StorageMergeTree::alterForTMT( + const AlterCommands & params, + const TiDB::TableInfo & table_info, + const String & database_name, + const Context & context) +{ + alterInternal(params, database_name, table_info.name, std::optional>(table_info), context); +} + +void StorageMergeTree::alterInternal( + const AlterCommands & params, + const String & database_name, + const String & table_name, + const std::optional> table_info, + const Context & context) { /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. auto merge_blocker = merger.merges_blocker.cancel(); @@ -379,6 +398,8 @@ void StorageMergeTree::alter( context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier); setColumns(std::move(new_columns)); + if (table_info) + setTableInfo(table_info->get()); if (primary_key_is_modified) { @@ -397,40 +418,6 @@ void StorageMergeTree::alter( data.loadDataParts(false); } -void StorageMergeTree::alterForTMT( - const AlterCommands & params, - const TiDB::TableInfo & table_info, - const String & database_name, - const Context & context) -{ - const String & table_name = table_info.name; - /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time. - auto merge_blocker = merger.merges_blocker.cancel(); - - auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__); - - data.checkAlter(params); - - auto new_columns = data.getColumns(); - params.apply(new_columns); - -// std::vector transactions; - - auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__); - - IDatabase::ASTModifier storage_modifier = [this] (IAST & ast) - { - auto & storage_ast = typeid_cast(ast); - - auto literal = std::make_shared(Field(data.table_info->serialize(true))); - typeid_cast(*storage_ast.engine->arguments).children.back() = literal; - }; - - context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier); - setTableInfo(table_info); - setColumns(std::move(new_columns)); -} - /// While exists, marks parts as 'currently_merging' and reserves free space on filesystem. /// It's possible to mark parts before. struct CurrentlyMergingPartsTagger diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index 8582e8232d5..6cdfe9ea03d 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -81,6 +81,8 @@ class StorageMergeTree : public ext::shared_ptr_helper, public void alterForTMT(const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, const Context & context); + void alterInternal(const AlterCommands & params, const String & database_name, const String & table_name, const std::optional> table_info, const Context & context); + bool checkTableCanBeDropped() const override; const TableInfo & getTableInfo() const; diff --git a/tests/mutable-test/txn_schema/alter.test b/tests/mutable-test/txn_schema/alter.test index c5500e9f68d..9fa67026043 100644 --- a/tests/mutable-test/txn_schema/alter.test +++ b/tests/mutable-test/txn_schema/alter.test @@ -5,9 +5,11 @@ => DBGInvoke __set_flush_threshold(1000000, 1000000) => DBGInvoke __mock_schema_syncer('true') +=> DBGInvoke __enable_schema_sync_service('false') + # Sync add column by checking missing column in CH when flushing. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) => select col_1 from default.test => DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') @@ -71,7 +73,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier # Sync add column and type change together by checking value overflow in CH when flushing. => DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 UInt8') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __modify_column_in_tidb_table(default, test, 'col_3 UInt64') => DBGInvoke __raft_insert_row(default, test, 4, 55, 0, 256) => DBGInvoke __add_column_to_tidb_table(default, test, 'col_4 Nullable(UInt8)') @@ -109,7 +111,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier │ -9223372036854775807 │ 18446744073709551615 │ 1 │ │ 9223372036854775807 │ 18446744073709551615 │ 1 │ └──────────────────────┴──────────────────────┴───────┘ -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => selraw nokvstore col_3 from default.test Received exception from server (version {#WORD}): Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier: col_3. diff --git a/tests/mutable-test/txn_schema/drop.test b/tests/mutable-test/txn_schema/drop.test index f1126511591..bb71443b30f 100644 --- a/tests/mutable-test/txn_schema/drop.test +++ b/tests/mutable-test/txn_schema/drop.test @@ -5,7 +5,7 @@ => DBGInvoke __mock_schema_syncer('true') => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) => select col_1 from default.test => DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)') @@ -17,7 +17,7 @@ Received exception from server (version {#WORD}): Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => select col_1, col_2 from default.test => DBGInvoke __drop_column_from_tidb_table(default, test, col_2) => DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2') @@ -28,7 +28,7 @@ Received exception from server (version {#WORD}): Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist.. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)') -=> DBGInvoke __refresh_schema(default, test) +=> DBGInvoke __refresh_schemas() => select col_1, col_2 from default.test => DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Nullable(Int64)') => DBGInvoke __raft_insert_row(default, test, 4, 52, 'test2', 256)