Skip to content

Commit

Permalink
fix cop test regression (#157)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 6, 2019
1 parent bc25942 commit 059f267
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
10 changes: 7 additions & 3 deletions dbms/src/Flash/Coprocessor/tests/cop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,15 @@ grpc::Status rpcTest()
tipb::Executor * executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeTableScan);
tipb::TableScan * ts = executor->mutable_tbl_scan();
ts->set_table_id(41);
ts->set_table_id(44);
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(1);
ci->set_tp(0xfe);
ci->set_flag(1);
ci = ts->add_columns();
ci->set_column_id(2);
ci->set_tp(8);
ci->set_flag(1);
dagRequest.add_output_offsets(1);
dagRequest.add_output_offsets(0);
dagRequest.add_output_offsets(1);
Expand All @@ -95,7 +99,7 @@ grpc::Status rpcTest()
col->set_val(ss.str());
value->set_tp(tipb::ExprType::Int64);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(123, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(888, ss);
value->set_val(std::string(ss.str()));

// agg: count(s) group by i;
Expand Down Expand Up @@ -147,7 +151,7 @@ grpc::Status rpcTest()
kvrpcpb::Context * ctx = request.mutable_context();
ctx->set_region_id(2);
auto region_epoch = ctx->mutable_region_epoch();
region_epoch->set_version(20);
region_epoch->set_version(21);
region_epoch->set_conf_ver(2);
request.set_tp(DAGREQUEST);
request.set_data(dagRequest.SerializeAsString());
Expand Down
18 changes: 15 additions & 3 deletions dbms/src/Interpreters/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,19 @@ bool InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
}
TableID table_id = ts.table_id();
// TODO: Get schema version from DAG request.
getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION);
if (context.getSettingsRef().schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)
{
storage = context.getTMTContext().getStorages().get(table_id);
if (storage == nullptr)
{
throw Exception("Table " + std::to_string(table_id) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
}
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
}
else
{
getAndLockStorageWithSchemaVersion(table_id, DEFAULT_UNSPECIFIED_SCHEMA_VERSION);
}

Names required_columns;
for (const tipb::ColumnInfo & ci : ts.columns())
Expand Down Expand Up @@ -290,7 +302,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);
}

if (storage->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
if (storage_->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + std::to_string(table_id)
+ " is not allowed",
ErrorCodes::LOGICAL_ERROR);
Expand All @@ -299,7 +311,7 @@ void InterpreterDAG::getAndLockStorageWithSchemaVersion(TableID table_id, Int64
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
auto storage_schema_version = storage->getTableInfo().schema_version;
auto storage_schema_version = storage_->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
throw Exception("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version)
+ " newer than query schema version " + std::to_string(schema_version),
Expand Down

0 comments on commit 059f267

Please sign in to comment.