Skip to content

Commit

Permalink
Add coprocessor support in TiFlash (#161)
Browse files Browse the repository at this point in the history
* basic framework for coprocessor support in tiflash

* basic support for InterpreterDagRequestV2

* code refine

* tipb submodule use tipb master branch

* rewrite build flow in InterpreterDagRequest

* rename Dag to DAG

* Update tipb submodule

* basic support for selection/limit/topn executor in InterpreterDAGRequest

* basic support for selection/limit/topn executor in InterpreterDAGRequest (#150)

* merge pingcap/cop branch

* Code reorg

* Format

* merge pingcap/cop

* Refine code

* basic support for dag agg executor

* Code refine

* Refine code

* Another way of getting codec flag

* fix cop test regression (#157)

* fix cop test regression

* address comments

* format code

* fix npe during dag execute (#160)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* Add tipb cpp gen in build script

* Fix build error and adjust some formats

* Fix build error

* Fix build error

* Update flash configs

* Format

*  throw exception when meet error duing cop request handling (#162)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver (#166)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema (#167)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* Move flash/cop/dag to individual library

* DAG planner fix and mock dag request (#169)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Fix DAG get and lock storage

* handle error in cop request (#171)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

*  code refine && several minor bug fix (#174)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments

* Fix region id in mock dag

* support udf in (#175)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments

* support udf in

* refine code

* address comments

* address comments

* 1. fix decode literal expr error, 2. add all scalar function sig in scalar_func_map (#177)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* some bug fix (#179)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* Support all DAG operator types in mock SQL -> DAG parser (#176)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* filter column must be uint8 in tiflash (#180)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* filter column must be uint8 in tiflash

* address comments

* address comments

* address comments

* remove useless include

* 1. fix encode null error, 2. fix empty field type generated by TiFlash (#182)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* filter column must be uint8 in tiflash

* address comments

* address comments

* address comments

* remove useless include

* 1. fix encode null error, 2. fix empty field type generated by TiFlash

* check validation of dag exprs field type (#183)

* check validation of dag exprs field type

* format code

* address comments

*  add more coprocessor mock tests (#185)

* check validation of dag exprs field type

* format code

* address comments

* add more filter test

* add data type tests

* remove useless comment

* disable decimal test

*  add some log about implicit cast (#188)

* check validation of dag exprs field type

* format code

* address comments

* add more filter test

* add data type tests

* remove useless comment

* disable decimal test

* add some log about implicit cast

* address comment

* Pass DAG tests after merging master (#199)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* Pass tests after merging master

* Fix date/datetime/bit encode error (#200)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* Pass tests after merging master

* Copy some changes from xufei

* Enable date/datetime test

* Enable date/datetime test

* Refine code

* Adjust date/datetime tiflash rep to UInt

* Fix datetime to Int

* Typo

* improve dag execution time collection (#202)

* improve dag execution time collection

* address comment

* update comments

* update comment

* update comment

*  column id in table scan operator may be -1 (#205)

* improve dag execution time collection

* address comment

* update comments

* update comment

* update comment

* column id in table scan operator may be -1

* column id in table scan operator may be -1

* quick fix for decimal encode (#210)

* quick fix for decimal encode

* address comments

* update comments

* support udf like with 3 arguments (#212)

* support udf like with 3 arguments

* address comments

* add some comments

* Flash-473 optimize date and datetime comparison (#221)

* support udf like with 3 arguments

* address comments

* add some comments

* Flash-473 optimize date and datetime comparison

* address comments

* FLASH-479 select from empty table throw error in tiflash (#223)

* 1. select from empty table throw error in tiflash, 2. add some logs, 3. disable timestamp literal in DAG request

* revert unrelated change

* Update flash service port

* fix bug in DAGBlockOutputStream (#230)

* FLASH-475: Support BATCH COMMANDS in flash service (#232)

* Initial batch command support

* Add config to control thread pool size

* Address comments

* FLASH-483: Combine raft service and flash service (#235)

* Combine raft service and flash service

* Address comment and fix build error

* Update configs

* Fix build error

* Fix test regression

* Fix null value bug in datum

* FLASH-490: Fix table scan with -1 column ID and no agg (#240)

* Fix table scan with -1 column ID and no agg

* Add break

* Remove useless includes

* Use dag context to store void ft instead of dag query source

* Fix decimal type reverse get

* Change adding smallest column to adding handle column, address comments

* throw error if the cop request is not based on full region scan (#247)

* throw error if the cop request is not based on full region scan

* format code

* FLASH-437 Support time zone in coprocessor (#259)

* do not allow timestamp literal in DAG request

* refine code

* fix cop date type encode error

* support tz info in DAG request

* address comments

* Address comment

* FLASH-489 support key condition for coprocessor query (#261)

* support key condition for coprocessor query

* add tests

* remove useless code

* check validation when build RPNElement for function in/notIn

* address comments

* address comments

* only return execute summaies if requested (#264)

* Refine service init (#265)

* FLASH-554 cop check range should be based on region range (#270)

* only return execute summaies if requested

* cop check range should be based on region range

* address comments

* add tests

* minor improve

* minor improve (#273)

* Fix mutex on timezone retrieval (#276)

* fix mutex contention

* add const ref

* Fix race condition of batch command handling (#277)

* address comment

* address comments

* address comments

* Fix NULL order for dag (#281)

* refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoprocessor (#282)

* remove duplicate agg funcs (#283)

* 1. remove duplicate agg funcs, 2. for column ref expr, change column_id to column_index since the value stored in column ref expr is not column id

* bug fix

* address comments

* Update dbms/src/Flash/BatchCommandsHandler.cpp

Co-Authored-By: JaySon <[email protected]>

* revert unnecessary changes
  • Loading branch information
zanmato1984 authored Oct 17, 2019
1 parent 8fbd0f8 commit 5771f0c
Show file tree
Hide file tree
Showing 103 changed files with 6,847 additions and 937 deletions.
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
[submodule "contrib/kvproto"]
path = contrib/kvproto
url = https://github.com/pingcap/kvproto.git
[submodule "contrib/tipb"]
path = contrib/tipb
url = https://github.com/pingcap/tipb.git
branch = master
[submodule "contrib/client-c"]
path = contrib/client-c
url = [email protected]:tikv/client-c.git
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ include (cmake/find_capnp.cmake)
include (cmake/find_llvm.cmake)
include (cmake/find_grpc.cmake)
include (cmake/find_kvproto.cmake)
include (cmake/find_tipb.cmake)


include (cmake/find_contrib_lib.cmake)
Expand Down
10 changes: 10 additions & 0 deletions cmake/find_tipb.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp/tipb/select.pb.h")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/proto/select.proto")
message (FATAL_ERROR "tipb cpp files in contrib/tipb is missing. Try go to contrib/tipb, and run ./generate_cpp.sh")
else()
message (FATAL_ERROR "tipb submodule in contrib/tipb is missing. Try run 'git submodule update --init --recursive', and go to contrib/tipb, and run ./generate_cpp.sh")
endif()
endif ()

message(STATUS "Using tipb: ${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp")
1 change: 1 addition & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory (kvproto/cpp)
add_subdirectory (client-c)
add_subdirectory (tipb/cpp)

if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast")
Expand Down
1 change: 1 addition & 0 deletions contrib/tipb
Submodule tipb added at b2d318
3 changes: 3 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ add_headers_and_sources(dbms src/Storages/Page)
add_headers_and_sources(dbms src/Raft)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
add_headers_only(dbms src/Flash/Coprocessor)
add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
Expand Down Expand Up @@ -149,8 +150,10 @@ target_link_libraries (dbms
clickhouse_parsers
clickhouse_common_config
clickhouse_common_io
flash_service
kvproto
kv_client
tipb
${Protobuf_LIBRARIES}
gRPC::grpc++_unsecure
${CURL_LIBRARIES}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Server)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Analyzers)
add_subdirectory (Analyzers)
add_subdirectory (Flash)
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ namespace ErrorCodes
extern const int SCHEMA_SYNC_ERROR = 10003;
extern const int SCHEMA_VERSION_ERROR = 10004;
extern const int DDL_ERROR = 10005;
extern const int COP_BAD_DAG_REQUEST = 10006;
}

}
11 changes: 11 additions & 0 deletions dbms/src/Common/MyTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,15 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
to_time = to_my_time.toPackedUInt();
}

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone)
{
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
epoch += offset;
MyDateTime to_my_time(time_zone.toYear(epoch), time_zone.toMonth(epoch), time_zone.toDayOfMonth(epoch),
time_zone.toHour(epoch), time_zone.toMinute(epoch), time_zone.toSecond(epoch), from_my_time.micro_second);
to_time = to_my_time.toPackedUInt();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Common/MyTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ Field parseMyDateTime(const String & str);

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to);

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone);

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF
#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1

#define DEFAULT_DAG_RECORDS_PER_CHUNK 64L

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
*/
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/DataStreams/BlockStreamProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct BlockStreamProfileInfo
size_t rows = 0;
size_t blocks = 0;
size_t bytes = 0;
// execution time is the total time spent on current stream and all its children streams
// note that it is different from total_stopwatch.elapsed(), which includes not only the
// time spent on current stream and all its children streams, but also the time of its
// parent streams
UInt64 execution_time = 0;

using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;

Expand All @@ -45,6 +50,8 @@ struct BlockStreamProfileInfo

void update(Block & block);

void updateExecutionTime(UInt64 time) { execution_time += time; }

/// Binary serialization and deserialization of main fields.
/// Writes only main fields i.e. fields that required by internal transmission protocol.
void read(ReadBuffer & in);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Block IProfilingBlockInputStream::read()
if (isCancelledOrThrowIfKilled())
return res;

auto start_time = info.total_stopwatch.elapsed();

if (!checkTimeLimit())
limit_exceeded_need_break = true;

Expand Down Expand Up @@ -83,6 +85,7 @@ Block IProfilingBlockInputStream::read()
}
#endif

info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataTypes/DataTypeMyDateTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ DataTypeMyDateTime::DataTypeMyDateTime(int fraction_)
{
fraction = fraction_;
if (fraction < 0 || fraction > 6)
throw Exception("fraction must >= 0 and < 6", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("fraction must >= 0 and <= 6", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

void DataTypeMyDateTime::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
Expand Down
115 changes: 69 additions & 46 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <DataStreams/StringStreamBlockInputStream.h>
#include <Debug/ClusterManage.h>
#include <Debug/DBGInvoker.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncMockTiDBData.h>
#include <Debug/dbgFuncMockTiDBTable.h>
#include <Debug/dbgFuncRegion.h>
Expand All @@ -30,48 +31,50 @@ void dbgFuncSleep(Context &, const ASTs & args, DBGInvoker::Printer output)

DBGInvoker::DBGInvoker()
{
regFunc("echo", dbgFuncEcho);
regSchemalessFunc("echo", dbgFuncEcho);
// TODO: remove this, use sleep in bash script
regFunc("sleep", dbgFuncSleep);

regFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regFunc("drop_tidb_partition", MockTiDBTable::dbgFuncDropTiDBPartition);
regFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regFunc("raft_insert_row", dbgFuncRaftInsertRow);
regFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regFunc("raft_delete_rows", dbgFuncRaftDelRows);
regFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regFunc("put_region", dbgFuncPutRegion);
regFunc("region_snapshot", dbgFuncRegionSnapshot);
regFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regFunc("try_flush", dbgFuncTryFlush);
regFunc("try_flush_region", dbgFuncTryFlushRegion);

regFunc("dump_all_region", dbgFuncDumpAllRegion);
regFunc("dump_all_mock_region", dbgFuncDumpAllMockRegion);

regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regFunc("refresh_schemas", dbgFuncRefreshSchemas);
regFunc("reset_schemas", dbgFuncResetSchemas);

regFunc("dump_region_table", ClusterManage::dumpRegionTable);
regFunc("find_region_by_range", ClusterManage::findRegionByRange);
regSchemalessFunc("sleep", dbgFuncSleep);

regSchemalessFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regSchemalessFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regSchemalessFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regSchemalessFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regSchemalessFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regSchemalessFunc("drop_tidb_partition", MockTiDBTable::dbgFuncDropTiDBPartition);
regSchemalessFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regSchemalessFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regSchemalessFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regSchemalessFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regSchemalessFunc("raft_insert_row", dbgFuncRaftInsertRow);
regSchemalessFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regSchemalessFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regSchemalessFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regSchemalessFunc("raft_delete_rows", dbgFuncRaftDelRows);
regSchemalessFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regSchemalessFunc("put_region", dbgFuncPutRegion);
regSchemalessFunc("region_snapshot", dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regSchemalessFunc("try_flush", dbgFuncTryFlush);
regSchemalessFunc("try_flush_region", dbgFuncTryFlushRegion);

regSchemalessFunc("dump_all_region", dbgFuncDumpAllRegion);
regSchemalessFunc("dump_all_mock_region", dbgFuncDumpAllMockRegion);
regSchemalessFunc("dump_region_table", ClusterManage::dumpRegionTable);
regSchemalessFunc("find_region_by_range", ClusterManage::findRegionByRange);

regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas);
regSchemalessFunc("reset_schemas", dbgFuncResetSchemas);

regSchemafulFunc("dag", dbgFuncDAG);
regSchemafulFunc("mock_dag", dbgFuncMockDAG);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down Expand Up @@ -103,10 +106,25 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
name = ori_name.substr(prefix_not_print_res.size(), ori_name.size() - prefix_not_print_res.size());
}

auto it = funcs.find(name);
if (it == funcs.end())
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
BlockInputStreamPtr res;
auto it_schemaless = schemaless_funcs.find(name);
if (it_schemaless != schemaless_funcs.end())
res = invokeSchemaless(context, name, it_schemaless->second, args);
else
{
auto it_schemaful = schemaful_funcs.find(name);
if (it_schemaful != schemaful_funcs.end())
res = invokeSchemaful(context, name, it_schemaful->second, args);
else
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
}

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
}

BlockInputStreamPtr DBGInvoker::invokeSchemaless(
Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args)
{
std::stringstream col_name;
col_name << name << "(";
for (size_t i = 0; i < args.size(); ++i)
Expand All @@ -119,9 +137,14 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
std::shared_ptr<StringStreamBlockInputStream> res = std::make_shared<StringStreamBlockInputStream>(col_name.str());
Printer printer = [&](const std::string & s) { res->append(s); };

(it->second)(context, args, printer);
func(context, args, printer);

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
return res;
}

BlockInputStreamPtr DBGInvoker::invokeSchemaful(Context & context, const std::string &, const SchemafulDBGFunc & func, const ASTs & args)
{
return func(context, args);
}

} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Debug/DBGInvoker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ class DBGInvoker
{
public:
using Printer = std::function<void(const std::string &)>;
using DBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemalessDBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemafulDBGFunc = std::function<BlockInputStreamPtr(Context & context, const ASTs & args)>;

DBGInvoker();

void regFunc(const std::string & name, DBGFunc func) { funcs[name] = func; }
void regSchemalessFunc(const std::string & name, SchemalessDBGFunc func) { schemaless_funcs[name] = func; }
void regSchemafulFunc(const std::string & name, SchemafulDBGFunc func) { schemaful_funcs[name] = func; }

BlockInputStreamPtr invoke(Context & context, const std::string & ori_name, const ASTs & args);
BlockInputStreamPtr invokeSchemaless(Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args);
BlockInputStreamPtr invokeSchemaful(Context & context, const std::string & name, const SchemafulDBGFunc & func, const ASTs & args);

private:
std::unordered_map<std::string, DBGFunc> funcs;
std::unordered_map<std::string, SchemalessDBGFunc> schemaless_funcs;
std::unordered_map<std::string, SchemafulDBGFunc> schemaful_funcs;
};

} // namespace DB
Loading

0 comments on commit 5771f0c

Please sign in to comment.