From 16223d871d282e865e19f80b1a91dba6d4fe9830 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Wed, 5 Apr 2023 00:52:57 +0800 Subject: [PATCH] add GeneratedColumnPlaceholderInputStream (#6796) (#6813) close pingcap/tiflash#6801, ref pingcap/tidb#40663 --- ...neratedColumnPlaceholderBlockInputStream.h | 97 ++++++++++++ .../Coprocessor/DAGQueryBlockInterpreter.cpp | 2 + .../Coprocessor/DAGStorageInterpreter.cpp | 13 ++ .../Flash/Coprocessor/DAGStorageInterpreter.h | 2 + .../Flash/Coprocessor/InterpreterUtils.cpp | 18 +++ dbms/src/Flash/Coprocessor/InterpreterUtils.h | 6 + dbms/src/Storages/Transaction/TiDB.h | 3 +- .../fullstack-test/expr/generated_index.test | 147 ++++++++++++++++++ 8 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h create mode 100644 tests/fullstack-test/expr/generated_index.test diff --git a/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h new file mode 100644 index 00000000000..f220f1377d4 --- /dev/null +++ b/dbms/src/DataStreams/GeneratedColumnPlaceholderBlockInputStream.h @@ -0,0 +1,97 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class GeneratedColumnPlaceholderBlockInputStream : public IProfilingBlockInputStream +{ +public: + GeneratedColumnPlaceholderBlockInputStream( + const BlockInputStreamPtr & input, + const std::vector> & generated_column_infos_, + const String & req_id_) + : generated_column_infos(generated_column_infos_) + , log(Logger::get(req_id_)) + { + children.push_back(input); + } + + String getName() const override { return NAME; } + Block getHeader() const override + { + Block block = children.back()->getHeader(); + insertColumns(block, /*insert_data=*/false); + return block; + } + + static String getColumnName(UInt64 col_index) + { + return "generated_column_" + std::to_string(col_index); + } + +protected: + void readPrefix() override + { + RUNTIME_CHECK(!generated_column_infos.empty()); + // Validation check. + for (size_t i = 1; i < generated_column_infos.size(); ++i) + { + RUNTIME_CHECK(std::get<0>(generated_column_infos[i]) > std::get<0>(generated_column_infos[i - 1])); + } + } + + Block readImpl() override + { + Block block = children.back()->read(); + insertColumns(block, /*insert_data=*/true); + return block; + } + +private: + void insertColumns(Block & block, bool insert_data) const + { + if (!block) + return; + + for (const auto & ele : generated_column_infos) + { + const auto & col_index = std::get<0>(ele); + const auto & col_name = std::get<1>(ele); + const auto & data_type = std::get<2>(ele); + ColumnPtr column = nullptr; + if (insert_data) + column = data_type->createColumnConstWithDefaultValue(block.rows()); + else + column = data_type->createColumn(); + block.insert(col_index, ColumnWithTypeAndName{column, data_type, col_name}); + } + } + + static constexpr auto NAME = "GeneratedColumnPlaceholder"; + const std::vector> generated_column_infos; + const LoggerPtr log; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index a7d2aae92b7..d956fca4a84 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -178,6 +178,8 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s analyzer = std::make_unique(std::move(names_and_types), context); pipeline.streams.insert(pipeline.streams.end(), mock_table_scan_streams.begin(), mock_table_scan_streams.end()); } + + // Ignore handling GeneratedColumnPlaceholderBlockInputStream for now, because we don't support generated column in test framework. } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 4c8adefebc3..d803bf8ddd1 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -375,6 +376,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) /// handle timezone/duration cast for local and remote table scan. executeCastAfterTableScan(remote_read_streams_start_index, pipeline); + /// handle generated column if necessary. + executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline); recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); /// handle pushed down filter for local and remote table scan. @@ -1041,8 +1044,18 @@ std::tuple> DAGStorageIn for (Int32 i = 0; i < table_scan.getColumnSize(); ++i) { auto const & ci = table_scan.getColumns()[i]; + auto tidb_ci = TiDB::toTiDBColumnInfo(ci); ColumnID cid = ci.column_id(); + if (tidb_ci.hasGeneratedColumnFlag()) + { + LOG_DEBUG(log, "got column({}) with generated column flag", i); + const auto & data_type = getDataTypeByColumnInfoForComputingLayer(tidb_ci); + const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i); + generated_column_infos.push_back(std::make_tuple(i, col_name, data_type)); + source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type}); + continue; + } // Column ID -1 return the handle column String name; if (cid == TiDBPkColumnID) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index efe78c25918..f97ca2cce6f 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -141,6 +141,8 @@ class DAGStorageInterpreter ManageableStoragePtr storage_for_logical_table; Names required_columns; NamesAndTypes source_columns; + // For generated column, just need a placeholder, and TiDB will fill this column. + std::vector> generated_column_infos; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index d71f8b073d1..5ac1c5dc669 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -194,4 +195,21 @@ void executeCreatingSets( log->identifier()); } } + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline) +{ + if (generated_column_infos.empty()) + return; + assert(remote_read_streams_start_index <= pipeline.streams.size()); + for (size_t i = 0; i < remote_read_streams_start_index; ++i) + { + auto & stream = pipeline.streams[i]; + stream = std::make_shared(stream, generated_column_infos, log->identifier()); + stream->setExtraInfo("generated column placeholder above table scan"); + } +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 87672e81dfa..0a3b392d95b 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -66,4 +66,10 @@ void executeCreatingSets( const Context & context, size_t max_streams, const LoggerPtr & log); + +void executeGeneratedColumnPlaceholder( + size_t remote_read_streams_start_index, + const std::vector> & generated_column_infos, + LoggerPtr log, + DAGPipeline & pipeline); } // namespace DB diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 9bd78abeed3..8ee49a5c015 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -117,7 +117,8 @@ enum TP M(NoDefaultValue, (1 << 12)) \ M(OnUpdateNow, (1 << 13)) \ M(PartKey, (1 << 14)) \ - M(Num, (1 << 15)) + M(Num, (1 << 15)) \ + M(GeneratedColumn, (1 << 23)) enum ColumnFlag { diff --git a/tests/fullstack-test/expr/generated_index.test b/tests/fullstack-test/expr/generated_index.test new file mode 100644 index 00000000000..f09f2c3fcde --- /dev/null +++ b/tests/fullstack-test/expr/generated_index.test @@ -0,0 +1,147 @@ +# Copyright 2023 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +mysql> drop table if exists test.t; +mysql> create table test.t(c1 varchar(100), c2 varchar(100)); +mysql> insert into test.t values('ABC', 'DEF'); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> alter table test.t add index idx2((lower(c2))); + +mysql> select /*+ nth_plan(1) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(2) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(3) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(4) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(5) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(6) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(7) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(8) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(9) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ +mysql> select /*+ nth_plan(10) */ * from test.t where lower(test.t.c2) = 'def'; ++------+------+ +| c1 | c2 | ++------+------+ +| ABC | DEF | ++------+------+ + +mysql> drop table if exists test.t; +mysql> create table test.t(id int, value int); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> create unique index uk on test.t((tidb_shard(id)), id); +mysql> select /*+ nth_paln(1) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(2) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(3) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(4) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(5) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(6) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(7) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(8) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(9) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+ +mysql> select /*+ nth_paln(10) */ max(value) from test.t; ++------------+ +| max(value) | ++------------+ +| NULL | ++------------+