Skip to content

Commit

Permalink
merge master to Feature branch: planner_refactory ref pingcap#4739
Browse files Browse the repository at this point in the history
  • Loading branch information
SeaRise committed Jul 18, 2022
1 parent d027469 commit 6a0c56b
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 34 deletions.
2 changes: 1 addition & 1 deletion contrib/tiflash-proxy
Submodule tiflash-proxy updated 83 files
+3 −1 .github/workflows/pr-ci.yml
+153 −1 Cargo.lock
+2 −0 Cargo.toml
+1 −1 components/engine_rocks/src/lib.rs
+96 −0 components/proxy_server/Cargo.toml
+91 −0 components/proxy_server/src/config.rs
+48 −0 components/proxy_server/src/lib.rs
+46 −6 components/proxy_server/src/proxy.rs
+1,845 −0 components/proxy_server/src/run.rs
+102 −0 components/proxy_server/src/setup.rs
+1 −1 components/proxy_server/src/util.rs
+1 −0 components/raftstore/Cargo.toml
+20 −0 components/raftstore/src/coprocessor/dispatcher.rs
+3 −0 components/raftstore/src/coprocessor/mod.rs
+16 −7 components/raftstore/src/engine_store_ffi/mod.rs
+179 −0 components/raftstore/src/engine_store_ffi/observer.rs
+10 −8 components/raftstore/src/engine_store_ffi/read_index_helper.rs
+2 −0 components/raftstore/src/engine_store_ffi/utils.rs
+0 −1 components/raftstore/src/store/bootstrap.rs
+6 −11 components/raftstore/src/store/fsm/apply.rs
+6 −0 components/server/Cargo.toml
+1 −36 components/server/src/lib.rs
+1 −252 components/server/src/server.rs
+5 −1 components/test_raftstore/src/cluster.rs
+2 −2 components/test_raftstore/src/pd.rs
+68 −0 engine_tiflash/Cargo.toml
+11 −0 engine_tiflash/src/cf_names.rs
+110 −0 engine_tiflash/src/cf_options.rs
+254 −0 engine_tiflash/src/compact.rs
+299 −0 engine_tiflash/src/compact_listener.rs
+25 −0 engine_tiflash/src/compat.rs
+359 −0 engine_tiflash/src/config.rs
+94 −0 engine_tiflash/src/db_options.rs
+39 −0 engine_tiflash/src/db_vector.rs
+98 −0 engine_tiflash/src/decode_properties.rs
+69 −0 engine_tiflash/src/encryption.rs
+401 −0 engine_tiflash/src/engine.rs
+83 −0 engine_tiflash/src/engine_iterator.rs
+189 −0 engine_tiflash/src/event_listener.rs
+107 −0 engine_tiflash/src/file_system.rs
+32 −0 engine_tiflash/src/flow_control_factors.rs
+133 −0 engine_tiflash/src/flow_listener.rs
+151 −0 engine_tiflash/src/import.rs
+120 −0 engine_tiflash/src/lib.rs
+38 −0 engine_tiflash/src/logger.rs
+607 −0 engine_tiflash/src/misc.rs
+76 −0 engine_tiflash/src/mvcc_properties.rs
+139 −0 engine_tiflash/src/options.rs
+36 −0 engine_tiflash/src/perf_context.rs
+401 −0 engine_tiflash/src/perf_context_impl.rs
+43 −0 engine_tiflash/src/perf_context_metrics.rs
+809 −0 engine_tiflash/src/properties.rs
+356 −0 engine_tiflash/src/raft_engine.rs
+216 −0 engine_tiflash/src/range_properties.rs
+20 −0 engine_tiflash/src/raw.rs
+225 −0 engine_tiflash/src/raw_util.rs
+1,639 −0 engine_tiflash/src/rocks_metrics.rs
+190 −0 engine_tiflash/src/rocks_metrics_defs.rs
+112 −0 engine_tiflash/src/snapshot.rs
+403 −0 engine_tiflash/src/sst.rs
+56 −0 engine_tiflash/src/sst_partitioner.rs
+76 −0 engine_tiflash/src/table_properties.rs
+195 −0 engine_tiflash/src/ttl_properties.rs
+225 −0 engine_tiflash/src/util.rs
+213 −0 engine_tiflash/src/write_batch.rs
+1 −0 mock-engine-store/Cargo.toml
+41 −2 mock-engine-store/src/lib.rs
+51 −0 new-mock-engine-store/Cargo.toml
+30 −0 new-mock-engine-store/src/config.rs
+1,010 −0 new-mock-engine-store/src/lib.rs
+1,210 −0 new-mock-engine-store/src/mock_cluster.rs
+509 −0 new-mock-engine-store/src/node.rs
+841 −0 new-mock-engine-store/src/transport_simulate.rs
+1 −1 raftstore-proxy/Cargo.toml
+2 −2 raftstore-proxy/src/lib.rs
+10 −0 tests/Cargo.toml
+0 −1 tests/failpoints/cases/mod.rs
+3 −3 tests/failpoints/cases/test_snap.rs
+2 −0 tests/proxy/ingest.rs
+12 −0 tests/proxy/mod.rs
+175 −0 tests/proxy/normal.rs
+45 −41 tests/proxy/proxy.rs
+1 −0 tests/proxy/util.rs
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::OctString, "cast"},
//{tipb::ScalarFuncSig::Ord, "cast"},
//{tipb::ScalarFuncSig::Quote, "cast"},
//{tipb::ScalarFuncSig::Repeat, "cast"},
{tipb::ScalarFuncSig::Repeat, "repeat"},
{tipb::ScalarFuncSig::Replace, "replaceAll"},
{tipb::ScalarFuncSig::ReverseUTF8, "reverseUTF8"},
{tipb::ScalarFuncSig::Reverse, "reverse"},
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Flash/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ BlockInputStreams Planner::execute()

bool Planner::isSupported(const DAGQueryBlock & query_block)
{
/// todo support fine grained shuffle
static auto disable_fine_frained_shuffle = [](const DAGQueryBlock & query_block) {
return !enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count())
&& (!query_block.exchange_sender || !enableFineGrainedShuffle(query_block.exchange_sender->fine_grained_shuffle_stream_count()));
};
return query_block.source
&& (query_block.source->tp() == tipb::ExecType::TypeProjection
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver);
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver)
&& disable_fine_frained_shuffle(query_block);
}

DAGContext & Planner::dagContext() const
Expand Down
23 changes: 14 additions & 9 deletions dbms/src/Flash/Planner/plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,39 @@ void PhysicalAggregation::transformImpl(DAGPipeline & pipeline, Context & contex
is_final_agg);

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1)
{
const Settings & settings = context.getSettingsRef();
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>(
pipeline.streams,
stream_with_non_joined_data,
pipeline.streams_with_non_joined_data,
params,
context.getFileProvider(),
true,
max_streams,
settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
log->identifier());

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();
pipeline.firstStream() = std::move(stream);

// should record for agg before restore concurrency. See #3804.
recordProfileStreams(pipeline, context);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
}
else
{
BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
BlockInputStreams inputs;
if (!pipeline.streams.empty())
inputs.push_back(pipeline.firstStream());
else
pipeline.streams.resize(1);
if (stream_with_non_joined_data)
inputs.push_back(stream_with_non_joined_data);

if (!pipeline.streams_with_non_joined_data.empty())
inputs.push_back(pipeline.streams_with_non_joined_data.at(0));

pipeline.streams.resize(1);
pipeline.streams_with_non_joined_data.clear();

pipeline.firstStream() = std::make_shared<AggregatingBlockInputStream>(
std::make_shared<ConcatBlockInputStream>(inputs, log->identifier()),
params,
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ PhysicalPlanNodePtr PhysicalExchangeReceiver::build(
const String & executor_id,
const LoggerPtr & log)
{
const auto & mpp_exchange_receiver_map = context.getDAGContext()->getMPPExchangeReceiverMap();

auto it = mpp_exchange_receiver_map.find(executor_id);
if (unlikely(it == mpp_exchange_receiver_map.end()))
auto mpp_exchange_receiver = context.getDAGContext()->getMPPExchangeReceiver(executor_id);
if (unlikely(mpp_exchange_receiver == nullptr))
throw TiFlashException(
fmt::format("Can not find exchange receiver for {}", executor_id),
Errors::Planner::Internal);
/// todo support fine grained shuffle
assert(!enableFineGrainedShuffle(mpp_exchange_receiver->getFineGrainedShuffleStreamCount()));

const auto & mpp_exchange_receiver = it->second;
NamesAndTypes schema = toNamesAndTypes(mpp_exchange_receiver->getOutputSchema());
auto physical_exchange_receiver = std::make_shared<PhysicalExchangeReceiver>(
executor_id,
Expand All @@ -69,7 +68,7 @@ void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & c
auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id];
for (size_t i = 0; i < max_streams; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id);
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id, /*stream_id=*/0);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, log->identifier());
stream->setExtraInfo("squashing after exchange receiver");
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con

RUNTIME_ASSERT(dag_context.isMPPTask() && dag_context.tunnel_set != nullptr, log, "exchange_sender only run in MPP");

/// todo support fine grained shuffle
int stream_id = 0;
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>(
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>(
dag_context.tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_type,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dag_context);
dag_context,
0,
0);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
});
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/plans/PhysicalTopN.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void PhysicalTopN::transformImpl(DAGPipeline & pipeline, Context & context, size

executeExpression(pipeline, before_sort_actions, log, "before TopN");

orderStreams(pipeline, max_streams, order_descr, limit, context, log);
orderStreams(pipeline, max_streams, order_descr, limit, false, context, log);
}

void PhysicalTopN::finalize(const Names & parent_require)
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PhysicalPlanTestRunner : public DB::tests::ExecutorTest
ASSERT_EQ(Poco::trim(expected_streams), Poco::trim(fb.toString()));
}

readAndAssertBlock(final_stream, expect_columns);
ASSERT_COLUMNS_EQ_R(readBlock(final_stream), expect_columns);
}

LoggerPtr log = Logger::get("PhysicalPlanTestRunner", "test_physical_plan");
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/Functions/tests/gtest_strings_format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ class StringFormat : public DB::tests::FunctionTest
FieldType(static_cast<Native>(-9999999), 4),
FieldType(static_cast<Native>(-3333330), 4)}),
createConstColumn<Nullable<Int16>>(4, 3)));
/// known issue https://github.com/pingcap/tiflash/issues/4891
/*
ASSERT_COLUMN_EQ(
createColumn<Nullable<String>>({"-999.9999", "-1,000", "-1,000", "-999.999900000000000000000000000000", "-999.99990", "-1,000.0", "-1,000.00"}),
executeFunction(
Expand All @@ -85,7 +83,6 @@ class StringFormat : public DB::tests::FunctionTest
1,
FieldType(static_cast<Native>(-9999999), 4)),
createConstColumn<Nullable<Int8>>(1, 3)));
*/
ASSERT_COLUMN_EQ(
createColumn<Nullable<String>>({"12,332.1000", "12,332", "12,332.300000000000000000000000000000", "-12,332.30000", "-1,000.0", "-333.33", {}}),
executeFunction(
Expand All @@ -111,8 +108,6 @@ class StringFormat : public DB::tests::FunctionTest
FieldType(static_cast<Native>(-9999999), 4),
FieldType(static_cast<Native>(-3333330), 4)}),
createConstColumn<Nullable<UInt16>>(4, 3)));
/// known issue https://github.com/pingcap/tiflash/issues/4891
/*
ASSERT_COLUMN_EQ(
createColumn<Nullable<String>>({"-999.9999", "-1,000", "-999.999900000000000000000000000000", "-999.99990", "-1,000.0", "-1,000.00"}),
executeFunction(
Expand All @@ -131,7 +126,6 @@ class StringFormat : public DB::tests::FunctionTest
1,
FieldType(static_cast<Native>(-9999999), 4)),
createConstColumn<Nullable<UInt8>>(1, 3)));
*/
}

template <typename Integer>
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,11 @@ struct Settings
M(SettingUInt64, async_pollers_per_cq, 200, "grpc async pollers per cqs") \
M(SettingUInt64, async_cqs, 1, "grpc async cqs") \
M(SettingUInt64, preallocated_request_count_per_poller, 20, "grpc preallocated_request_count_per_poller") \
\
M(SettingUInt64, manual_compact_pool_size, 1, "The number of worker threads to handle manual compact requests.") \
M(SettingUInt64, manual_compact_max_concurrency, 10, "Max concurrent tasks. It should be larger than pool size.") \
M(SettingUInt64, manual_compact_more_until_ms, 60000, "Continuously compact more segments until reaching specified elapsed time. If 0 is specified, only one segment will be compacted each round.") \
\
M(SettingBool, enable_planner, true, "Enable planner")
// clang-format on
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT};
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@
#if ENABLE_TIFLASH_PAGECTL
#include <Storages/Page/tools/PageCtl/PageStorageCtl.h>
#endif
#if ENABLE_TIFLASH_PAGECTL
#include <Storages/Page/tools/PageCtl/PageStorageCtl.h>
#endif
#include <Common/StringUtils/StringUtils.h>
#include <Server/DTTool/DTTool.h>

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace DB::tests
{
std::unique_ptr<Context> TiFlashTestEnv::global_context = nullptr;

void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageRunMode ps_run_mode)
void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageRunMode ps_run_mode, uint64_t bg_thread_count)
{
// set itself as global context
global_context = std::make_unique<DB::Context>(DB::Context::createGlobal());
Expand All @@ -39,6 +39,11 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// initialize background & blockable background thread pool
Settings & settings = global_context->getSettingsRef();
global_context->initializeBackgroundPool(bg_thread_count == 0 ? settings.background_pool_size.get() : bg_thread_count);
global_context->initializeBlockableBackgroundPool(bg_thread_count == 0 ? settings.background_pool_size.get() : bg_thread_count);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TestUtils/TiFlashTestEnv.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TiFlashTestEnv

static Context getContext(const DB::Settings & settings = DB::Settings(), Strings testdata_path = {});

static void initializeGlobalContext(Strings testdata_path = {}, PageStorageRunMode ps_run_mode = PageStorageRunMode::ONLY_V3);
static void initializeGlobalContext(Strings testdata_path = {}, PageStorageRunMode ps_run_mode = PageStorageRunMode::ONLY_V3, uint64_t bg_thread_count = 0);
static Context & getGlobalContext() { return *global_context; }
static void shutdown();

Expand Down
27 changes: 27 additions & 0 deletions dbms/src/TestUtils/mockExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Debug/astToExecutor.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
Expand All @@ -23,6 +24,8 @@
#include <TestUtils/mockExecutor.h>
#include <tipb/executor.pb.h>

#include <unordered_set>

namespace DB::tests
{
ASTPtr buildColumn(const String & column_name)
Expand Down Expand Up @@ -92,6 +95,30 @@ std::shared_ptr<tipb::DAGRequest> DAGRequestBuilder::build(MockDAGRequestContext
return dag_request_ptr;
}

// Currently Sort and Window Executors don't support columnPrune.
// TODO: support columnPrume for Sort and Window.
void columnPrune(ExecutorPtr executor)
{
std::unordered_set<String> used_columns;
for (auto & schema : executor->output_schema)
used_columns.emplace(schema.first);
executor->columnPrune(used_columns);
}


// Split a DAGRequest into multiple QueryTasks which can be dispatched to multiple Compute nodes.
// Currently we don't support window functions.
QueryTasks DAGRequestBuilder::buildMPPTasks(MockDAGRequestContext & mock_context)
{
columnPrune(root);
// enable mpp
properties.is_mpp_query = true;
auto query_tasks = queryPlanToQueryTasks(properties, root, executor_index, mock_context.context);
root.reset();
executor_index = 0;
return query_tasks;
}

DAGRequestBuilder & DAGRequestBuilder::mockTable(const String & db, const String & table, const MockColumnInfoVec & columns)
{
assert(!columns.empty());
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/TestUtils/mockExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Core/ColumnsWithTypeAndName.h>
#include <Debug/astToExecutor.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <tipb/executor.pb.h>
Expand Down Expand Up @@ -62,6 +63,7 @@ class DAGRequestBuilder
}

std::shared_ptr<tipb::DAGRequest> build(MockDAGRequestContext & mock_context);
QueryTasks buildMPPTasks(MockDAGRequestContext & mock_context);

DAGRequestBuilder & mockTable(const String & db, const String & table, const MockColumnInfoVec & columns);
DAGRequestBuilder & mockTable(const MockTableName & name, const MockColumnInfoVec & columns);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashModeOnLogicalTable(
for (const auto & part_def : table_info->partition.definitions)
{
auto new_part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper);
auto part_storage = tmt_context.getStorages().get(table_info->id);
auto part_storage = tmt_context.getStorages().get(new_part_table_info->id);
if (unlikely(part_storage == nullptr))
{
throw TiFlashException(fmt::format("miss table in TiFlash : {}", name_mapper.debugCanonicalName(*db_info, *new_part_table_info)),
Expand Down

0 comments on commit 6a0c56b

Please sign in to comment.