Skip to content

Commit

Permalink
Planner: Support Planner Interpreter (#5321)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise authored Aug 10, 2022
1 parent 45792ca commit f470b46
Show file tree
Hide file tree
Showing 94 changed files with 6,586 additions and 45 deletions.
17 changes: 17 additions & 0 deletions dbms/src/Common/TiFlashException.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ namespace DB
"This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \
""); \
) \
C(Planner, \
E(BadRequest, "Bad TiDB DAGRequest.", \
"This error is usually caused by incorrect TiDB DAGRequest. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Unimplemented, "Some features are unimplemented.", \
"This error may caused by unmatched TiDB and TiFlash versions, \n" \
"and should not occur in common case. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Internal, "TiFlash Planner internal error.", \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
) \
C(Table, \
E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \
"TiFlash will sync the newest schema from TiDB before processing every query. \n" \
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ Block::Block(const ColumnsWithTypeAndName & data_)
}


Block::Block(const NamesAndTypes & names_and_types)
{
data.reserve(names_and_types.size());
for (const auto & name_and_type : names_and_types)
data.emplace_back(name_and_type.type, name_and_type.name);
initializeIndexByName();
}


void Block::initializeIndexByName()
{
for (size_t i = 0, size = data.size(); i < size; ++i)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class Block
Block() = default;
Block(std::initializer_list<ColumnWithTypeAndName> il);
explicit Block(const ColumnsWithTypeAndName & data_);
explicit Block(const NamesAndTypes & names_and_types);

/// insert the column at the specified position
void insert(size_t position, const ColumnWithTypeAndName & elem);
Expand Down
27 changes: 27 additions & 0 deletions dbms/src/Core/NamesAndTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBuffer.h>
Expand All @@ -31,6 +32,32 @@ namespace ErrorCodes
extern const int THERE_IS_NO_COLUMN;
}

String dumpJsonStructure(const NamesAndTypes & names_and_types)
{
FmtBuffer bf;
bf.append("[");
bf.joinStr(
names_and_types.cbegin(),
names_and_types.cend(),
[](const auto & name_and_type, FmtBuffer & fb) {
fb.fmtAppend(
R"json({{"name":"{}","type":{}}})json",
name_and_type.name,
(name_and_type.type ? "\"" + name_and_type.type->getName() + "\"" : "null"));
},
", ");
bf.append("]");
return bf.toString();
}

Names toNames(const NamesAndTypes & names_and_types)
{
Names names;
names.reserve(names_and_types.size());
for (const auto & name_and_type : names_and_types)
names.push_back(name_and_type.name);
return names;
}

void NamesAndTypesList::readText(ReadBuffer & buf)
{
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Core/NamesAndTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct NameAndTypePair

using NamesAndTypes = std::vector<NameAndTypePair>;

String dumpJsonStructure(const NamesAndTypes & names_and_types);

Names toNames(const NamesAndTypes & names_and_types);

class NamesAndTypesList : public std::list<NameAndTypePair>
{
public:
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

Expand All @@ -25,5 +27,6 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
8 changes: 1 addition & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
Expand Down Expand Up @@ -117,13 +118,6 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down
6 changes: 2 additions & 4 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
#include <DataStreams/copyData.h>
#include <Flash/Coprocessor/DAGBlockOutputStream.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/StreamWriter.h>
#include <Flash/Coprocessor/StreamingDAGResponseWriter.h>
#include <Flash/Coprocessor/UnaryDAGResponseWriter.h>
#include <Flash/executeQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <pingcap/Exception.h>
Expand Down Expand Up @@ -89,10 +88,9 @@ void DAGDriver<batch>::execute()
try
{
auto start_time = Clock::now();
DAGQuerySource dag(context);
DAGContext & dag_context = *context.getDAGContext();

BlockIO streams = executeQuery(dag, context, internal, QueryProcessingStage::Complete);
BlockIO streams = executeQuery(context, internal, QueryProcessingStage::Complete);
if (!streams.in || streams.out)
// Only query is allowed, so streams.in must not be null and streams.out must be null
throw TiFlashException("DAG is not query.", Errors::Coprocessor::Internal);
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class DAGExpressionAnalyzer : private boost::noncopyable

const Context & getContext() const { return context; }

void reset(const std::vector<NameAndTypePair> & source_columns_)
{
source_columns = source_columns_;
prepared_sets.clear();
}

const std::vector<NameAndTypePair> & getCurrentInputColumns() const;

DAGPreparedSets & getPreparedSets() { return prepared_sets; }
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
Expand Down Expand Up @@ -161,7 +162,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
{
if (context.columnsForTestEmpty() || context.columnsForTest(table_scan.getTableScanExecutorID()).empty())
{
auto names_and_types = genNamesAndTypes(table_scan);
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
Expand All @@ -180,7 +181,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s

void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
const auto push_down_filter = PushDownFilter::toPushDownFilter(query_block.selection_name, query_block.selection);
const auto push_down_filter = PushDownFilter::pushDownFilterFrom(query_block.selection_name, query_block.selection);

DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
storage_interpreter.execute(pipeline);
Expand Down
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/executor.pb.h>

namespace DB
{
static constexpr std::string_view enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

struct FineGrainedShuffle
{
explicit FineGrainedShuffle(const tipb::Executor * executor)
: stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0)
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
{}

bool enable() const
{
return enableFineGrainedShuffle(stream_count);
}

const UInt64 stream_count;
const UInt64 batch_size;
};
} // namespace DB
34 changes: 25 additions & 9 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,50 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Storages/MutableSupport.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan)
namespace
{
DataTypePtr getPkType(const ColumnInfo & column_info)
{
const auto & pk_data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
/// primary key type must be tidb_pk_column_int_type or tidb_pk_column_string_type.
RUNTIME_CHECK(
pk_data_type->equals(*MutableSupport::tidb_pk_column_int_type) || pk_data_type->equals(*MutableSupport::tidb_pk_column_string_type),
Exception(
fmt::format(
"Actual pk_data_type {} is not {} or {}",
pk_data_type->getName(),
MutableSupport::tidb_pk_column_int_type->getName(),
MutableSupport::tidb_pk_column_string_type->getName()),
ErrorCodes::LOGICAL_ERROR));
return pk_data_type;
}
} // namespace

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
TiDB::ColumnInfo column_info;
const auto & ci = table_scan.getColumns()[i];
column_info.tp = static_cast<TiDB::TP>(ci.tp());
column_info.id = ci.column_id();

auto column_info = TiDB::toTiDBColumnInfo(table_scan.getColumns()[i]);
switch (column_info.id)
{
case TiDBPkColumnID:
// TODO: need to check if the type of pk_handle_columns matches the type that used in delta merge tree.
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(MutableSupport::tidb_pk_column_name, getPkType(column_info));
break;
case ExtraTableIDColumnID:
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("mock_table_scan_{}", i), getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/StringRef.h>

namespace DB
{
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan);
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Flash/Coprocessor/PushDownFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,22 @@ tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor
}
}

PushDownFilter PushDownFilter::toPushDownFilter(const String & executor_id, const tipb::Executor * executor)
PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor)
{
if (!executor || !executor->has_selection())
{
return {"", {}};
}

return pushDownFilterFrom(executor_id, executor->selection());
}

PushDownFilter PushDownFilter::pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection)
{
std::vector<const tipb::Expr *> conditions;
for (const auto & condition : executor->selection().conditions())
for (const auto & condition : selection.conditions())
conditions.push_back(&condition);

return {executor_id, conditions};
}
} // namespace DB
} // namespace DB
8 changes: 6 additions & 2 deletions dbms/src/Flash/Coprocessor/PushDownFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ namespace DB
{
struct PushDownFilter
{
static PushDownFilter toPushDownFilter(const String & executor_id, const tipb::Executor * executor);
static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Executor * executor);

static PushDownFilter pushDownFilterFrom(const String & executor_id, const tipb::Selection & selection);

PushDownFilter() = default;

PushDownFilter(
const String & executor_id_,
Expand All @@ -36,4 +40,4 @@ struct PushDownFilter
String executor_id;
std::vector<const tipb::Expr *> conditions;
};
} // namespace DB
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <fmt/core.h>
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
#include <Flash/Mpp/MinTSOScheduler.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Flash/executeQuery.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <fmt/core.h>
Expand Down Expand Up @@ -319,8 +319,7 @@ void MPPTask::preprocess()
{
auto start_time = Clock::now();
initExchangeReceivers();
DAGQuerySource dag(*context);
executeQuery(dag, *context, false, QueryProcessingStage::Complete);
executeQuery(*context);
auto end_time = Clock::now();
dag_context->compile_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count();
mpp_task_statistics.setCompileTimestamp(start_time, end_time);
Expand Down
Loading

0 comments on commit f470b46

Please sign in to comment.