From 059f267385d65963005d84c57c98883574d6ddf2 Mon Sep 17 00:00:00 2001 From: xufei Date: Tue, 6 Aug 2019 14:33:32 +0800 Subject: [PATCH] fix cop test regression (#157) * fix cop test regression * address comments * format code --- dbms/src/Flash/Coprocessor/tests/cop_test.cpp | 10 +++++++--- dbms/src/Interpreters/InterpreterDAG.cpp | 18 +++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp index 79e65a34349..0ed89ec308a 100644 --- a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp +++ b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp @@ -71,11 +71,15 @@ grpc::Status rpcTest() tipb::Executor * executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeTableScan); tipb::TableScan * ts = executor->mutable_tbl_scan(); - ts->set_table_id(41); + ts->set_table_id(44); tipb::ColumnInfo * ci = ts->add_columns(); ci->set_column_id(1); + ci->set_tp(0xfe); + ci->set_flag(1); ci = ts->add_columns(); ci->set_column_id(2); + ci->set_tp(8); + ci->set_flag(1); dagRequest.add_output_offsets(1); dagRequest.add_output_offsets(0); dagRequest.add_output_offsets(1); @@ -95,7 +99,7 @@ grpc::Status rpcTest() col->set_val(ss.str()); value->set_tp(tipb::ExprType::Int64); ss.str(""); - DB::EncodeNumber(123, ss); + DB::EncodeNumber(888, ss); value->set_val(std::string(ss.str())); // agg: count(s) group by i; @@ -147,7 +151,7 @@ grpc::Status rpcTest() kvrpcpb::Context * ctx = request.mutable_context(); ctx->set_region_id(2); auto region_epoch = ctx->mutable_region_epoch(); - region_epoch->set_version(20); + region_epoch->set_version(21); region_epoch->set_conf_ver(2); request.set_tp(DAGREQUEST); request.set_data(dagRequest.SerializeAsString()); diff --git a/dbms/src/Interpreters/InterpreterDAG.cpp b/dbms/src/Interpreters/InterpreterDAG.cpp index 8b235a245cd..4e63ae5935c 100644 --- a/dbms/src/Interpreters/InterpreterDAG.cpp +++ b/dbms/src/Interpreters/InterpreterDAG.cpp @@ -46,7 +46,19 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) } TableID table_id = ts.table_id(); // TODO: Get schema version from DAG request. - getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION); + if (context.getSettingsRef().schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION) + { + storage = context.getTMTContext().getStorages().get(table_id); + if (storage == nullptr) + { + throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); + } + table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__); + } + else + { + getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION); + } Names required_columns; for (const tipb::ColumnInfo & ci : ts.columns()) @@ -290,7 +302,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false); } - if (storage->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) + if (storage_->getData().merging_params.mode != MergeTreeData::MergingParams::Txn) throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + std::to_string(table_id) + " is not allowed", ErrorCodes::LOGICAL_ERROR); @@ -299,7 +311,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64 auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__); /// Check schema version. - auto storage_schema_version = storage->getTableInfo().schema_version; + auto storage_schema_version = storage_->getTableInfo().schema_version; if (storage_schema_version > schema_version) throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version),