Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alter tmt and pass schema tests #114

Merged
merged 15 commits into from
Jul 19, 2019
9 changes: 2 additions & 7 deletions dbms/src/Debug/MockSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,8 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table)
auto & tmt_context = context.getTMTContext();

/// Get table schema json.
auto table_id = table->id();

String table_info_json = table->table_info.serialize(false);

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Table " << table_id << " info json: " << table_info_json);

TableInfo table_info(table_info_json, false);
TableInfo table_info = table->table_info;
auto table_id = table_info.id;

auto storage = tmt_context.getStorages().get(table_id);

Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Debug/dbgFuncSchema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ void dbgFuncEnableSchemaSyncService(Context & context, const ASTs & args, DBGInv
bool enable = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value) == "true";

if (enable)
context.initializeSchemaSyncService();
{
if (!context.getSchemaSyncService())
context.initializeSchemaSyncService();
}
else
context.getSchemaSyncService().reset();
{
if (context.getSchemaSyncService())
context.getSchemaSyncService().reset();
}

std::stringstream ss;
ss << "schema sync service " << (enable ? "enabled" : "disabled");
Expand Down
5 changes: 1 addition & 4 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <Storages/Transaction/SchemaSyncService.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/PartPathSelector.h>
#include <Storages/CompressionSettingsSelector.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/Settings.h>
#include <Interpreters/RuntimeComponentsFactory.h>
Expand Down Expand Up @@ -1464,11 +1463,9 @@ void Context::initializeSchemaSyncService()
shared->schema_sync_service = std::make_shared<SchemaSyncService>(*this);
}

SchemaSyncServicePtr Context::getSchemaSyncService()
SchemaSyncServicePtr & Context::getSchemaSyncService()
{
auto lock = getLock();
if (!shared->schema_sync_service)
throw Exception("Schema Sync Service is not initialized.", ErrorCodes::LOGICAL_ERROR);
return shared->schema_sync_service;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class Context
RaftService & getRaftService();

void initializeSchemaSyncService();
SchemaSyncServicePtr getSchemaSyncService();
SchemaSyncServicePtr & getSchemaSyncService();

void initializePartPathSelector(const std::vector<std::string> & all_path);
PartPathSelector & getPartPathSelector();
Expand Down
55 changes: 21 additions & 34 deletions dbms/src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ void StorageMergeTree::alter(
const String & database_name,
const String & table_name,
const Context & context)
{
alterInternal(params, database_name, table_name, std::nullopt, context);
}

void StorageMergeTree::alterForTMT(
const AlterCommands & params,
const TiDB::TableInfo & table_info,
const String & database_name,
const Context & context)
{
alterInternal(params, database_name, table_info.name, std::optional<std::reference_wrapper<const TableInfo>>(table_info), context);
}

void StorageMergeTree::alterInternal(
const AlterCommands & params,
const String & database_name,
const String & table_name,
const std::optional<std::reference_wrapper<const TiDB::TableInfo>> table_info,
const Context & context)
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger.merges_blocker.cancel();
Expand Down Expand Up @@ -379,6 +398,8 @@ void StorageMergeTree::alter(

context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier);
setColumns(std::move(new_columns));
if (table_info)
setTableInfo(table_info->get());

if (primary_key_is_modified)
{
Expand All @@ -397,40 +418,6 @@ void StorageMergeTree::alter(
data.loadDataParts(false);
}

void StorageMergeTree::alterForTMT(
const AlterCommands & params,
const TiDB::TableInfo & table_info,
const String & database_name,
const Context & context)
{
const String & table_name = table_info.name;
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
auto merge_blocker = merger.merges_blocker.cancel();

auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);

data.checkAlter(params);

auto new_columns = data.getColumns();
params.apply(new_columns);

// std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;

auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__);

IDatabase::ASTModifier storage_modifier = [this] (IAST & ast)
{
auto & storage_ast = typeid_cast<ASTStorage &>(ast);

auto literal = std::make_shared<ASTLiteral>(Field(data.table_info->serialize(true)));
typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments).children.back() = literal;
};

context.getDatabase(database_name)->alterTable(context, table_name, new_columns, storage_modifier);
setTableInfo(table_info);
setColumns(std::move(new_columns));
}

/// While exists, marks parts as 'currently_merging' and reserves free space on filesystem.
/// It's possible to mark parts before.
struct CurrentlyMergingPartsTagger
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/StorageMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class StorageMergeTree : public ext::shared_ptr_helper<StorageMergeTree>, public

void alterForTMT(const AlterCommands & params, const TiDB::TableInfo & table_info, const String & database_name, const Context & context);

void alterInternal(const AlterCommands & params, const String & database_name, const String & table_name, const std::optional<std::reference_wrapper<const TiDB::TableInfo>> table_info, const Context & context);

bool checkTableCanBeDropped() const override;

const TableInfo & getTableInfo() const;
Expand Down
8 changes: 5 additions & 3 deletions tests/mutable-test/txn_schema/alter.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
=> DBGInvoke __set_flush_threshold(1000000, 1000000)
=> DBGInvoke __mock_schema_syncer('true')

=> DBGInvoke __enable_schema_sync_service('false')

# Sync add column by checking missing column in CH when flushing.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> select col_1 from default.test
=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)')
Expand Down Expand Up @@ -71,7 +73,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier

# Sync add column and type change together by checking value overflow in CH when flushing.
=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_3 UInt8')
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_3 UInt64')
=> DBGInvoke __raft_insert_row(default, test, 4, 55, 0, 256)
=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_4 Nullable(UInt8)')
Expand Down Expand Up @@ -109,7 +111,7 @@ Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier
│ -9223372036854775807 │ 18446744073709551615 │ 1 │
│ 9223372036854775807 │ 18446744073709551615 │ 1 │
└──────────────────────┴──────────────────────┴───────┘
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> selraw nokvstore col_3 from default.test
Received exception from server (version {#WORD}):
Code: 47. DB::Exception: Received from {#WORD} DB::Exception: Unknown identifier: col_3.
Expand Down
6 changes: 3 additions & 3 deletions tests/mutable-test/txn_schema/drop.test
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
=> DBGInvoke __mock_schema_syncer('true')

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> select col_1 from default.test
=> DBGInvoke __add_column_to_tidb_table(default, test, 'col_2 Nullable(Int8)')
Expand All @@ -17,7 +17,7 @@ Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)')
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> select col_1, col_2 from default.test
=> DBGInvoke __drop_column_from_tidb_table(default, test, col_2)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2')
Expand All @@ -28,7 +28,7 @@ Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)')
=> DBGInvoke __refresh_schema(default, test)
=> DBGInvoke __refresh_schemas()
=> select col_1, col_2 from default.test
=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Nullable(Int64)')
=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test2', 256)
Expand Down